我正在使用datastax提供的spark-cassandra-connector 1.1.0.我注意到了互联网问题,我不确定为什么会发生这样的事情:
当我广播cassandra连接器并尝试在执行程序上使用它时,我收到异常,表明我的配置无效,无法连接到0.0.0的Cassandra.
示例堆栈跟踪:
java.io.IOException: Failed to open native connection to Cassandra at {0.0.0.0}:9042
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:174)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:160)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:71)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:97)
...
但如果我在没有广播的情况下使用它,一切都运行正常
对我来说也很奇怪,在驱动程序端广播值打印正确配置但在执行程序端没有.
司机方:
val dbConf = ssc.sparkContext.getConf
val connector = CassandraConnector(dbConf)
println(connector.hosts) //Set(10.20.1.5)
val broadcastedConnector = ssc.sparkContext.broadcast(connector)
println(broadcastedConnector.value.hosts) //Set(10.20.1.5)
执行方:
mapPartition{
...
println(broadcastedConnector.hosts) // Set(0.0.0.)
...
}
有人可以解释为什么它以这种方式工作以及如何以可以在执行器端使用的方式广播Cassandra连接器.
更新1.2.3版本的连接器中存在同样的问题.
最佳答案 没有理由播放Cassandra Connector.在并行化闭包中使用它只会序列化配置并在执行程序上创建新连接,或者使用现有的执行程序连接(如果存在).