3 votes

EDIT #2: This might be memory related; the logs are showing out-of-heap errors.

Yes, definitely memory related. docker logs reports all the out-of-heap spew from the JVM, but the Jupyter web notebook does not pass that on to the user. Instead, the user gets kernel failures and occasional weird behavior, such as code not compiling correctly.


Spark 1.6, specifically docker run -d .... jupyter/all-spark-notebook

I would like to count accounts in a file of ~1 million transactions.

This is simple enough that it could be done without Spark, but I've hit an odd error trying it with Spark Scala.

The input data is of type RDD[etherTrans], where etherTrans is a custom type wrapping a single transaction: a timestamp, the from and to accounts, and the value transacted in ether.

class etherTrans(ts_in: Long, afrom_in: String, ato_in: String, ether_in: Float)
  extends Serializable {
    var ts: Long = ts_in
    var afrom: String = afrom_in
    var ato: String = ato_in
    var ether: Float = ether_in
    override def toString(): String = ts.toString + "," + afrom + "," + ato + "," + ether.toString
}

data:RDD[etherTrans] looks ok:

data.take(10).foreach(println)

etherTrans(1438918233,0xa1e4380a3b1f749673e270229993ee55f35663b4,0x5df9b87991262f6ba471f09758cde1c0fc1de734,3.1337E-14)
etherTrans(1438918613,0xbd08e0cddec097db7901ea819a3d1fd9de8951a2,0x5c12a8e43faf884521c2454f39560e6c265a68c8,19.9)
etherTrans(1438918630,0x63ac545c991243fa18aec41d4f6f598e555015dc,0xc93f2250589a6563f5359051c1ea25746549f0d8,599.9895)
etherTrans(1438918983,0x037dd056e7fdbd641db5b6bea2a8780a83fae180,0x7e7ec15a5944e978257ddae0008c2f2ece0a6090,100.0)
etherTrans(1438919175,0x3f2f381491797cc5c0d48296c14fd0cd00cdfa2d,0x4bd5f0ee173c81d42765154865ee69361b6ad189,803.9895)
etherTrans(1438919394,0xa1e4380a3b1f749673e270229993ee55f35663b4,0xc9d4035f4a9226d50f79b73aafb5d874a1b6537e,3.1337E-14)
etherTrans(1438919451,0xc8ebccc5f5689fa8659d83713341e5ad19349448,0xc8ebccc5f5689fa8659d83713341e5ad19349448,0.0)
etherTrans(1438919461,0xa1e4380a3b1f749673e270229993ee55f35663b4,0x5df9b87991262f6ba471f09758cde1c0fc1de734,3.1337E-14)
etherTrans(1438919491,0xf0cf0af5bd7d8a3a1cad12a30b097265d49f255d,0xb608771949021d2f2f1c9c5afb980ad8bcda3985,100.0)
etherTrans(1438919571,0x1c68a66138783a63c98cc675a9ec77af4598d35e,0xc8ebccc5f5689fa8659d83713341e5ad19349448,50.0)

This next function compiles fine and is written this way because earlier attempts complained of a type mismatch between Array[String] or List[String] and TraversableOnce[?]:

def arrow(e:etherTrans):TraversableOnce[String] = Array(e.afrom,e.ato)
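
(An aside, not one of the original attempts: a declared return type of Seq[String] or List[String] would also satisfy flatMap, since both are TraversableOnce subtypes and no implicit Array wrapping is involved; arrowSeq and arrowList are just illustrative names.)

def arrowSeq(e: etherTrans): Seq[String] = Seq(e.afrom, e.ato)     // Seq[String] <: TraversableOnce[String]
def arrowList(e: etherTrans): List[String] = List(e.afrom, e.ato)  // List[String] <: TraversableOnce[String]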

But then using arrow with flatMap to get an RDD[String] of all accounts fails.

val accts:RDD[String] = data.flatMap(arrow)

Name: Compile Error
Message: :38: error: type mismatch;
 found   : etherTrans(in class $iwC)(in class $iwC)(in class $iwC)(in class $iwC) => TraversableOnce[String]
 required: etherTrans(in class $iwC)(in class $iwC)(in class $iwC)(in class $iwC) => TraversableOnce[String]
         val accts:RDD[String] = data.flatMap(arrow)
                                              ^
StackTrace: 

Make sure you scroll right to see it complain that TraversableOnce[String] doesn't match TraversableOnce[String].

This must be a fairly common problem, as a more blatant type mismatch comes up in Generate List of Pairs, and, although there isn't enough context there, it is suggested in I have a Scala List, how can I get a TraversableOnce?

What's going on here?


EDIT: The issue reported above doesn't appear, and the code works fine, in an older spark-shell (Spark 1.3.1) running standalone in a docker container. The errors are generated when running in the Spark 1.6 Scala Jupyter environment with the jupyter/all-spark-notebook docker container.

Also @zero323 says that this toy example:

 val rdd = sc.parallelize(Seq((1L, "foo", "bar", 1))).map{ case (ts, fr, to, et) => new etherTrans(ts, fr, to, et)} 
 rdd.flatMap(arrow).collect

worked for him in a terminal spark-shell (Spark 1.6.0 / Scala 2.10.5), and that Scala 2.11.7 with Spark 1.5.2 works as well.

Works just fine for me. Have you tested this outside notebook? - zero323
No, I haven't. Let me see if there's a spark shell in the docker container... - Paul
@zero323 There's a spark shell in the docker container, but I'm getting some odd memory issues from it just reading in the text data. How are you testing this? Are you testing the compilation issue in spark-shell or with a compiler? If you are running it, are you calling sc.parallelize on the data I dumped out, or something else? Of course, you don't have to run it to test compilation. - Paul
Just in a spark-shell. With something like this: val rdd = sc.parallelize(Seq((1L, "foo", "bar", 1))).map{ case (ts, fr, to, et) => new etherTrans(ts, fr, to, et)} rdd.flatMap(arrow).collect - zero323
I don't think compiler will complain but let me check. I suspect that at some point class definition could be recompiled but arrow not. - zero323
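
(A sketch of the workaround the last comment hints at, offered as an assumption rather than a confirmed diagnosis: if the notebook cell defining etherTrans is re-run but the cell defining arrow is not, arrow still refers to the previous wrapper class, so the two seemingly identical TraversableOnce[String] types really do live in different $iwC enclosures. Re-running both definitions in one cell, or inlining the function, avoids the stale reference:)

import org.apache.spark.rdd.RDD  // if not already in scope in the notebook

// No named function that can go stale; the Seq also sidesteps the
// Array-to-TraversableOnce implicit conversion.
val accts: RDD[String] = data.flatMap(e => Seq(e.afrom, e.ato))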

1 Answer

0 votes

I think you should switch to using case classes, and it should work fine. Using "regular" classes might cause weird issues when serializing them, and it looks like all you need are value objects, so case classes look like a better fit for your use case.

An example:

case class EtherTrans(ts: Long, afrom: String, ato: String, ether: Float)

val source = sc.parallelize(Array(
    (1L, "from1", "to1", 1.234F),
    (2L, "from2", "to2", 3.456F)
))

// (With the Dataset API you could go through a DataFrame/Dataset here instead; see the note at the end.)
val data = source.map { l => EtherTrans(l._1, l._2, l._3, l._4) }

def arrow(e: EtherTrans) = Array(e.afrom, e.ato)

data.map(arrow).take(5)
// res3: Array[Array[String]] = Array(Array(from1, to1), Array(from2, to2))
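
To get back to the original goal of an RDD[String] of all the accounts, the same data works with flatMap; reading "count accounts" as a distinct count plus a per-account tally is an assumption on my part:

// Building a Seq inline keeps flatMap happy without relying on the implicit
// Array-to-TraversableOnce conversion that came up in the question.
val accts = data.flatMap(e => Seq(e.afrom, e.ato))
accts.distinct.count   // number of distinct accounts
accts.countByValue()   // Map[String, Long]: occurrences per account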

If you need to, you can just create some method / object to generate your case classes. If you don't really need the "toString" method for your logic, but only for presentation, keep it out of the case class: you can always add it with a map operation before storing it or showing it.
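
For example, a hypothetical parser (the comma-separated ts,afrom,ato,ether line format is an assumption based on the dump in the question, and "your_file" is just a placeholder):

def parseEtherTrans(line: String): EtherTrans = {
  // Assumed line format: "ts,afrom,ato,ether"
  val Array(ts, from, to, ether) = line.split(",")
  EtherTrans(ts.toLong, from, to, ether.toFloat)
}

val data = sc.textFile("your_file").map(parseEtherTrans)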

Also, if you are on Spark 1.6.0 or higher, you could try using the Dataset API instead, which would look more or less like this:

val data = sqlContext.read.text("your_file").as[EtherTrans]

https://databricks.com/blog/2016/01/04/introducing-spark-datasets.html
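
A slightly fuller sketch of that route (assuming Spark 1.6's experimental Dataset API, a sqlContext with its implicits in scope, and the same assumed one-transaction-per-line format; none of this is from the original answer):

import sqlContext.implicits._

val ds = sqlContext.read.text("your_file").as[String].map { line =>
  val Array(ts, from, to, ether) = line.split(",")
  EtherTrans(ts.toLong, from, to, ether.toFloat)
}

val accounts = ds.flatMap(e => Seq(e.afrom, e.ato))
accounts.rdd.distinct.count   // falling back to the RDD API for a distinct account count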