I am working on a new PoC to establish a data pipeline for our platform. I am shipping logs from the application to Event Hub (Kafka-enabled) and trying to consume the messages into an ADX table. I have created the data connection from ADX to map to the Event Hub.

My table definition in ADX is:

.create table Trident ( Context:string, Latency:string, TimeStampUtc:string, Status:string, Source:string, Destination:string, LatencyType:string, CorrelationId:string)

I have tried the following JSON mappings, but ADX is never able to map the incoming event values to the corresponding columns.

.create table Trident ( Context:dynamic, Latency:dynamic, TimeStampUtc:dynamic, Status:dynamic, Source:dynamic, Destination:dynamic, LatencyType:dynamic, CorrelationId:dynamic)

.create-or-alter table Trident ingestion json mapping 'TridentMapping' '[{"column":"Context","path":"$.message.Context","datatype":"dynamic"},{"column":"Latency","path":"$.message.Latency","datatype":"dynamic"},{"column":"TimeStampUtc","path":"$.message.TimeStampUtc","datatype":"dynamic"},{"column":"Status","path":"$.message.Status","datatype":"dynamic"},{"column":"Source","path":"$.message.Source","datatype":"dynamic"},{"column":"Destination","path":"$.message.Destination","datatype":"dynamic"},{"column":"LatencyType","path":"$.message.LatencyType","datatype":"dynamic"},{"column":"CorrelationId","path":"$.message.CorrelationId","datatype":"dynamic"}]'

.create-or-alter table Trident ingestion json mapping 'TridentMapping' '[{"column":"Context","path":"$.message[Context]","datatype":"string"},{"column":"Latency","path":"$.message[Latency]","datatype":"string"},{"column":"TimeStampUtc","path":"$.message[TimeStampUtc]","datatype":"string"},{"column":"Status","path":"$.message[Status]","datatype":"string"},{"column":"Source","path":"$.message[Source]","datatype":"string"},{"column":"Destination","path":"$.message[Destination]","datatype":"string"},{"column":"LatencyType","path":"$.message[LatencyType]","datatype":"string"},{"column":"CorrelationId","path":"$.message[CorrelationId]","datatype":"string"}]'

.create-or-alter table Trident ingestion json mapping 'TridentMapping' '[{"column":"Context","transform":"Context"},{"column":"Latency","transform":"Latency"},{"column":"TimeStampUtc","transform":"TimeStampUtc"},{"column":"Status","transform":"Status"},{"column":"Source","transform":"Source"},{"column":"Destination","transform":"Destination"},{"column":"LatencyType","transform":"$.LatencyType"},{"column":"CorrelationId","transform":"CorrelationId"}]'

None of the mappings was able to map the incoming events to the corresponding columns in the Trident table.

The JSON payload generated by Filebeat is as follows.

Message Received: { '@timestamp': '2019-07-12T01:43:34.196Z', '@metadata': { 'beat': 'filebeat', 'type': '_doc', 'version': '7.2.0', 'topic': 'trident2' }, 'host': { 'name': 'tridenet-st-az-vm-pragna' }, 'agent': { 'version': '7.2.0', 'type': 'filebeat', 'ephemeral_id': '2fb76a89-2d30-45e2-8ac3-8e47f086bb60', 'hostname': 'tridenet-st-az-vm-pragna', 'id': 'eb1c4b07-75f5-4c0c-bfc8-5a56016760ee' }, 'log': { 'offset': 2801247, 'file': { 'path': '/home/prmoh/trident/CatE2ECSharpLoadGenerator/CatE2ECSharpLoadGen/temp/test.log' } }, 'message': '{\'Context\':\'Trident-AZ-EastUS2-AzurePublicCloud-0ea43e61-f92c-4dc7-bab6-c9bf049d50d1\',\'Latency\':\'39.3731389843734\',\'TimeStampUtc\':\'7/12/19 1:43:34 AM\',\'Status\':\'200\',\'Source\':\'BC5BCA47-A882-4096-BB2D-D76E6C170534\',\'Destination\':\'090556DA-D4FA-764F-A9F1-63614EDA019A\',\'LatencyType\':\'File-Write\',\'CorrelationId\':\'3e8f064a-2477-490a-88fc-3f55b035cfee\'}', 'ecs': { 'version': '1.0.0' } }
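(Side note: in the payload above, the application fields sit inside message, which Filebeat emits as a JSON-encoded string rather than a nested object, so a path such as $.message.Context cannot descend into it during ingestion. A minimal sketch, under that assumption, of a mapping that only lands the addressable top-level message string in a single column; TridentRaw and TridentRawMapping are hypothetical names, not from the question:

// Hypothetical table holding the raw Filebeat "message" string for inspection.
.create table TridentRaw (RawMessage:string)

// Map only the top-level message property, which is addressable, into that column.
.create-or-alter table TridentRaw ingestion json mapping 'TridentRawMapping' '[{"column":"RawMessage","path":"$.message","datatype":"string"}]'
)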

Did you try to ingest the data using direct ingestion? You can upload a file to Azure Blob Storage and ingest it using the mapping provided, to test whether the mapping works; take a look at docs.microsoft.com/en-us/azure/kusto/management/data-ingestion/…. Also, you can run a control command to show ingestion failures (.show ingestion failures), which might help pinpoint the problem. More on that: docs.microsoft.com/en-us/azure/kusto/management/… – Daniel Dubovski
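For reference, a rough sketch of the two diagnostics mentioned in the comment; the storage account, container, and SAS token below are placeholders, not values from the question:

// Show recent ingestion failures for the database; mapping or format errors surface here.
.show ingestion failures

// One-off ingestion of a sample file from Azure Blob Storage, reusing the same mapping,
// to test the mapping outside of the Event Hub pipeline.
.ingest into table Trident (
    h'https://<storage-account>.blob.core.windows.net/<container>/sample.json?<SAS-token>'
) with (format='json', ingestionMappingReference='TridentMapping')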

1 Answer

The document you pasted does not look like valid JSON.

Can you start with a mapping that maps the whole document to a single column in a test table? For example:

.create table test (message:dynamic)

.create table test ingestion json mapping "map" '[{"column":"message", "path":"$"}]'

This will let you see the actual JSON document that arrived in ADX and make it easy to create the applicable mapping.
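Once documents land in that single dynamic column, something along these lines could be used to inspect them and pull out the nested fields. This is only a sketch based on the Filebeat payload shown in the question, where the inner message field is itself a JSON-encoded string and therefore needs an extra parse step:

// Look at a few raw documents as they arrived in ADX.
test
| take 10

// The application fields live in message.message as an escaped JSON string,
// so parse it once more before projecting individual columns.
test
| extend inner = parse_json(tostring(message.message))
| project Context = tostring(inner.Context),
          Latency = tostring(inner.Latency),
          TimeStampUtc = tostring(inner.TimeStampUtc),
          Status = tostring(inner.Status),
          Source = tostring(inner.Source),
          Destination = tostring(inner.Destination),
          LatencyType = tostring(inner.LatencyType),
          CorrelationId = tostring(inner.CorrelationId)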