I have 3 files: a Kafka producer.py, consumer.py, and spark-job.py. I don't know how to start the Spark file so that it processes a stream of generated data coming from Kafka.

  • Start the ZooKeeper server in a 1st terminal:

    .\bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties

  • Then start the Kafka server in a 2nd, separate terminal:

    .\bin\windows\kafka-server-start.bat .\config\server.properties

Then, in 2 more terminals, I start producer.py and consumer.py.

The Kafka producer file just generates a dictionary of data:

{branch, currency, amount}

and sends it to the Kafka cluster every 5 seconds or so.

from json import dumps
from time import sleep
from numpy.random import choice, randint
from kafka import KafkaProducer

def get_random_value():
    new_dict = {}
    branch_list = ["Almaty", "Astana", "Taraz", "Semei"]
    currency_list = ["KZT", "RUB", "GBP", "USD"]

    new_dict['currency'] = choice(currency_list)
    new_dict['amount'] = int(randint(1, 100))  # cast to a plain int so json.dumps can serialize it
    new_dict['branch'] = choice(branch_list)
    # print(new_dict)
    return new_dict


if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
                             value_serializer=lambda x: dumps(x).encode('utf-8'),
                             compression_type='gzip')
    topic_name = 'transaction'

    while True:
        for _ in range(100):
            data = get_random_value()
            try:
                message = producer.send(topic=topic_name, value=data)
                record_data = message.get(timeout=10)
                print('data: {}, offset: {}' \
                      .format(data, record_data.offset))
                #print(data)
            except Exception as e:
                print(e)
            finally:
                producer.flush()
        sleep(5)
    producer.close()

The consumer just prints that dict as it arrives:

from kafka import KafkaConsumer
import json

consumer = KafkaConsumer('transaction',bootstrap_servers=['127.0.0.1:9092'])

print("start consuming")
for message in consumer:
    aa = json.loads(message.value.decode())
    print("currency: %s, amount: %d, branch: %s" %(aa['currency'], aa['amount'], aa['branch']))

The producer and consumer both work and print to their terminals simultaneously.

But once these two are running, I need to start the Spark job.

spark-job.py listens to localhost:9092 (where Kafka also sits) and simply writes the incoming data to a DB.

import sys
import os
import shutil

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import Row, SparkSession
from pyspark.streaming.kafka import KafkaUtils
import json

outputPath = 'C:/Users/Admin/Downloads/madi_kafka/logs/checkpoints01'

def get_sql_query():
    # aggregate the streamed amounts per branch and currency
    strSQL = ('select from_unixtime(unix_timestamp()) as curr_time, t.branch as city, '
              't.currency as currency, sum(amount) as amount '
              'from exchanges_stream t group by t.branch, t.currency')
    return strSQL
# -------------------------------------------------
# Lazily instantiated global instance of SparkSession
# -------------------------------------------------
def getSparkSessionInstance(sparkConf):
    if ('sparkSessionSingletonInstance' not in globals()):
        globals()['sparkSessionSingletonInstance'] = SparkSession \
            .builder \
            .config(conf=sparkConf) \
            .getOrCreate()
    return globals()['sparkSessionSingletonInstance']
# -------------------------------------------------
# What I want to do per each RDD...
# -------------------------------------------------
def process(time, rdd):
    print("===========-----> %s <-----===========" % str(time))
    try:
        spark = getSparkSessionInstance(rdd.context.getConf())
        rowRdd = rdd.map(lambda w: Row(city=w['branch'],
                                       currency=w['currency'],
                                       amount=w['amount']))

        testDataFrame = spark.createDataFrame(rowRdd)

        testDataFrame.createOrReplaceTempView("exchanges_stream")

        sql_query = get_sql_query()
        testResultDataFrame = spark.sql(sql_query)
        testResultDataFrame.show(n=5)

        # Insert into DB
        try:
            testResultDataFrame.write \
                .format("jdbc") \
                .mode("append") \
                .option("driver", 'org.postgresql.Driver') \
                .option("url", "jdbc:postgresql://xxx") \
                .option("dbtable", "transaction_flow") \
                .option("user", "habr") \
                .option("password", "habr12345") \
                .save()
            
            print('DB write successful!')
        except Exception as e:
            print("-->Error with DB working!", e)

    except Exception as e:
        print("--> Error!", e)


# -------------------------------------------------
# General function
# -------------------------------------------------
def createContext():
    sc = SparkContext(appName="PythonStreamingKafkaTransaction")
    sc.setLogLevel("ERROR")

    ssc = StreamingContext(sc, 10)  # batch interval of 10 seconds

    broker_list, topic = sys.argv[1:]

    try:
        directKafkaStream = KafkaUtils.createDirectStream(ssc,
                                                          [topic],
                                                          {"metadata.broker.list": broker_list})
    except:
        raise ConnectionError("Kafka error: Connection refused: \
                            broker_list={} topic={}".format(broker_list, topic))

    parsed_lines = directKafkaStream.map(lambda v: json.loads(v[1]))

    # RDD handling
    parsed_lines.foreachRDD(process)

    return ssc

if __name__ == "__main__":

    if len(sys.argv) != 3:
        print("Usage: spark_job.py <zk> <topic>", file=sys.stderr)
        exit(-1)

print("--> Creating new context")
if os.path.exists(outputPath):
    shutil.rmtree('outputPath')

ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
ssc.start()
ssc.awaitTermination()

I don't know how to launch spark-job.py.

While the producer keeps generating messages, I try to launch:

spark-submit \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2,\
org.postgresql:postgresql:9.4.1207 \
spark_job.py localhost:9092 transaction

This gives:

Exception in thread "main" org.apache.spark.SparkException: Cannot load main class from JAR org.postgresql:postgresql:9.4.1207 with URI org.postgresql. Please specify a class through --class.

If I try to launch this command instead:

python.exe .\spark_job.py 127.0.0.1:2181 transaction

it does launch and creates a new context, but it still can't find Spark's Kafka libraries:

--> Creating new context
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
20/07/25 06:12:46 WARN Checkpoint: Checkpoint directory C:/Users/Admin/Downloads/madi_kafka/logs/checkpoints01 does not exist

________________________________________________________________________________________________

  Spark Streaming's Kafka libraries not found in class path. Try one of the following.

  1. Include the Kafka library and its dependencies with in the
     spark-submit command as

     $ bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8:2.4.6 ...

  2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
     Group Id = org.apache.spark, Artifact Id = spark-streaming-kafka-0-8-assembly, Version = 2.4.6.
     Then, include the jar in the spark-submit command as

     $ bin/spark-submit --jars <spark-streaming-kafka-0-8-assembly.jar> ...

________________________________________________________________________________________________


Traceback (most recent call last):
  File ".\spark_job.py", line 88, in createContext
    {"metadata.broker.list": broker_list})
  File "C:\python37\lib\site-packages\pyspark\streaming\kafka.py", line 138, in createDirectStream
    helper = KafkaUtils._get_helper(ssc._sc)
  File "C:\python37\lib\site-packages\pyspark\streaming\kafka.py", line 217, in _get_helper
    return sc._jvm.org.apache.spark.streaming.kafka.KafkaUtilsPythonHelper()
TypeError: 'JavaPackage' object is not callable

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File ".\spark_job.py", line 114, in <module>
    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
  File "C:\python37\lib\site-packages\pyspark\streaming\context.py", line 107, in getOrCreate
    ssc = setupFunc()
  File ".\spark_job.py", line 114, in <lambda>
    ssc = StreamingContext.getOrCreate(outputPath, lambda: createContext())
  File ".\spark_job.py", line 91, in createContext
    broker_list={} topic={}".format(broker_list, topic))
ConnectionError: Kafka error: Connection refused:                             broker_list=127.0.0.1:2181 topic=transaction

1 Answer


--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2 is correct

Assuming

  1. You are using Spark 2.0.2 and have Scala 2.11 installed (both of which are old and out of date), plus Spark Structured Streaming (packaged as spark-sql-kafka) is the new hotness... I would like to point out that your error output says you have Spark 2.4.6, where DStream-based Spark Streaming is deprecated, and the SQL-Kafka package would save you the headache of converting RDDs into DataFrames; see the sketch below this list.
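
Here is a minimal Structured Streaming sketch of the same idea, for illustration only, not your exact job: it parses the transaction topic and appends each micro-batch to Postgres via JDBC, skipping the per-branch aggregation for brevity. It assumes Spark 2.4.6 built for Scala 2.11, i.e. it would be submitted with --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.6,org.postgresql:postgresql:9.4.1207; the topic name, JDBC options and credentials are copied from your code.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructType

spark = SparkSession.builder.appName("StructuredKafkaTransaction").getOrCreate()

# schema of the JSON messages produced by producer.py
schema = (StructType()
          .add("branch", StringType())
          .add("currency", StringType())
          .add("amount", IntegerType()))

# read the 'transaction' topic as a streaming DataFrame; no RDD conversion needed
transactions = (spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "transaction")
                .load()
                .select(from_json(col("value").cast("string"), schema).alias("t"))
                .select("t.*"))

def write_to_postgres(batch_df, batch_id):
    # each micro-batch is a plain DataFrame, so the existing JDBC writer can be reused
    (batch_df.write
     .format("jdbc")
     .mode("append")
     .option("driver", "org.postgresql.Driver")
     .option("url", "jdbc:postgresql://xxx")
     .option("dbtable", "transaction_flow")
     .option("user", "habr")
     .option("password", "habr12345")
     .save())

(transactions.writeStream
 .foreachBatch(write_to_postgres)
 .option("checkpointLocation", "C:/Users/Admin/Downloads/madi_kafka/logs/checkpoints01")
 .start()
 .awaitTermination())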

The first error is related to the missing Postgres classes. As answered before, I strongly advise not using Spark for this when Kafka Connect exists for exactly this purpose, but to fix it you need to add the Postgres JAR to the --packages list (or, more accurately, to the Spark classpath).
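
Judging by the error message, spark-submit took org.postgresql:postgresql:9.4.1207 to be the application JAR, which suggests the two coordinates were not passed as a single --packages value (on Windows the trailing backslash is not a line-continuation character, so the command likely got split). Keeping everything in one comma-separated --packages value, and matching the connector version to the installed Spark, would look roughly like this:

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6,org.postgresql:postgresql:9.4.1207 spark_job.py localhost:9092 transaction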

The second error is because, when you launch the script with python.exe directly, the --packages argument is missing; you can supply it through the environment variable PYSPARK_SUBMIT_ARGS before the SparkContext is created:

os.environ["PYSPARK_SUBMIT_ARGS"] = "--packages <kafka>"

sc = get_context()
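
For example, a sketch of the top of spark_job.py, assuming the 0-8 connector that matches Spark 2.4.6; note that PYSPARK_SUBMIT_ARGS normally has to end with pyspark-shell and must be set before the SparkContext is created:

import os

# must be set before the SparkContext / StreamingContext is created,
# because pyspark reads it when it launches the JVM
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.6,"
    "org.postgresql:postgresql:9.4.1207 pyspark-shell"
)

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# ...the rest of spark_job.py (createContext, StreamingContext.getOrCreate) stays the same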