本期学习代码如下:
def get_poishow_data(POISHOW_HDFS, event_day_end, num_days):
def read_poishow_data_one_day(current_date):
try:
cur_data = spark.read.csv(f"{POISHOW_HDFS}/{current_date}", sep='\t')\
.withColumn("cuid", F.col("_c0"))\
.filter("cuid is not null")\
.selectExpr("cuid", "'poishow' as poishow")
except Exception as e:
print(f"Error processing date {current_date}: {e}")
return None
return cur_data
date_list = [(datetime.datetime.strptime(event_day_end, '%Y%m%d') - datetime.timedelta(days=index)).strftime('%Y%m%d') for index in range(num_days)]
with ThreadPoolExecutor() as executor:
poi_show_data_list = list(filter(None, executor.map(read_poishow_data_one_day, date_list)))
poishow_data = poi_show_data_list[0]
for data in poi_show_data_list[1:]:
poishow_data = poishow_data.union(data)
poishow_data = poishow_data\
.dropDuplicates(subset=['cuid'])
return poishow_data
我会以一个具体的例子详细说明get_poishow_data
函数在处理数据时的每一步变化。假设我们有三个连续的日期(20240728, 20240729, 20240730),每一天都有一个CSV文件存储在HDFS上,文件的内容如下所示:
20240728.csv
cuid001\tinfo1
cuid002\tinfo2
cuid003\tinfo3
20240729.csv
cuid002\tinfo2
cuid003\tinfo3
cuid004\tinfo4
20240730.csv
cuid001\tinfo1
cuid004\tinfo4
cuid005\tinfo5
第1步:定义日期列表
- 函数首先计算出需要处理的日期列表。如果
event_day_end='20240730'
且num_days=3
,日期列表将是['20240728', '20240729', '20240730']
。
第2步:并行读取和处理每天的数据
- 使用多线程,针对每个日期并行执行
read_poishow_data_one_day
函数。现在让我们分别看看每天的数据处理细节:
处理20240728.csv
- 读取CSV文件。
- 将第一列重命名为
cuid
。 - 筛选掉
cuid
为空的行(此例中没有空行)。 - 添加固定的列
poishow
,其值为”poishow”。 - 结果DataFrame:
+-------+--------+ | cuid | poishow| +-------+--------+ |cuid001| poishow| |cuid002| poishow| |cuid003| poishow| +-------+--------+
处理20240729.csv
- 同上,处理后结果DataFrame:
+-------+--------+ | cuid | poishow| +-------+--------+ |cuid002| poishow| |cuid003| poishow| |cuid004| poishow| +-------+--------+
处理20240730.csv
- 同上,处理后结果DataFrame:
+-------+--------+ | cuid | poishow| +-------+--------+ |cuid001| poishow| |cuid004| poishow| |cuid005| poishow| +-------+--------+
第3步:合并数据
- 使用
union
将从不同日期读取的数据合并在一起。合并后的DataFrame如下:+-------+--------+ | cuid | poishow| +-------+--------+ |cuid001| poishow| |cuid002| poishow| |cuid003| poishow| |cuid002| poishow| |cuid003| poishow| |cuid004| poishow| |cuid001| poishow| |cuid004| poishow| |cuid005| poishow| +-------+--------+
第4步:删除重复数据
- 使用
dropDuplicates
根据cuid
列删除重复记录,确保每个cuid
只出现一次。最终的DataFrame如下:+-------+--------+ | cuid | poishow| +-------+--------+ |cuid001| poishow| |cuid002| poishow| |cuid003| poishow| |cuid004| poishow| |cuid005| poishow| +-------+--------+
返回结果
- 函数返回处理和清洗后的DataFrame,每个
cuid
唯一,每个条目都标记为”poishow”。
这个过程展示了如何从分布式文件系统中读取多天的数据,合并和清理数据,并为进一步分析提供一个干净的数据集。
后记
实际上,在提供的代码示例中,只涉及到了用户ID(cuid
)和一个固定的标签(poishow
或fansou
),而没有包含其他可能有用的信息。这样的数据处理可能主要是为了以下几个目的:
- 统计特定事件的用户参与度:例如,
poishow
和fansou
可能代表不同的活动或产品,代码的目的可能是为了统计在指定日期范围内参与每种活动的唯一用户数量。 - 数据清洗和去重:从不同日期中提取数据,并确保所有数据合并后没有重复的用户ID,这对于进行准确的用户行为分析是非常重要的。
- 简化数据集:有时候,为了特定的分析目的,可能需要简化数据集,仅保留关键的标识符和类别标签。