I am trying to set up a very basic example:
- Push random data to Output port in NiFi
- Use Spark streaming context to print received data.
Versions (all on a single instance):
- HDF - 3.1.1.0-35
- HDP - 2.6.5.0-292
- nifi-spark-receiver & site-to-site-client - 1.7.1
I have configured spark-defaults.conf as follows:
spark.driver.extraClassPath /usr/hdf/3.1.1.0-35/nifi/work/META-INF/bundled-dependencies/nifi-client-dto-1.5.0.3.1.1.0-35.jar:/opt/spark-receiver/httpcore-nio-4.0-alpha6.jar:/opt/spark-receiver/nifi-site-to-site-client-1.5.0.3.1.2.0-7.jar:/opt/spark-receiver/nifi-spark-receiver-1.5.0.3.1.2.0-7.jar:/usr/hdf/3.1.1.0-35/nifi/lib/nifi-api-1.5.0.3.1.1.0-35.jar:/usr/hdf/3.1.1.0-35/nifi/lib/bootstrap/nifi-utils-1.5.0.3.1.1.0-35.jar:/usr/hdf/3.1.1.0-35/nifi/lib/nifi-framework-api-1.5.0.3.1.1.0-35.jar
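Since a classpath this long is easy to mistype, a small check along these lines could confirm that every entry actually exists on disk. This is only a sketch: `missingEntries` is a hypothetical helper name, not part of Spark or NiFi, and it only tests for existence, so wildcard entries such as `/some/dir/*` would be reported as missing.

```scala
import java.io.File

// Hypothetical helper (not part of Spark or NiFi): returns the classpath
// entries that do not exist on the local filesystem.
def missingEntries(classPath: String): Seq[String] =
  classPath
    .split(File.pathSeparator) // ":" on Linux
    .toSeq
    .map(_.trim)
    .filter(_.nonEmpty)
    .filterNot(entry => new File(entry).exists)
```

In spark-shell it could be applied to the configured value with `sc.getConf.getOption("spark.driver.extraClassPath").map(missingEntries)`. An empty result only means the files are present, not that they contain the classes that are needed.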
I am running the following commands in spark-shell:
import org.apache.nifi._
import java.nio.charset._
import org.apache.nifi.spark._
import org.apache.nifi.remote.client._
import org.apache.spark._
import org.apache.nifi.events._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.nifi.remote._
import org.apache.nifi.remote.client._
import org.apache.nifi.remote.protocol._
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import java.io._
import org.apache.spark.serializer._
val conf = new SiteToSiteClient.Builder().url("http://10.140.0.2:9090/nifi").portName("toSpark").buildConfig()
val ssc = new StreamingContext(sc, Seconds(10))
val lines = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
val text = lines.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))
text.print()
ssc.start()
After running this code, I get the following error:
Exception in thread "NiFi Receiver" java.lang.NoClassDefFoundError: org/apache/http/nio/protocol/HttpAsyncResponseConsumer
    at org.apache.nifi.remote.client.SiteInfoProvider.createSiteToSiteRestApiClient(SiteInfoProvider.java:104)
    at org.apache.nifi.remote.client.SiteInfoProvider.refreshRemoteInfo(SiteInfoProvider.java:68)
    at org.apache.nifi.remote.client.SiteInfoProvider.getPortIdentifier(SiteInfoProvider.java:220)
    at org.apache.nifi.remote.client.SiteInfoProvider.getOutputPortIdentifier(SiteInfoProvider.java:204)
    at org.apache.nifi.remote.client.socket.SocketClient.getPortIdentifier(SocketClient.java:79)
    at org.apache.nifi.remote.client.socket.SocketClient.createTransaction(SocketClient.java:121)
    at org.apache.nifi.spark.NiFiReceiver$ReceiveRunnable.run(NiFiReceiver.java:149)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.http.nio.protocol.HttpAsyncResponseConsumer
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
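One way to narrow this down might be to ask the JVM directly whether the missing class can be loaded, and if so, from which jar. The following is a minimal sketch; `classOrigin` is a hypothetical helper name, and it resolves the class through the shell's own class loader, which is not necessarily the one the receiver thread uses, so the result is a hint rather than proof.

```scala
// Hypothetical helper: reports where a class was loaded from,
// or None if it cannot be loaded at all.
def classOrigin(className: String): Option[String] =
  try {
    val cls = Class.forName(className)
    // getCodeSource is null for classes loaded by the bootstrap class loader
    val location = Option(cls.getProtectionDomain.getCodeSource)
      .flatMap(source => Option(source.getLocation))
      .map(_.toString)
    Some(location.getOrElse("<bootstrap or unknown source>"))
  } catch {
    case _: ClassNotFoundException | _: NoClassDefFoundError => None
  }
```

Calling `classOrigin("org.apache.http.nio.protocol.HttpAsyncResponseConsumer")` in the same spark-shell session would return `None` if no jar visible to the shell provides the class, or the location of the jar that does.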
Please help.