apache-spark - 多次运行时在同一组数据上不同的Pyspark代码输出

我已经编写了一个pyspark函数,但是当我多次运行时,每次输出不同时都会给我
在同一组输入数据上。

-pyspark函数

def give_percentile(plat,metrics,perc):
df_perc = df_final.filter(df_final.platform.like('%' + plat + '%'))
df_perc = df_perc.filter(df_final[metrics]!=0)
percentile_val = df_perc.approxQuantile(metrics, [perc], 0.05)
if len(percentile_val)>0:
    percentile_val = float(percentile_val[0])
else:
    percentile_val = float(0)
return percentile_val

调用函数
   df_agg = sqlContext.createDataFrame([Row(platform='iOS',
                                        percentile_page_load_50=give_percentile("iOS","page_load",0.5),
                                        percentile_time_diff_50=give_percentile("iOS","session_duration",0.5)),
                                        Row(platform='Android',
                                        percentile_page_load_50=give_percentile("Android","page_load",0.5),
                                        percentile_time_diff_50=give_percentile("Android","session_duration",0.5)),
                                        Row(platform='Web',
                                        percentile_page_load_50=give_percentile("Web","page_load",0.5),
                                        percentile_time_diff_50=give_percentile("Web","session_duration",0.5)))

Spark Job提交:-
    spark-submit --deploy-mode cluster  --executor-cores 4 --executor-memory 12G --driver-cores 4 --driver-memory 12G --conf spark.driver.extraJavaOptions=-Duser.timezone=UTC --conf spark.executor.extraJavaOptions=-Duser.timezone=UTC "path"

我们以Parquet文件格式存储pyspark代码的输出,并在此之上创建impala表,如下所示:

1.从表名称1 a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue'并
a.dt ='20190501'限制5;
表记录数= 22093826

输出= 0.62400001287460327
0.35100001096725464

2.从表名称2a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue',
a.dt ='20190501'限制5;
表记录数= 22093826

输出= 0.61500000953674316
0.28499999642372131
3.从表名3a中选择a.percentile_page_load_50,a.percentile_time_diff_50,其中a.platform ='Colvalue',
a.dt ='20190501'限制5;
表记录数= 22093826

输出= 0.61799997091293335
0.27799999713897705

现在在这里,Tablename1,Tablename2和Tablename3是在同一组输入数据上多次运行pyspark代码的输出。
但由于我们的pyspark代码在cluser模式/分布式模式下运行,因此值仍然不同。当我们检查样本数据时
独立模式下,其值未更改。
因此,您能否在这里帮助我,并告诉我上述功能代码或任何其他群集问题有什么问题?

最佳答案

根据给定的relativeError函数,roximateile函数可为您提供近似的解决方案。您将roximateQuantile函数的允许relativeError设置为0.05,这意味着它仅在以下范围内具有确定性:

“如果DataFrame具有N个元素,并且如果我们以概率p要求分位数直到错误err,则该算法将从DataFrame返回样本x ,以使x的精确等级接近(p * N)。 ” (我强调了为什么您得到不同结果的部分)。

如果需要精确的分位数,则必须将relativeError设置为0.0,但这也会增加运行时间。
可以在documentation中找到更多信息。

https://stackoverflow.com/questions/56215992/

相关文章:

hadoop - hadoop 目录与 hadoop-x.x.x 有何不同

docker - Docker构建输入/输出错误

hadoop - 如何编辑Hadoop存储日志文件的位置?

scala - Hadoop copyMerge无法正常工作:scala

amazon-web-services - 在具有默认配置的EMR群集模式下会发生什么?

docker - 外部服务在运行时设置的转发容器端口

docker - 如何在家庭 Web 服务器上托管 Ghost 博客

bash - 为什么parse_git_branch仅在Docker终端窗口中失败

apache-spark - yarn 服务器重启后如何在Spark Web-UI中保留完成的应用程

hadoop - hdfs:现有文件上的 “No such file or directory”