如何将SFTP服务器中的文件加载到spark RDD中.加载此文件后,我需要对数据执行一些过滤.该文件也是csv文件,所以请你帮我决定是否应该使用Dataframes或RDDs. 最佳答案 您可以通过以下方式在程序中使用spark-sftp库:
对于Spark 2.x
Maven依赖
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.11</artifactId>
<version>1.1.0</version>
</dependency>
SBT依赖
libraryDependencies += "com.springml" % "spark-sftp_2.11" % "1.1.0"
与Spark shell一起使用
可以使用–packages命令行选项将此包添加到Spark.例如,要在启动spark shell时包含它:
$bin/spark-shell --packages com.springml:spark-sftp_2.11:1.1.0
Scala API
// Construct Spark dataframe using file in FTP server
val df = spark.read.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
option("inferSchema", "true").
load("/ftp/files/sample.csv")
// Write dataframe as CSV file to FTP server
df.write.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
save("/ftp/files/sample.csv")
适用于Spark 1.x(1.5)
Maven依赖
<dependency>
<groupId>com.springml</groupId>
<artifactId>spark-sftp_2.10</artifactId>
<version>1.0.2</version>
</dependency>
SBT依赖
libraryDependencies += "com.springml" % "spark-sftp_2.10" % "1.0.2"
与Spark shell一起使用
可以使用–packages命令行选项将此包添加到Spark.例如,要在启动spark shell时包含它:
$bin/spark-shell --packages com.springml:spark-sftp_2.10:1.0.2
Scala API
import org.apache.spark.sql.SQLContext
// Construct Spark dataframe using file in FTP server
val sqlContext = new SQLContext(sc)
val df = sqlContext.read.
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
option("inferSchema", "true").
load("/ftp/files/sample.csv")
// Write dataframe as CSV file to FTP server
df.write().
format("com.springml.spark.sftp").
option("host", "SFTP_HOST").
option("username", "SFTP_USER").
option("password", "****").
option("fileType", "csv").
save("/ftp/files/sample.csv")
有关spark-sftp的更多信息,您可以访问github页面springml/spark-sftp