
I am trying to use pipe in Spark RDD operations. This is the snippet I tried:

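//PIPE - run an external shell command in Spark

val x = sc.parallelize(Array("A", "Ba", "C", "AD"))
val y = x.pipe("grep -i A")
// mkString prints the elements; println on a bare Array only prints its reference
println(x.collect().mkString(", "))
println(y.collect().mkString(", "))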

But running the above snippet fails with:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 61.0 failed 1 times, most recent failure: Lost task 0.0 in stage 61.0 (TID 592, localhost, executor driver): java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: grep -i A

Is there a way to run the grep -i command through pipe?

I tried calling a .sh script instead and that works, but I want to run grep directly as a shell command.

"Subprocess exited with status 1" indicates that the grep command exited with a non-zero status. Exit code 1 for grep simply means no lines were selected/matched, which is the case for every element in the RDD without an A (or a, since -i ignores case) in it. - tomgalpin
@tomgalpin: but I have an element A in the input RDD. - sathya
Correct, but the grep command is run separately on each partition of the RDD, not once over all the elements. So when it is run against a chunk such as "C", grep "fails" to match any lines and exits with a failure code, which gives the above error. When you did it within a .sh file, the .sh file probably exited 0 every time. Does that make more sense? - tomgalpin
@tomgalpin Thanks, is there an alternative way to fix this issue? - sathya
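To see the exit statuses discussed in these comments, here is a small local sketch in plain Scala (no Spark involved) that feeds lines to grep -i A on stdin, one per line, the way pipe feeds a partition, and returns grep's exit status. It assumes a Unix-like system with grep on the PATH; the object and method names are hypothetical, and the one-element-per-partition layout is only an assumption for illustration. Because of -i, "Ba" matches as well; only "C" and an empty input make grep exit with 1.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import scala.sys.process._

object GrepStatus {
  // Feed `lines` to `grep -i A` on stdin, one per line (as RDD.pipe does for a
  // partition), discard the output, and return grep's exit status.
  def grepStatus(lines: Seq[String]): Int = {
    val input = lines.map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8)
    val discard = ProcessLogger((_: String) => (), (_: String) => ())
    (Process(Seq("grep", "-i", "A")) #< new ByteArrayInputStream(input)).!(discard)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical layout: each element in its own partition.
    for (e <- Seq("A", "Ba", "C", "AD"))
      println(s"$e -> exit status ${grepStatus(Seq(e))}")
    // An empty partition gives grep no input at all, so nothing matches either.
    println(s"(empty) -> exit status ${grepStatus(Seq.empty)}")
  }
}
```

Status 0 means at least one line matched and 1 means none did, which Spark reports as "Subprocess exited with status 1".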

1 Answer


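It's because the data is partitioned: pipe starts a separate grep process for each partition, and any partition with no matching element makes grep exit with status 1. Even if you use the same command within a .sh file, as you mention, you'll get the same error. If you repartition the RDD to one partition, it should work fine (as long as at least one element matches):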

val y = x.repartition(1).pipe("grep -i A")
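To illustrate why the partition count matters, the following plain-Scala sketch imitates the pipe semantics described here (it is not Spark's code): one subprocess per partition, with a failure as soon as any subprocess exits non-zero. The partition layouts and the helper name pipeLikeSpark are hypothetical; on a real RDD you could inspect the actual layout with x.glom().collect() or x.getNumPartitions. It assumes grep is on the PATH.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer
import scala.sys.process._

object PipeSimulation {
  // Local imitation of RDD.pipe semantics (not Spark's code): start `command`
  // once per partition, write the partition's elements to its stdin one per
  // line, collect stdout lines, and fail like Spark on a non-zero exit status.
  def pipeLikeSpark(partitions: Seq[Seq[String]], command: Seq[String]): Seq[String] =
    partitions.flatMap { part =>
      val input = part.map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8)
      val out = ListBuffer.empty[String]
      val logger = ProcessLogger((line: String) => { out += line; () }, (_: String) => ())
      val status = (Process(command) #< new ByteArrayInputStream(input)).!(logger)
      if (status != 0)
        throw new IllegalStateException(
          s"Subprocess exited with status $status. Command ran: ${command.mkString(" ")}")
      out.toList
    }

  def main(args: Array[String]): Unit = {
    val grep = Seq("grep", "-i", "A")
    val fourPartitions = Seq(Seq("A"), Seq("Ba"), Seq("C"), Seq("AD")) // hypothetical layout
    // Roughly what repartition(1) yields (element order after a real shuffle may differ).
    val onePartition = Seq(fourPartitions.flatten)

    println(pipeLikeSpark(onePartition, grep)) // prints List(A, Ba, AD)

    // The "C" partition fails the four-partition run; a single partition with
    // no match at all fails too.
    for (parts <- Seq(fourPartitions, Seq(Seq("C")))) {
      try println(pipeLikeSpark(parts, grep))
      catch { case e: IllegalStateException => println(s"failed: ${e.getMessage}") }
    }
  }
}
```

The last run shows the limit of repartition(1): if nothing matches at all, the single grep process still exits with 1. Putting everything in one partition also means one task processes all the data, which is only reasonable for small RDDs.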

As per the official documentation:

pipe(command, [envVars]):

Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.

Since you are using grep, you cannot treat each partition independently: if grep finds no matching line in a partition (including an empty one), it exits with status 1, and Spark treats that as a task failure, which aborts the job.
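One alternative, offered as a sketch rather than as the answer's method, is to make the command itself treat "no match" as success. grep exits with 0 when a line matched, 1 when nothing matched, and 2 or more on a real error, so wrapping it in sh -c and remapping only status 1 to 0 keeps genuine errors visible. The snippet below checks such a wrapper locally with scala.sys.process; it assumes sh and grep are available, and the object name is hypothetical.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets
import scala.collection.mutable.ListBuffer
import scala.sys.process._

object TolerantGrep {
  // Run grep through sh and remap only "no match" (status 1) to success:
  // `test $? -le 1` exits 0 for grep status 0 or 1, and 1 for real errors (2+).
  val command: Seq[String] = Seq("sh", "-c", "grep -i A; test $? -le 1")

  // Feed `lines` to the wrapped command on stdin; return (exit status, stdout lines).
  def run(lines: Seq[String]): (Int, List[String]) = {
    val input = lines.map(_ + "\n").mkString.getBytes(StandardCharsets.UTF_8)
    val out = ListBuffer.empty[String]
    val logger = ProcessLogger((line: String) => { out += line; () }, (_: String) => ())
    val status = (Process(command) #< new ByteArrayInputStream(input)).!(logger)
    (status, out.toList)
  }

  def main(args: Array[String]): Unit = {
    println(run(Seq("C")))      // no match, but the exit status is now 0
    println(run(Seq("A", "C"))) // matching lines are passed through unchanged
    println(run(Seq.empty))     // an empty partition no longer fails either
  }
}
```

In Spark the same idea would be x.pipe(Seq("sh", "-c", "grep -i A; test $? -le 1")). The Seq overload matters here: pipe(String) splits the command on whitespace and does not understand shell quoting. If grep is only used for filtering, x.filter(_.toLowerCase.contains("a")) gives roughly the same result without starting an external process at all.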