apache spark - Persist RDD as Avro File


I have written a sample program to persist an RDD as an Avro file.

I am using CDH 5.4 with Spark 1.3.

I wrote the .avsc file and generated the code for the User class:

{"namespace": "com.abhi",  "type": "record",  "name": "user",  "fields": [      {"name": "firstname", "type": "string"},      {"name": "lastname",  "type": "string"} ] } 

Then I generated the code for User:

java -jar ~/Downloads/avro-tools-1.7.7.jar compile schema user.avsc .

Then I wrote this example:

package com.abhi

import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkConf
import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroKeyOutputFormat, AvroJob, AvroKeyInputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.spark.SparkContext

object MySpark {
  def main(args: Array[String]): Unit = {
    val sf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("MySpark")
    val sc = new SparkContext(sf)

    val user1 = new User()
    user1.setFirstname("test1")
    user1.setLastname("test2")

    val user2 = new User("test3", "test4")

    // Construct via builder
    val user3 = User.newBuilder()
      .setFirstname("test5")
      .setLastname("test6")
      .build()

    val list = Array(user1, user2, user3)
    val userRdd = sc.parallelize(list)

    val job: Job = Job.getInstance()
    AvroJob.setOutputKeySchema(job, user1.getSchema)

    val output = "/user/cloudera/users.avro"
    userRdd.map(row => (new AvroKey(row), NullWritable.get()))
      .saveAsNewAPIHadoopFile(
        output,
        classOf[AvroKey[User]],
        classOf[NullWritable],
        classOf[AvroKeyOutputFormat[User]],
        job.getConfiguration)
  }
}

I have two concerns about this code:

Some of the imports are from the old MapReduce API, and I wonder why they are required for Spark code:

import org.apache.hadoop.mapreduce.Job
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroKeyOutputFormat, AvroJob, AvroKeyInputFormat}

The code throws an exception when I submit it to the Hadoop cluster, and it only creates an empty directory called /user/cloudera/users.avro in HDFS:

15/11/01 08:20:42 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir
15/11/01 08:20:42 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1
15/11/01 08:20:42 INFO spark.SparkContext: Starting job: saveAsNewAPIHadoopFile at MySpark.scala:52
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Got job 1 (saveAsNewAPIHadoopFile at MySpark.scala:52) with 2 output partitions (allowLocal=false)
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Final stage: Stage 1(saveAsNewAPIHadoopFile at MySpark.scala:52)
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Missing parents: List()
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[2] at map at MySpark.scala:51), which has no missing parents
15/11/01 08:20:42 INFO storage.MemoryStore: ensureFreeSpace(66904) called with curMem=301745, maxMem=280248975
15/11/01 08:20:42 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 65.3 KB, free 266.9 MB)
15/11/01 08:20:42 INFO storage.MemoryStore: ensureFreeSpace(23066) called with curMem=368649, maxMem=280248975
15/11/01 08:20:42 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 22.5 KB, free 266.9 MB)
15/11/01 08:20:42 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:34630 (size: 22.5 KB, free: 267.2 MB)
15/11/01 08:20:42 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0
15/11/01 08:20:42 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839
15/11/01 08:20:42 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from Stage 1 (MapPartitionsRDD[2] at map at MySpark.scala:51)
15/11/01 08:20:42 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 2 tasks
15/11/01 08:20:42 ERROR scheduler.TaskSetManager: Failed to serialize task 1, not attempting to retry it.
java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.serializer.SerializationDebugger$ObjectStreamClassMethods$.getObjFieldValues$extension(SerializationDebugger.scala:240)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visitSerializable(SerializationDebugger.scala:150)
    at org.apache.spark.serializer.SerializationDebugger$SerializationDebugger.visit(SerializationDebugger.scala:99)
    at org.apache.spark.serializer.SerializationDebugger$.find(SerializationDebugger.scala:58)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:39)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:149)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:464)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:232)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:227)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:296)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$6.apply(TaskSchedulerImpl.scala:294)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)

The problem is that Spark can't serialise your User class. Try switching to Kryo serialization and registering the User class there.
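For example, a minimal sketch of that suggestion (assuming the generated class is com.abhi.User; SparkConf.registerKryoClasses is available from Spark 1.2 onwards, so it should work on Spark 1.3):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch only, not the original poster's code: enable the Kryo serializer and
// register the Avro-generated User class so tasks referencing it can be serialized.
val sf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("MySpark")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[User]))  // assumes the generated com.abhi.User class

val sc = new SparkContext(sf)

The rest of the job (building the User objects and calling saveAsNewAPIHadoopFile) would stay the same; only the SparkConf setup changes.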

