So I am trying to load a CSV file while inferring a custom schema, but every time I end up with the following error:
Project_Bank.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [110, 111, 13, 10]
This is what my program and my CSV file entries look like:
age;job;marital;education;default;balance;housing;loan;contact;day;month;duration;campaign;pdays;previous;poutcome;y
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
My code:
$ spark-shell --packages com.databricks:spark-csv_2.10:1.5.0
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import org.apache.spark.sql.types._
import org.apache.spark.sql.SQLContext
import sqlContext.implicits._
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
val bankSchema = StructType(Array(
StructField("age", IntegerType, true),
StructField("job", StringType, true),
StructField("marital", StringType, true),
StructField("education", StringType, true),
StructField("default", StringType, true),
StructField("balance", IntegerType, true),
StructField("housing", StringType, true),
StructField("loan", StringType, true),
StructField("contact", StringType, true),
StructField("day", IntegerType, true),
StructField("month", StringType, true),
StructField("duration", IntegerType, true),
StructField("campaign", IntegerType, true),
StructField("pdays", IntegerType, true),
StructField("previous", IntegerType, true),
StructField("poutcome", StringType, true),
StructField("y", StringType, true)))
val df = sqlContext.
read.
schema(bankSchema).
option("header", "true").
option("delimiter", ";").
load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv").toDF()
df.registerTempTable("people")
df.printSchema()
val distinctage = sqlContext.sql("select distinct age from people")
Any suggestions as to why I am unable to process the CSV file after pushing the correct schema? Thanks in advance for your advice.
Thanks,
Amit K.
Best answer: The problem here is that when no format is specified, the DataFrame reader defaults to Parquet, so Spark tries to parse your CSV file as a Parquet file (the error is complaining that the file's trailing bytes are not the Parquet magic number). To process the data as CSV, here is what you can do.
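Incidentally, the original `sqlContext.read` code would most likely work as-is if the CSV data source were named explicitly, since spark-csv is only used when you ask for it. A minimal sketch, reusing the schema, options, and path from the question:

```scala
// Sketch: same read as in the question, but with the spark-csv data
// source named explicitly so Spark does not fall back to Parquet.
val df = sqlContext.read
  .format("com.databricks.spark.csv")  // tell Spark this is CSV, not Parquet
  .schema(bankSchema)                  // the StructType defined in the question
  .option("header", "true")            // the first line holds the column names
  .option("delimiter", ";")            // fields are semicolon-separated
  .load("/user/amit.kudnaver_gmail/hadoop/project_bank/Project_Bank.csv")
```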
First, remove the header row from the data:
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
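If you would rather not edit the file by hand, the header line can also be skipped in code. A small sketch using a common Spark 1.x idiom (same path as below):

```scala
// Sketch: skip the header row without modifying the file on HDFS.
val raw = sc.textFile("/user/myuser/Project_Bank.csv")
val header = raw.first()            // the first line of the file
val rows = raw.filter(_ != header)  // keep every line except the header
```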
Next, we write the following code to read the data.
Create a case class:
case class BankSchema(age: Int, job: String, marital:String, education:String, default:String, balance:Int, housing:String, loan:String, contact:String, day:Int, month:String, duration:Int, campaign:Int, pdays:Int, previous:Int, poutcome:String, y:String)
Read the data from HDFS and parse it:
val bankData = sc.textFile("/user/myuser/Project_Bank.csv")
  .map(_.split(";"))
  .map(p => BankSchema(p(0).toInt, p(1), p(2), p(3), p(4), p(5).toInt, p(6), p(7), p(8), p(9).toInt, p(10), p(11).toInt, p(12).toInt, p(13).toInt, p(14).toInt, p(15), p(16)))
  .toDF()
Then register the table and execute the query:
bankData.registerTempTable("bankData")
val distinctage = sqlContext.sql("select distinct age from bankData")
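For reference, the same query can also be expressed with the DataFrame API instead of SQL:

```scala
// Equivalent to: select distinct age from bankData
val distinctAge = bankData.select("age").distinct()
distinctAge.show()
```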
This is what the output looks like:
+---+
|age|
+---+
| 33|
| 44|
| 58|
+---+