源码

首页 » 归档 » 源码 » python – 如何在pySpark数据帧中添加行ID [复…

python – 如何在pySpark数据帧中添加行ID [复…


参见英文答案 > Primary keys with Apache Spark????????????????????????????????????3个
我有一个csv文件;我在pyspark中转换为DataFrame(df);经过一番改造;我想在df中添加一列;这应该是简单的行id(从0或1开始到N).

我在rdd中转换了df并使用“zipwithindex”.我将生成的rdd转换回df.这种方法有效,但它产生了250k的任务,并且需要花费大量的时间来执行.我想知道是否还有其他方法可以减少运行时间.

以下是我的代码片段;我正在处理的csv文件很大;包含数十亿行.

debug_csv_rdd = (sc.textFile("debug.csv")
  .filter(lambda x: x.find('header') == -1)
  .map(lambda x : x.replace("NULL","0")).map(lambda p: p.split(','))
  .map(lambda x:Row(c1=int(x[0]),c2=int(x[1]),c3=int(x[2]),c4=int(x[3]))))

debug_csv_df = sqlContext.createDataFrame(debug_csv_rdd)
debug_csv_df.registerTempTable("debug_csv_table")
sqlContext.cacheTable("debug_csv_table")

r0 = sqlContext.sql("SELECT c2 FROM debug_csv_table WHERE c1 = 'str'")
r0.registerTempTable("r0_table")

r0_1 = (r0.flatMap(lambda x:x)
    .zipWithIndex()
    .map(lambda x: Row(c1=x[0],id=int(x[1]))))

r0_df=sqlContext.createDataFrame(r0_2)
r0_df.show(10) 
(0)

本文由 投稿者 创作,文章地址:https://blog.isoyu.com/archives/python-ruhezaipysparkshujuzhenzhongtianjiaxingid-fu.html
采用知识共享署名4.0 国际许可协议进行许可。除注明转载/出处外,均为本站原创或翻译,转载前请务必署名。最后编辑时间为:9 月 26, 2019 at 07:44 上午

热评文章