python:2.6.6
kafka-python:1.4.3
I ran the Kafka producer, but it always fails with this error:
Traceback (most recent call last):
  File "KafkaOperation.py", line 11, in <module>
    from kafka import KafkaProducer
  File "/usr/lib/python2.6/site-packages/kafka/__init__.py", line 21, in <module>
    from kafka.consumer import KafkaConsumer
  File "/usr/lib/python2.6/site-packages/kafka/consumer/__init__.py", line 5, in <module>
    from kafka.consumer.group import KafkaConsumer
  File "/usr/lib/python2.6/site-packages/kafka/consumer/group.py", line 13, in <module>
    from kafka.consumer.fetcher import Fetcher
  File "/usr/lib/python2.6/site-packages/kafka/consumer/fetcher.py", line 19, in <module>
    from kafka.record import MemoryRecords
  File "/usr/lib/python2.6/site-packages/kafka/record/__init__.py", line 1, in <module>
    from kafka.record.memory_records import MemoryRecords
  File "/usr/lib/python2.6/site-packages/kafka/record/memory_records.py", line 27, in <module>
    from kafka.record.default_records import DefaultRecordBatch, DefaultRecordBatchBuilder
  File "/usr/lib/python2.6/site-packages/kafka/record/default_records.py", line 338, in <module>
    class DefaultRecordBatchBuilder(DefaultRecordBase, ABCRecordBatchBuilder):
  File "/usr/lib/python2.6/site-packages/kafka/record/default_records.py", line 378, in DefaultRecordBatchBuilder
    byte_like=(bytes, bytearray, memoryview),
NameError: name 'memoryview' is not defined
The code is as follows:
from kafka import KafkaProducer
from kafka import KafkaConsumer
from kafka.errors import KafkaError
import json
class Kafka_producer():
    """Thin wrapper around ``KafkaProducer`` that sends to a single topic.

    One producer is created per instance and kept open, so ``sendjsondata``
    may be called any number of times. Call ``close`` when finished.
    """

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        """Create a producer connected to ``kafkahost:kafkaport``.

        kafkahost  -- broker host name or IP address
        kafkaport  -- broker port, given as a string or an integer
        kafkatopic -- topic that every message from this instance is sent to
        """
        self.kafkatopic = kafkatopic
        # %-formatting accepts an int port as well as a str one; plain
        # string concatenation raised TypeError for ints.
        service_host = "%s:%s" % (kafkahost, kafkaport)
        self.producer = KafkaProducer(bootstrap_servers=service_host)

    @staticmethod
    def _encode_payload(params):
        """Return ``params`` as UTF-8 encoded bytes.

        Byte strings pass through unchanged, text is UTF-8 encoded, and any
        other object (dict, list, number, ...) is serialized with
        ``json.dumps`` first.
        """
        if isinstance(params, (bytes, bytearray)):
            return bytes(params)
        if not hasattr(params, 'encode'):
            # Not a string type: turn it into JSON text before encoding.
            params = json.dumps(params)
        return params.encode('utf-8')

    def sendjsondata(self, params):
        """Send ``params`` to the configured topic and wait for the result.

        params -- text, bytes, or a JSON-serializable object

        Returns the value the send future resolves to, or None when a
        ``KafkaError`` was caught (the error is printed, not re-raised).
        """
        payload = self._encode_payload(params)
        try:
            future = self.producer.send(self.kafkatopic, payload)
            # Block for up to 60 seconds until the send completes.
            result = future.get(timeout=60)
            self.producer.flush()
            # The producer is deliberately left open: closing it here made
            # every call after the first one fail.
            return result
        except KafkaError as e:
            print(e)
            return None

    def close(self):
        """Close the underlying producer; the instance is unusable afterwards."""
        self.producer.close()
if __name__ == '__main__':
    # Equivalent call through the wrapper class; the host is passed without
    # an "http://" scheme because it is joined with the port into "host:port":
    #   Kafka_producer("10.25.245.192", "9092", "nori-log").sendjsondata('{"test": "testtets"}')
    producer = KafkaProducer(bootstrap_servers='10.25.245.192:9092')
    # No value_serializer is configured on this producer, so the message
    # value is serialized to JSON bytes here instead of passing a dict.
    payload = json.dumps({"test": "test_content"}).encode('utf-8')
    try:
        for _ in range(100):
            producer.send('nori-log', payload)
        # Make sure the queued messages are actually sent before exiting.
        producer.flush()
    finally:
        producer.close()
I have also changed the toronto version to 2.2.1.