How to replace a Java loop with direct Spark operations on a Cassandra table

I am trying to make my code more efficient, since I have to process billions of rows of data in Cassandra. I currently use a Java loop with the DataStax Cassandra Spark Connector to extract the data and put it into a format I am familiar with (a Multimap) for operating on with Spark. I would like to replace this Multimap loop with direct Spark operations on the Cassandra table, to save time and make everything more efficient. I would greatly appreciate any code suggestions to achieve this. Here is my existing code:

        Statement stmt = new SimpleStatement(
            "SELECT \"Power\",\"Bandwidth\",\"Start_Frequency\" FROM \"SB1000_49552019\".\"Measured_Value\";");
        stmt.setFetchSize(2000000);
        ResultSet results = session.execute(stmt);

        // Get the variables from each row of Cassandra data
        Multimap<Double, Float> data = LinkedListMultimap.create();
        for (Row row : results) {
            // Column names in Cassandra (case sensitive)
            start_frequency = row.getDouble("Start_Frequency");
            power = row.getFloat("Power");
            bandwidth = row.getDouble("Bandwidth");

            // Create channel power buckets (channel_end and increment are initialized elsewhere)
            for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
                if ((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
                    data.put(channel, power);
                }
            }
        }

        // Create Spark list for DataFrame
        List<Value> values = data.asMap().entrySet()
            .stream()
            .flatMap(x -> x.getValue()
                .stream()
                .map(y -> new Value(x.getKey(), y)))
            .collect(Collectors.toList());

        // Create DataFrame and calculate results
        sqlContext.createDataFrame(sc.parallelize(values), Value.class)
            .groupBy(col("channel"))
            .agg(min("power"), max("power"), avg("power"))
            .write().mode(SaveMode.Append)
            .option("table", "results")
            .option("keyspace", "model")
            .format("org.apache.spark.sql.cassandra")
            .save();

    } // end session
} // End Compute

Best Answer
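For reference, here is a sketch of the imports the accepted code relies on (assumed, based on the Spark 1.x era DataStax Spark Cassandra Connector Java API; adjust to your versions):

    // Assumed imports for the snippet below; verify against your build.
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
    import static org.apache.spark.sql.functions.avg;
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.max;
    import static org.apache.spark.sql.functions.min;

    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.sql.SaveMode;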

    JavaRDD<MeasuredValue> rdd = javaFunctions(sc)
        .cassandraTable("SB1000_47130646", "Measured_Value", mapRowTo(MeasuredValue.class));

    JavaRDD<Value> valueRdd = rdd.flatMap(new FlatMapFunction<MeasuredValue, Value>() {
        @Override
        public Iterable<Value> call(MeasuredValue row) throws Exception {
            double start_frequency = row.getStart_frequency();
            float power = row.getPower();
            double bandwidth = row.getBandwidth();

            // Define variables
            double channel, channel_end, increment;

            // Initialize variables
            channel_end = 1.6159E8;
            increment = 5000;

            List<Value> list = new ArrayList<Value>();

            // Create channel power buckets
            for (channel = 1.6000E8; channel <= channel_end; channel += increment) {
                if ((channel >= start_frequency) && (channel <= (start_frequency + bandwidth))) {
                    list.add(new Value(channel, power));
                }
            }
            return list;
        }
    });

    sqlContext.createDataFrame(valueRdd, Value.class)
        .groupBy(col("channel"))
        .agg(min("power"), max("power"), avg("power"))
        .write().mode(SaveMode.Append)
        .option("table", "results")
        .option("keyspace", "model")
        .format("org.apache.spark.sql.cassandra")
        .save();

} // end session
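One thing to watch: mapRowTo matches Java bean property names against Cassandra column names, and this table uses case-sensitive, quoted column names ("Start_Frequency", "Power", "Bandwidth"). If the default mapping does not line up with the bean's lowercase property names, the connector's Pair-based overload of mapRowTo lets you alias properties to columns explicitly. A minimal sketch, assuming that overload is available in your connector version:

    // Hypothetical sketch: explicit bean-property-to-column aliases for the
    // case-sensitive column names (Pair is org.apache.commons.lang3.tuple.Pair).
    import org.apache.commons.lang3.tuple.Pair;

    JavaRDD<MeasuredValue> rdd = javaFunctions(sc).cassandraTable(
        "SB1000_47130646", "Measured_Value",
        mapRowTo(MeasuredValue.class,
                 Pair.of("start_frequency", "Start_Frequency"),
                 Pair.of("power", "Power"),
                 Pair.of("bandwidth", "Bandwidth")));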

public static class MeasuredValue implements Serializable {

    public MeasuredValue() { }

    private double start_frequency;
    public double getStart_frequency() { return start_frequency; }
    public void setStart_frequency(double start_frequency) { this.start_frequency = start_frequency; }

    private double bandwidth;
    public double getBandwidth() { return bandwidth; }
    public void setBandwidth(double bandwidth) { this.bandwidth = bandwidth; }

    private float power;
    public float getPower() { return power; }
    public void setPower(float power) { this.power = power; }
}
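
Both snippets also construct Value objects, whose class is not shown. A minimal sketch, assuming it is a plain serializable bean pairing a channel with a power reading (field names chosen so createDataFrame yields the "channel" and "power" columns used in the groupBy/agg):

    // Assumed Value bean; field names must match the DataFrame columns above.
    public static class Value implements Serializable {

        public Value() { }

        public Value(double channel, float power) {
            this.channel = channel;
            this.power = power;
        }

        private double channel;
        public double getChannel() { return channel; }
        public void setChannel(double channel) { this.channel = channel; }

        private float power;
        public float getPower() { return power; }
        public void setPower(float power) { this.power = power; }
    }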