Apache Spark - Persist RDD as Avro File
I have written a sample program to persist an RDD as an Avro file.
I am using CDH 5.4 with Spark 1.3.
I wrote this .avsc file and then generated the code for the User class:
{"namespace": "com.abhi", "type": "record", "name": "user", "fields": [ {"name": "firstname", "type": "string"}, {"name": "lastname", "type": "string"} ] }
Then I generated the code for User:
java -jar ~/downloads/avro-tools-1.7.7.jar compile schema user.avsc .
Then I wrote this example:
package com.abhi

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroKeyOutputFormat, AvroJob, AvroKeyInputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext

object MySpark {
  def main(args: Array[String]): Unit = {
    val sf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("MySpark")

    val sc = new SparkContext(sf)

    val user1 = new User()
    user1.setFirstname("test1")
    user1.setLastname("test2")

    val user2 = new User("test3", "test4")

    // construct via builder
    val user3 = User.newBuilder()
      .setFirstname("test5")
      .setLastname("test6")
      .build()

    val list = Array(user1, user2, user3)
    val userRDD = sc.parallelize(list)

    val job: Job = Job.getInstance()
    AvroJob.setOutputKeySchema(job, user1.getSchema)

    val output = "/user/cloudera/users.avro"
    userRDD.map(row => (new AvroKey(row), NullWritable.get()))
      .saveAsNewAPIHadoopFile(
        output,
        classOf[AvroKey[User]],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[User]],
        job.getConfiguration)
  }
}
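For reference, reading the file back should look roughly like this (a sketch only: it reuses the AvroKeyInputFormat imported above, and the getFirstname accessor is assumed from the generated User class):

val loaded = sc.newAPIHadoopFile(
  output,
  classOf[AvroKeyInputFormat[User]],
  classOf[AvroKey[User]],
  classOf[NullWritable],
  job.getConfiguration)

// Unwrap the AvroKey to get the User records, then collect a plain String field.
loaded
  .map { case (avroKey, _) => avroKey.datum() }
  .map(user => user.getFirstname.toString) // assumed accessor name
  .collect()
  .foreach(println)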
I have two concerns with this code:

1. Some of the imports are from the old MapReduce API, and I wonder why they are required in Spark code:
import org.apache.hadoop.mapreduce.Job
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroKeyOutputFormat, AvroJob, AvroKeyInputFormat}
2. The code throws an exception when I submit it to the Hadoop cluster, and it creates an empty directory called /user/cloudera/users.avro in HDFS:
15/11/01 08:20:42 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/11/01 08:20:42 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/11/01 08:20:42 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile at MySpark.scala:52
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Got job 1 (saveAsNewAPIHadoopFile at MySpark.scala:52) with 2 output partitions (allowLocal=false)
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Final stage: Stage 1(saveAsNewAPIHadoopFile at MySpark.scala:52)
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[2] at map at MySpark.scala:51), which has no missing parents
15/11/01 08:20:42 INFO storage.MemoryStore: ensureFreeSpace(66904) called with curMem=301745, maxMem=280248975
15/11/01 08:20:42 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 65.3 KB, free 266.9 MB)
15/11/01 08:20:42 INFO storage.MemoryStore: ensureFreeSpace(23066) called with curMem=368649, maxMem=280248975
15/11/01 08:20:42 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 22.5 KB, free 266.9 MB)
15/11/01 08:20:42 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:34630 (size: 22.5 KB, free: 267.2 MB)
15/11/01 08:20:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
15/11/01 08:20:42 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[2] at map at MySpark.scala:51)
15/11/01 08:20:42 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/11/01 08:20:42 ERROR scheduler.TaskSetManager: Failed to serialize task 1, not attempting to retry it.
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:149)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:464)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:232)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:227)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:296)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:294)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
The problem is that Spark can't serialize the User class. Try configuring the Kryo serializer and registering the class there.
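A minimal sketch of that fix, assuming Spark 1.3's SparkConf.registerKryoClasses and the generated User class from the question:

val sf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("MySpark")
  // Use Kryo for data serialization and register the Avro-generated class,
  // so the parallelized User objects can be serialized with the tasks.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[User]))

val sc = new SparkContext(sf)

With Kryo as spark.serializer, the elements of the parallelized collection should no longer need to be java.io.Serializable, which is what the Java serialization of the task was failing on.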