
I have many topics in Kafka, each with a format like value: {big JSON string with many subkeys, etc.}.

A PRINT of the topic looks like:

rowtime: 3/10/20 7:10:43 AM UTC, key: , value: {"@timestamp": "XXXXXXXX", "beat": {"hostname": "xxxxxxxxxx","name": "xxxxxxxxxx","version": "5.2.1"}, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}", "offset": 70827770, "source": "/var/log/xxxx.log", "type": "topicname" }
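In other words, the message field is itself a JSON document serialised as a string inside the outer JSON record, so a consumer has to parse it twice. A minimal sketch of that shape in Python (placeholder field values, not the real payload):

```python
import json

# The outer record is JSON; its "message" field is a JSON document
# serialised as a string, so it needs a second parse.
raw = '{"input_type": "log", "message": "{\\"request_method\\":\\"PUT\\",\\"status\\":200}"}'

outer = json.loads(raw)                # first parse: the envelope
inner = json.loads(outer["message"])   # second parse: the embedded string
print(inner["request_method"], inner["status"])   # PUT 200
```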

I have tried using

CREATE STREAM test
 (value STRUCT<
    server_name VARCHAR,
    remote_address VARCHAR,
    forwarded_for VARCHAR,
    remote_user VARCHAR,
    timestamp_start VARCHAR
 ..

WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');

But I get a stream with value as NULL. Is there a way to grab the fields under the value key?

Welcome to StackOverflow! Can you edit your question to include an actual sample of your message? – Robin Moffatt
Added to question. Thanks! – Damian O'Sullivan
How are you populating it? Looks like it comes from Elastic Beats? – Robin Moffatt
If you use something like kafkacat to dump the raw message, is it actually escaped JSON within JSON like the PRINT shows? – Robin Moffatt
Yes. The JSON in the message field is escaped JSON. And yes, it comes from Beats. – Damian O'Sullivan

1 Answer


The escaped JSON is not valid JSON, which is probably going to have made this more difficult :)

In this snippet:

…\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\…

the leading double-quote for o_name is not escaped. You can validate this with something like jq:

echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\","o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
parse error: Invalid numeric literal at line 1, column 685
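The same failure can be reproduced outside jq. Here is a shortened sketch in Python (field names kept, values illustrative) showing that the unescaped quote before o_name terminates the embedded string early:

```python
import json

# The quote before o_name is not escaped, so the embedded string ends
# early and the parse fails; escaping it fixes the document.
broken = r'{"message": "{\"object_length\":\"0\","o_name\":\"x\"}"}'
fixed  = r'{"message": "{\"object_length\":\"0\",\"o_name\":\"x\"}"}'

try:
    json.loads(broken)
except json.JSONDecodeError as e:
    print("broken:", e.msg)

inner = json.loads(json.loads(fixed)["message"])
print("fixed:", inner)   # {'object_length': '0', 'o_name': 'x'}
```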

With the JSON fixed this then parses successfully:

echo '{"message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}"}' | jq '.message|fromjson'
{
  "server_name": "xxxxxxxxxxxxxxx",
  "remote_address": "10.x.x.x",
  "user": "xxxxxx",
  "timestamp_start": "xxxxxxxx",
  "timestamp_finish": "xxxxxxxxxx",
  "time_start": "10/Mar/2020:07:10:39 +0000",
  "time_finish": "10/Mar/2020:07:10:39 +0000",
  "request_method": "PUT",
  "request_uri": "xxxxxxxxxxxxxxxxxxxxxxx",
  "protocol": "HTTP/1.1",
  "status": 200,
…

So now let's get this into ksqlDB. I'm using kafkacat to load it into a topic:

kafkacat -b localhost:9092 -t testing -P<<EOF
{ "@timestamp": "XXXXXXXX", "beat": { "hostname": "xxxxxxxxxx", "name": "xxxxxxxxxx", "version": "5.2.1" }, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HTTP/1.1\",\"status\":200,\"response_length\":\"0\",\"request_length\":\"0\",\"user_agent\":\"xxxxxxxxx\",\"request_id\":\"zzzzzzzzzzzzzzzzzzzzz\",\"request_type\":\"zzzzzzzz\",\"stat\":{\"c_wait\":0.004,\"s_wait\":0.432,\"digest\":0.0,\"commit\":31.878,\"turn_around_time\":0.0,\"t_transfer\":32.319},\"object_length\":\"0\",\"o_name\":\"xxxxx\",\"https\":{\"protocol\":\"TLSv1.2\",\"cipher_suite\":\"TLS_RSA_WITH_AES_256_GCM_SHA384\"},\"principals\":{\"identity\":\"zzzzzz\",\"asv\":\"dddddddddd\"},\"type\":\"http\",\"format\":1}", "offset": 70827770, "source": "/var/log/xxxx.log", "type": "topicname" }
EOF

Now with ksqlDB let's declare the outline schema, in which the message field is just a lump of VARCHAR:

CREATE STREAM TEST (BEAT STRUCT<HOSTNAME VARCHAR, NAME VARCHAR, VERSION VARCHAR>,
                    INPUT_TYPE VARCHAR, 
                    MESSAGE VARCHAR, 
                    OFFSET BIGINT, 
                    SOURCE VARCHAR) 
            WITH (KAFKA_TOPIC='testing', VALUE_FORMAT='JSON');

We can query this stream to check that it's working:

SET 'auto.offset.reset' = 'earliest';
SELECT BEAT->HOSTNAME, 
       BEAT->VERSION, 
       SOURCE, 
       MESSAGE 
  FROM TEST 
EMIT CHANGES LIMIT 1;
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|BEAT__HOSTNAME   |BEAT__VERSION  |SOURCE              |MESSAGE                                                             |
+-----------------+---------------+--------------------+--------------------------------------------------------------------+
|xxxxxxxxxx       |5.2.1          |/var/log/xxxx.log   |{"server_name":"xxxxxxxxxxxxxxx","remote_address":"10.x.x.x","user":|
|                 |               |                    |"xxxxxx","timestamp_start":"xxxxxxxx","timestamp_finish":"xxxxxxxxxx|
|                 |               |                    |","time_start":"10/Mar/2020:07:10:39 +0000","time_finish":"10/Mar/20|
|                 |               |                    |20:07:10:39 +0000","request_method":"PUT","request_uri":"xxxxxxxxxxx|
|                 |               |                    |xxxxxxxxxxxx","protocol":"HTTP/1.1","status":200,"response_length":"|
|                 |               |                    |0","request_length":"0","user_agent":"xxxxxxxxx","request_id":"zzzzz|
|                 |               |                    |zzzzzzzzzzzzzzzz","request_type":"zzzzzzzz","stat":{"c_wait":0.004,"|
|                 |               |                    |s_wait":0.432,"digest":0.0,"commit":31.878,"turn_around_time":0.0,"t|
|                 |               |                    |_transfer":32.319},"object_length":"0","o_name":"xxxxx","https":{"pr|
|                 |               |                    |otocol":"TLSv1.2","cipher_suite":"TLS_RSA_WITH_AES_256_GCM_SHA384"},|
|                 |               |                    |"principals":{"identity":"zzzzzz","asv":"dddddddddd"},"type":"http",|
|                 |               |                    |"format":1}                                                         |
Limit Reached
Query terminated

Now let's extract the embedded JSON fields using the EXTRACTJSONFIELD function (I've not done every field, just a handful of them to illustrate the pattern to follow):

SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address')        AS REMOTE_ADDRESS,
       EXTRACTJSONFIELD(MESSAGE,'$.time_start')            AS TIME_START,
       EXTRACTJSONFIELD(MESSAGE,'$.protocol')              AS PROTOCOL,
       EXTRACTJSONFIELD(MESSAGE,'$.status')                AS STATUS,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait')           AS STAT_C_WAIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait')           AS STAT_S_WAIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.digest')           AS STAT_DIGEST,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.commit')           AS STAT_COMMIT,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
       EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer')       AS STAT_T_TRANSFER 
  FROM TEST 
EMIT CHANGES LIMIT 1;
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|REMOTE_ADDRESS  |TIME_START                |PROTOCOL  |STATUS  |STAT_C_WAIT |STAT_S_WAIT  |STAT_DIGEST |STAT_COMMIT |STAT_TURN_AROUND_TIME |STAT_T_TRANSFER |
+----------------+--------------------------+----------+--------+------------+-------------+------------+------------+----------------------+----------------+
|10.x.x.x        |10/Mar/2020:07:10:39 +0000|HTTP/1.1  |200     |0.004       |0.432        |0           |31.878      |0                     |32.319          |
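For illustration, here is a rough Python analogue of what EXTRACTJSONFIELD is doing (the helper and payload are made up for this sketch): parse the embedded document, then walk the "$.a.b"-style path. Note that ksqlDB, unlike this sketch, returns the result as a VARCHAR regardless of the underlying JSON type.

```python
import json

# Rough analogue of EXTRACTJSONFIELD: parse the embedded JSON string
# and walk a "$.a.b"-style dotted path. Payload values are illustrative.
message = '{"protocol": "HTTP/1.1", "status": 200, "stat": {"c_wait": 0.004, "t_transfer": 32.319}}'

def extract_json_field(doc, path):
    node = json.loads(doc)
    for key in path.lstrip("$.").split("."):
        node = node[key]
    return node

print(extract_json_field(message, "$.protocol"))     # HTTP/1.1
print(extract_json_field(message, "$.stat.c_wait"))  # 0.004
```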

We can persist this to a new Kafka topic, and for good measure reserialise it to Avro to make it easier for downstream applications to use:

CREATE STREAM BEATS WITH (VALUE_FORMAT='AVRO') AS
    SELECT EXTRACTJSONFIELD(MESSAGE,'$.remote_address')        AS REMOTE_ADDRESS,
        EXTRACTJSONFIELD(MESSAGE,'$.time_start')            AS TIME_START,
        EXTRACTJSONFIELD(MESSAGE,'$.protocol')              AS PROTOCOL,
        EXTRACTJSONFIELD(MESSAGE,'$.status')                AS STATUS,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.c_wait')           AS STAT_C_WAIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.s_wait')           AS STAT_S_WAIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.digest')           AS STAT_DIGEST,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.commit')           AS STAT_COMMIT,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.turn_around_time') AS STAT_TURN_AROUND_TIME,
        EXTRACTJSONFIELD(MESSAGE,'$.stat.t_transfer')       AS STAT_T_TRANSFER 
    FROM TEST 
    EMIT CHANGES;

ksql> DESCRIBE BEATS;

Name                 : BEATS
 Field                 | Type
---------------------------------------------------
 ROWTIME               | BIGINT           (system)
 ROWKEY                | VARCHAR(STRING)  (system)
 REMOTE_ADDRESS        | VARCHAR(STRING)
 TIME_START            | VARCHAR(STRING)
 PROTOCOL              | VARCHAR(STRING)
 STATUS                | VARCHAR(STRING)
 STAT_C_WAIT           | VARCHAR(STRING)
 STAT_S_WAIT           | VARCHAR(STRING)
 STAT_DIGEST           | VARCHAR(STRING)
 STAT_COMMIT           | VARCHAR(STRING)
 STAT_TURN_AROUND_TIME | VARCHAR(STRING)
 STAT_T_TRANSFER       | VARCHAR(STRING)
---------------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

To debug issues with ksqlDB returning NULLs, check out this article. A lot of the time it's down to serialisation errors. For example, if you look at the ksqlDB server log, you'll see this error when it tries to parse the badly-formed escaped JSON from before I fixed it:

WARN Exception caught during Deserialization, taskId: 0_0, topic: testing, partition: 0, offset: 1 (org.apache.kafka.streams.processor.internals.StreamThread:36)
org.apache.kafka.common.errors.SerializationException: mvn value from topic: testing
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('o' (code 111)): was expecting comma to separate Object entries
 at [Source: (byte[])"{"@timestamp": "XXXXXXXX", "beat": {"hostname": "xxxxxxxxxx","name": "xxxxxxxxxx","version": "5.2.1"}, "input_type": "log", "log_dc": "xxxxxxxxxxx", "message": "{\"server_name\":\"xxxxxxxxxxxxxxx\",\"remote_address\":\"10.x.x.x\",\"user\":\
"xxxxxx\",\"timestamp_start\":\"xxxxxxxx\",\"timestamp_finish\":\"xxxxxxxxxx\",\"time_start\":\"10/Mar/2020:07:10:39 +0000\",\"time_finish\":\"10/Mar/2020:07:10:39 +0000\",\"request_method\":\"PUT\",\"request_uri\":\"xxxxxxxxxxxxxxxxxxxxxxx\",\"protocol\":\"HT"[truncated 604 bytes];
 line: 1, column: 827]
   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:693)
   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:591)
   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextFieldName(UTF8StreamJsonParser.java:986)
…