I'm receiving messages from a Kafka source, and I wrote an interceptor that extracts two fields (dataSoure and businessType) from each Kafka message, which is in JSON format. I parse the message with gson.fromJson(), but I get the error below.

I want to know whether Flume truncates an event when it exceeds some size limit. If so, how can I raise that limit? My Kafka messages are always very long, about 60 KB each.

Looking forward to your reply. Thanks in advance!

2015-12-09 11:48:05,665 (PollableSourceRunner-KafkaSource-apply) [ERROR - org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:153)] KafkaSource EXCEPTION, {}
com.google.gson.JsonSyntaxException: com.google.gson.stream.MalformedJsonException: Unterminated string at line 1 column 4096
    at com.google.gson.Gson.fromJson(Gson.java:809)
    at com.google.gson.Gson.fromJson(Gson.java:761)
    at com.google.gson.Gson.fromJson(Gson.java:710)
    at com.xxx.flume.interceptor.JsonLogTypeInterceptor.intercept(JsonLogTypeInterceptor.java:43)
    at com.xxx.flume.interceptor.JsonLogTypeInterceptor.intercept(JsonLogTypeInterceptor.java:61)
    at org.apache.flume.interceptor.InterceptorChain.intercept(InterceptorChain.java:62)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:146)
    at org.apache.flume.source.kafka.KafkaSource.process(KafkaSource.java:130)
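If you suspect the message is being cut short before it reaches the interceptor, one way to check is to log the size of the raw body just before parsing it. Below is a minimal, standard-library-only sketch; the class `BodyDiagnostics` and the method `describeBody` are hypothetical names (not part of Flume or Gson), and the sample message is made up. In a real interceptor you would pass `event.getBody()` to the helper.

```java
import java.nio.charset.StandardCharsets;

public class BodyDiagnostics {
    // Hypothetical helper: summarizes a raw event body (byte length, decoded
    // character count, last few characters) so a log line shows whether the
    // payload reaching the interceptor is complete.
    static String describeBody(byte[] body) {
        String text = new String(body, StandardCharsets.UTF_8);
        // Keep at most the last 20 characters so the log line stays short.
        String tail = text.length() > 20 ? text.substring(text.length() - 20) : text;
        return "bytes=" + body.length + ", chars=" + text.length() + ", tail=" + tail;
    }

    public static void main(String[] args) {
        // Made-up sample message standing in for event.getBody().
        byte[] body = "{\"dataSoure\":\"app\",\"businessType\":\"order\"}"
                .getBytes(StandardCharsets.UTF_8);
        System.out.println(describeBody(body));
    }
}
```

If the logged length matches the size of the message stored in Kafka and the tail ends with the closing `}`, the body arrived intact and the problem is more likely in how it is turned into a String; if it consistently stops around 4096 characters, something upstream of the parser is cutting it.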


Answer


Finally, I found the root cause by debugging the source code. I was trying to convert event.getBody() to a map with Gson as if it were a String, but event.getBody() returns a byte[], and Gson's fromJson() has no overload that accepts a byte[]. The body has to be decoded into a String first. The correct code is:

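// Imports needed: java.nio.charset.StandardCharsets, java.util.Map, com.google.gson.reflect.TypeToken
// getBody() returns byte[]; decode it explicitly as UTF-8 (no checked exception, unlike the "UTF-8" string form).
String body = new String(event.getBody(), StandardCharsets.UTF_8);
Map<String, Object> map = gson.fromJson(body, new TypeToken<Map<String, Object>>() {}.getType());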
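To make the byte[] versus String distinction concrete, here is a small standard-library-only sketch that contrasts calling `toString()` on a byte array with decoding it using an explicit charset. It also builds a roughly 60 KB JSON-like payload to mirror the message size from the question. The class name `BodyDecodingDemo`, the method `buildPayload`, and the payload shape are hypothetical, made up for illustration; this is not claimed to be how the original interceptor converted the body.

```java
import java.nio.charset.StandardCharsets;

public class BodyDecodingDemo {
    // Hypothetical helper: builds an ASCII, JSON-like string of roughly
    // approxSize characters, standing in for a large Kafka message.
    static String buildPayload(int approxSize) {
        StringBuilder sb = new StringBuilder(
                "{\"dataSoure\":\"app\",\"businessType\":\"order\",\"data\":\"");
        while (sb.length() < approxSize) {
            sb.append('x');
        }
        return sb.append("\"}").toString();
    }

    public static void main(String[] args) {
        String json = buildPayload(60_000);
        byte[] body = json.getBytes(StandardCharsets.UTF_8);

        // Wrong: arrays do not override toString(), so this yields the type
        // and hash code (something like "[B@1b6d3586"), not the message text.
        String wrong = body.toString();

        // Right: decode the bytes with an explicit charset.
        String right = new String(body, StandardCharsets.UTF_8);

        System.out.println("toString(): " + wrong);
        System.out.println("decoded length: " + right.length()
                + ", equal to original: " + right.equals(json));
    }
}
```

The second line prints `decoded length: 60002, equal to original: true`: decoding a byte[] into a String has no 4096-character cap, so a 60 KB body decoded this way reaches Gson whole.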