I am new to Spark Streaming. Using PySpark in PyCharm, I am unable to get past the socketTextStream initialization.

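from pyspark import SparkContext
from pyspark.streaming import StreamingContext

def start_streaming(self):
    sp = SparkContext('local[2]', 'streamingTest')
    stream = StreamingContext(sp, 1)  # 1-second batch interval
    items = stream.socketTextStream('localhost', '9009')  # <-- the error is raised here
    print(items)
    lines = items.flatMap(lambda line: line.split('\n'))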

I am receiving the following error. Any ideas on what the problem is?

Traceback (most recent call last):
  File "/Users/egorkorneev/PycharmProjects/BasicStreaming/SparkConsumer.py", line 75, in <module>
    dc.start_streaming()
  File "/Users/egorkorneev/PycharmProjects/BasicStreaming/SparkConsumer.py", line 40, in start_streaming
    items = stream.socketTextStream('localhost', '9009')
  File "/Users/egorkorneev/rnd/spark-1.6.1-bin-hadoop2.6/python/pyspark/streaming/context.py", line 352, in socketTextStream
    return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self,
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/py4j/java_gateway.py", line 933, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/py4j/protocol.py", line 316, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o19.socketTextStream. Trace:
py4j.Py4JException: Method socketTextStream([class java.lang.String, class java.lang.String, class org.apache.spark.storage.StorageLevel]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
    at py4j.Gateway.invoke(Gateway.java:252)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)

Spark 1.6.1

1 Answer

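The port number should be an integer, not a string. The traceback shows Py4J looking for a Java method socketTextStream(String, String, StorageLevel), which does not exist; the method that does exist takes an int port. Pass the port without quotes: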

items = stream.socketTextStream('localhost', 9009)
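
If the port reaches your code as a string (for example from a config file or a command-line argument), one option is to convert and validate it before handing it to Spark. The sketch below is only an illustration: the helper name to_port is hypothetical and not part of PySpark.

```python
def to_port(value):
    """Convert value (an int or a numeric string) to a valid TCP port number."""
    # int() accepts both 9009 and '9009'; it raises ValueError for text
    # such as 'abc', so a bad setting fails here rather than inside Py4J.
    port = int(value)
    # TCP ports are 16-bit, and port 0 is not usable as a connection target.
    if not 0 < port < 65536:
        raise ValueError('port out of range: %r' % (value,))
    return port
```

With this in place, the call from the question could be written as stream.socketTextStream('localhost', to_port('9009')), which passes a Python int to the Java side.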