我试图找出Spark中所有非确定性的来源.我知道非确定性可以来自用户提供的功能,例如在地图(f)中,f涉及随机.相反,我正在寻找可能导致非确定性的操作,无论是在较低级别的转换/动作方面,例如改组. 最佳答案 脱离我的头顶…
标签:apache-spark
apache-spark – 如何在pyspark中加载gzip压缩的csv文件?
文件名不以.gz结尾,我无法更改它们,因为它们与其他程序共享. file1.log.gz.processed只是一个csv文件.但是如何在pyspark中读取它,最好是在pyspark.sql中? 我试图指定格式和压缩,…
apache-spark – Apache Spark中的类似风暴的结构
您知道如何在Apache Storm中将Spout流数据传输到多个螺栓.有没有办法在Apache Spark中做类似的事情? 我基本上希望有一个程序从Kafka Queue读取数据并将其输出到2个不同的程序,然后可以用不…
apache-spark – 检查点RDD ReliableCheckpointRDD与原始RDD具有不同数量的分区
我有两台机器的火花簇,当我运行火花流应用程序时,我得到以下错误: Exception in thread "main" org.apache.spark.SparkException: Checkpoint RDD Re…
apache-spark – Spark DStream排序并获取N个元素
我正在使用spark stream从kafka集群中读取数据.我想对DStream对进行排序并单独获得前N个.到目前为止,我已经使用了 val result = ds.reduceByKeyAndWindow((x: D…
apache-spark – 如何在流用例中更新大型广播变量?
我有一个用例,我有一个流作业运行从kafka队列获取输入数据.我有一百万行的参考数据,每小时更新一次.我在驱动程序中加载参考数据,然后将其广播给工作人员.我想更新这个广播变量(在驱动程序中)并将其重新发送给工作人员. 如…
apache-spark – MLlib MatrixFactorizationModel recommendedProducts(user,num)对某些用户失败
我使用 ALS.train()训练了一个 MatrixFactorizationModel模型,现在使用 model.recommendProducts(user, num)来获得推荐的最佳产品,但是代码在某些用户上失败…
apache-spark – spark RDD saveAsTextFile gzip
是否可以将spark rdd文本文件保存为gzip? 我能以某种方式运行它:combPrdGrp3.repartition(10).saveAsTextFile(“Combined”)并将其保存为gzip文件? 最佳答案…
apache-spark – 检查GraphX图形对象
Spark 1.6.1版 创建边缘和顶点RDD val vertices_raw = sqlContext.read.json("vertices.json.gz") val vertices = vertices_ra…
apache-spark – 编写Spark函数,它接受Column参数并返回一个Column
我想写一个与dateiff类似的yeardiff函数. yeardiff应该获取两个Column参数并返回一个Column,其中包含这些参数列之间的年数. 我们使用以下示例数据: val testDf = Seq( ("…
apache-spark – Apache Spark:为什么reduceByKey转换会执行DAG?
我面临一个奇怪的问题.据我所知,Spark中的操作DAG仅在执行操作时执行.但是,我可以看到reduceByKey()opertation(是一个转换)开始执行DAG. 重现步骤 .尝试下面的代码 SparkConf c…
apache-spark – Spark流式传输JavaCustomReceiver
Spark在EMR客户端模式和集群模式下流式传输 java自定义接收器问题. 以下是问题. 当我们在EMR集群(yarn)上运行sparkCrayom的JavaCustomReceiver时,它会随机绑定一个执行器上…