
Reading array-type fields from Elasticsearch with Spark

In a previous project I needed to read Elasticsearch data with Spark SQL, and the job failed whenever the fields being read contained an array.

The read looked roughly like this:

val options = Map("pushdown" -> "true",
  "strict" -> "false",
  "es.nodes" -> "127.0.0.1",
  "es.port" -> "9200")
// "es" is the short name registered by the elasticsearch-hadoop connector (org.elasticsearch.spark.sql)
val df = spark.read.format("es").options(options).load("spark/scorearray")

The error was:

WARN ScalaRowValueReader: Field 'array' is backed by an array but the associated Spark Schema does not reflect this;
              (use es.read.field.as.array.include/exclude) 
ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 3)
java.lang.ClassCastException: scala.collection.convert.Wrappers$JListWrapper cannot be cast to java.lang.Long
    at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:105)
    at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow$class.getLong(rows.scala:42)
    at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getLong(rows.scala:194)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:395)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:234)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:228)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:827)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

WARN TaskSetManager: Lost task 2.0 in stage 1.0 (TID 3, localhost, executor driver): java.lang.ClassCastException: scala.collection.convert.Wrappers$JListWrapper cannot be cast to java.lang.Long
    ... (stack trace identical to the ERROR above)

Cause of the error:

I started by checking the official documentation.

It has a conversion table between Spark SQL types and Elasticsearch types, but nothing describing why an array field cannot be read.

I eventually found the cause (I no longer remember exactly where, but it was probably somewhere in the Elasticsearch documentation): an Elasticsearch mapping only records a field's type, not whether the field holds an array. An integer array is recorded in the mapping simply as an integer. Spark SQL, on the other hand, first fetches the schema, uses it to define the DataFrame's structure, and only then pulls data from the source. The DataFrame column therefore ends up typed as a scalar integer, but at read time the connector tries to force an integer array into it, and the cast fails.
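To make the mismatch concrete, here is a minimal sketch. The index name spark/scorearray comes from the snippet above; the field names name and scores are hypothetical.

// Suppose the mapping for spark/scorearray declares "scores" as a plain long:
//   "scores": { "type": "long" }
// while every document actually stores an array, e.g. { "name": "a", "scores": [1, 2, 3] }.

val df = spark.read.format("es").options(options).load("spark/scorearray")
df.printSchema()
// root
//  |-- name: string (nullable = true)
//  |-- scores: long (nullable = true)    <-- inferred as a scalar, not array<long>

// The schema is fixed before any data is fetched, so the first action that
// materialises rows fails: the connector hands Spark a java.util.List where
// the schema promises a Long.
df.show()   // java.lang.ClassCastException: ... JListWrapper cannot be cast to java.lang.Long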

Solution:

Add es.read.field.as.array.include to the options to declare which fields are arrays:

val options = Map("pushdown" -> "true",
  "strict" -> "false",
  "es.nodes" -> "127.0.0.1",
  "es.port" -> "9200",
  "es.read.field.as.array.include" -> "name_of_array_field")

If the array is a field inside an object, write it as "objectName.arrayFieldName"; to declare several fields, separate the names with commas, as in the sketch below.
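Putting it together, a fuller hedged sketch (the field names scores and user.tags are hypothetical; everything else matches the snippets above):

val options = Map(
  "pushdown" -> "true",
  "strict"   -> "false",
  "es.nodes" -> "127.0.0.1",
  "es.port"  -> "9200",
  // comma-separated list; nested array fields use dot notation ("object.field")
  "es.read.field.as.array.include" -> "scores,user.tags")

val df = spark.read.format("es").options(options).load("spark/scorearray")
df.printSchema()
// root
//  |-- scores: array (nullable = true)
//  |    |-- element: long (containsNull = true)
//  |-- user: struct (nullable = true)
//  |    |-- tags: array (nullable = true)
//  |    |    |-- element: string (containsNull = true)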