
In a Spark job, I join two RDDs:

val data: RDD[(Long, (String, String))] = sc.objectFile[(Long, scala.collection.mutable.HashMap[String, Object])](outputFile)
  .leftOuterJoin(attributionData)

Here, outputFile is the output of another Spark job that processes data from Hive. One of the tables in Hive has 40 million records; when I limit the read to fetch only 10 million records, the code works fine. However, with the full data (if I remove the limit()), the following error occurs:

10:43:27 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 2, buysub.com): java.lang.NegativeArraySizeException
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:409)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:227)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:228)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:221)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:117)
at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:23)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:598)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:566)
at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
at scala.collection.immutable.HashMap$HashMap1.foreach(HashMap.scala:224)
at scala.collection.immutable.HashMap$HashTrieMap.foreach(HashMap.scala:403)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:37)
at com.twitter.chill.Tuple2Serializer.write(TupleSerializers.scala:33)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)
at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:29)
at com.twitter.chill.TraversableSerializer$$anonfun$write$1.apply(Traversable.scala:27)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:27)
at com.twitter.chill.TraversableSerializer.write(Traversable.scala:21)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:568)

I am using Spark 1.6. The following is the Spark configuration:

conf.set("spark.driver.memory", "4G")
conf.set("spark.executor.memory", "30G")
conf.set("spark.rdd.compress", "true")
conf.set("spark.storage.memoryFraction", "0.3")
conf.set("spark.shuffle.consolidateFiles", "true")
conf.set("spark.shuffle.memoryFraction", "0.5")
conf.set("spark.akka.frameSize", "384")
conf.set("spark.io.compression.codec", "lz4")
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

2 Answers

I found some info pointing to this being a bug in Kryo serialization:

https://github.com/EsotericSoftware/kryo/issues/382

It's fixed in Kryo 4, but Spark is not yet using that version:

https://issues.apache.org/jira/browse/SPARK-20389

As a temporary workaround, it sounds like this might help:

spark.executor.extraJavaOptions -XX:hashCode=0
spark.driver.extraJavaOptions -XX:hashCode=0

(From https://github.com/broadinstitute/gatk/issues/1524#issuecomment-189368808)
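
These can be wired into the same SparkConf shown in the question (a minimal sketch; note that in client mode, spark.driver.extraJavaOptions has no effect when set in code, because the driver JVM has already started, so it may need to be passed on the spark-submit command line instead):

// Sketch: -XX:hashCode=0 changes the JVM's identity-hash-code scheme,
// which the linked GATK issue reports as a workaround for this Kryo bug.
conf.set("spark.executor.extraJavaOptions", "-XX:hashCode=0")
conf.set("spark.driver.extraJavaOptions", "-XX:hashCode=0")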

Or you could simply use a different serializer, though that might slow things down.
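
For example, falling back to Spark's default Java serializer is a one-line change to the configuration in the question (a sketch; Java serialization is typically slower and bulkier than Kryo). A comment below reports that this is what actually resolved the issue:

// Sketch: swap Kryo for Java serialization to sidestep the Kryo bug.
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")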

  • Passing the arguments didn't help (I will do some more testing with this). Using Java serialization solved the issue. However, 40 million records doesn't sound like too much data for Spark. Am I missing some other configuration? Jun 12, 2017 at 15:15
  • 40 million is not big for Spark. But something within those 40 million records is triggering this bug, whereas the first 10 million alone do not. It's not so much about the size of the data; it's just that as you have more data, you are more likely to hit bugs by exercising more code paths.
    – Joe K
    Jun 12, 2017 at 17:59
  • Do we have a solution for this issue now? Passing arguments did not help.
    – passionate
    May 3, 2018 at 22:39

This happens when Kryo's reference table exceeds the maximum integer value (integer overflow). To solve this, set spark.kryo.referenceTracking to false.
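
In the conf.set style used in the question, that is (a sketch; note that with reference tracking disabled, Kryo can no longer serialize object graphs containing circular or shared references):

// Sketch: stop Kryo from building its reference table, i.e. the
// IdentityObjectIntMap that is overflowing in the stack trace above.
conf.set("spark.kryo.referenceTracking", "false")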
