25
votes

I am writing records to a Kinesis Firehose stream that are eventually written to an S3 file by Amazon Kinesis Firehose.

My record object looks like:

ItemPurchase {
    String personId,
    String itemId
}

The data written to S3 looks like this:

{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}

NO COMMA SEPARATION.

NO STARTING BRACKET as in a JSON array

[

NO ENDING BRACKET as in a JSON array

]

I want to read this data and get a list of ItemPurchase objects.

List<ItemPurchase> purchases = getPurchasesFromS3(IOUtils.toString(s3ObjectContent))

What is the correct way to read this data?


9 Answers

20
votes

It boggles my mind that Amazon Firehose dumps JSON messages to S3 in this manner, and doesn't allow you to set a delimiter or anything.

Ultimately, the trick I found to deal with the problem was to process the text file using the JSON raw_decode method.

This will allow you to read a bunch of concatenated JSON records without any delimiters between them.

Python code:

import json

decoder = json.JSONDecoder()

with open('giant_kinesis_s3_text_file_with_concatenated_json_blobs.txt', 'r') as content_file:
    content = content_file.read()

    content_length = len(content)
    decode_index = 0

    while decode_index < content_length:
        try:
            # raw_decode returns the parsed object and the index where parsing stopped
            obj, decode_index = decoder.raw_decode(content, decode_index)
            print("File index:", decode_index)
            print(obj)
        except json.JSONDecodeError as e:
            print("JSONDecodeError:", e)
            # Scan forward and keep trying to decode
            decode_index += 1
6
votes

I also had the same problem; here is how I solved it:

  1. replace "}{" with "}\n{"
  2. split the line by "\n"

    input_json_rdd.map(lambda x: re.sub(r"\}\{", "}\n{", x, flags=re.UNICODE)) \
                  .flatMap(lambda line: line.split("\n"))
    

A nested JSON object has several "}"s, so splitting the line by "}" doesn't solve the problem.
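A minimal end-to-end sketch of this approach, assuming a live SparkContext named sc and a hypothetical S3 path:

import json
import re

from pyspark import SparkContext

sc = SparkContext.getOrCreate()

# Hypothetical S3 location; replace with your actual Firehose output path.
raw_rdd = sc.textFile("s3://my-bucket/firehose-output/")

purchases = (raw_rdd
             .map(lambda x: re.sub(r"\}\{", "}\n{", x, flags=re.UNICODE))
             .flatMap(lambda line: line.split("\n"))
             .map(json.loads))

print(purchases.take(3))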

3
votes

I've had the same issue.

It would have been better if AWS allowed us to set a delimiter, but we can do it on our own.

In my use case, I've been listening to a stream of tweets, and upon receiving a new tweet I immediately put it to Firehose.

This, of course, resulted in a 1-line file which could not be parsed.

So, to solve this, I concatenated the tweet's JSON with a \n. This, in turn, let me use packages that can output lines when reading stream contents, and parse the file easily.
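A minimal sketch of the producer side using boto3, assuming a hypothetical delivery stream named my-tweet-stream:

import json

import boto3

firehose = boto3.client("firehose")

def put_tweet(tweet):
    # Append a newline so each record lands on its own line in the S3 object.
    # "my-tweet-stream" is a hypothetical stream name.
    firehose.put_record(
        DeliveryStreamName="my-tweet-stream",
        Record={"Data": (json.dumps(tweet) + "\n").encode("utf-8")},
    )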

Hope this helps you.

3
votes

I think the best way to tackle this is to first create a properly formatted JSON file containing well-separated JSON objects. In my case I added ',' to the events that were pushed into Firehose. Then, after a file is saved in S3, all the files will contain JSON objects separated by a delimiter (a comma, in our case). Another thing that must be added are '[' and ']' at the beginning and end of the file. Then you have a proper JSON file containing multiple JSON objects, and parsing becomes possible; a sketch of the reading side follows below.
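A minimal sketch of the reading side under that scheme, assuming the producer already appended a comma to every event (the sample body below is hypothetical):

import json

# Hypothetical S3 object body where every event already ends with a comma.
body = '{"personId":"p-111","itemId":"i-111"},{"personId":"p-222","itemId":"i-222"},'

# Strip the trailing comma, then wrap in brackets to form a valid JSON array.
records = json.loads("[" + body.rstrip(",") + "]")
print(records)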

3
votes

If the input source for the Firehose is an Analytics application, this concatenated JSON without a delimiter is a known issue, as cited here. You should have a Lambda function, as here, that outputs the JSON objects on multiple lines.

2
votes

Use this simple Python code.

import json

input_str = '''{"personId":"p-111","itemId":"i-111"}{"personId":"p-222","itemId":"i-222"}{"personId":"p-333","itemId":"i-333"}'''

# Insert commas between records and wrap in brackets to form a JSON array.
# Note: this assumes "}{" never appears inside a string value.
data_str = "[{}]".format(input_str.replace("}{", "},{"))
data_json = json.loads(data_str)

And then (if you want) convert it to a Pandas DataFrame.

import pandas as pd   
df = pd.DataFrame().from_records(data_json)
print(df)

And this is the result:

  itemId personId
0  i-111    p-111
1  i-222    p-222
2  i-333    p-333
1
votes

You can find each valid JSON object by counting the brackets. Assuming the file starts with a '{', this Python snippet should work:

import json

def read_block(stream):
    # Note: this simple counter assumes '{' and '}' never appear inside
    # string values; escape-aware parsing would be needed otherwise.
    open_brackets = 0
    block = ''
    while True:
        c = stream.read(1)
        if not c:
            break

        if c == '{':
            open_brackets += 1
        elif c == '}':
            open_brackets -= 1

        block += c

        if open_brackets == 0:
            yield block
            block = ''


if __name__ == "__main__":
    with open('firehose_json_blob', 'r') as f:
        for block in read_block(f):
            record = json.loads(block)
            print(record)
1
votes

I used a transformation Lambda to add a line break at the end of every record:

import base64
import copy

def lambda_handler(event, context):
    output = []

    for record in event['records']:

        # Decode from base64 (Firehose records are base64 encoded)
        payload = base64.b64decode(record['data'])

        # Read json as utf-8    
        json_string = payload.decode("utf-8")

        # Add a line break
        output_json_with_line_break = json_string + "\n"

        # Encode the data
        encoded_bytes = base64.b64encode(bytearray(output_json_with_line_break, 'utf-8'))
        encoded_string = str(encoded_bytes, 'utf-8')

        # Create a deep copy of the record and append to output with transformed data
        output_record = copy.deepcopy(record)
        output_record['data'] = encoded_string
        output_record['result'] = 'Ok'

        output.append(output_record)

    print('Successfully processed {} records.'.format(len(event['records'])))

    return {'records': output}
0
votes

If there's a way to change the way the data is written, separate all the records with a newline. That way you can read the data simply, line by line; a minimal sketch follows below. If not, then build a scanner object that takes "}" as the delimiter and use the scanner to read. That would do the job.
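A minimal sketch of the line-by-line case, assuming each record was written with a trailing newline and that firehose_output.txt is a hypothetical local copy of the S3 object:

import json

# One JSON record per line; skip any blank lines.
with open('firehose_output.txt', 'r') as f:
    records = [json.loads(line) for line in f if line.strip()]

print(records)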