I am working on new PoC to establish a new data pipeline for our platform. I am shipping logs from application to the EventHub [ Kafka enabled] and trying to consume the messages into the ADX table. I have created the data source from the ADX to map to the EH.
My Table Definition in ADX is :
.create table Trident ( Context:string, Latency:string, TimeStampUtc:string, Status:string, Source:string, Destination:string, LatencyType:string, CorrelationId:string)
I have tried the following json mapping but ADX never able to map the incoming event values to the corresponding column
.create table Trident ( Context:dynamic, Latency:dynamic, TimeStampUtc:dynamic, Status:dynamic, Source:dynamic, Destination:dynamic, LatencyType:dynamic, CorrelationId:dynamic) .create-or-alter table Trident ingestion json mapping 'TridentMapping' '[{'column':'Context','path':'$.message.Context','datatype':'dynamic'},{'column':'Latency','path':'$.message.Latency','datatype':'dynamic'},{'column':'TimeStampUtc','path':'$.message.TimeStampUtc','datatype':'dynamic'},{'column':'Status','path':'$.message.Status','datatype':'dynamic'},{'column':'Source','path':'$.message.Source','datatype':'dynamic'},{'column':'Destination','path':'$.message.Destination','datatype':'dynamic'},{'column':'LatencyType','path':'$.message.LatencyType','datatype':'dynamic'}, {'column':'CorrelationId','path':'$.message.CorrelationId','datatype':'dynamic'}]'
.create-or-alter table Trident ingestion json mapping 'TridentMapping' '[{'column':'Context','path':'$.message[Context]','datatype':'string'},{'column':'Latency','path':'$.message[Latency]','datatype':'string'},{'column':'TimeStampUtc','path':'$.message[TimeStampUtc]','datatype':'string'},{'column':'Status','path':'$.message[Status]','datatype':'string'},{'column':'Source','path':'$.message[Source]','datatype':'string'},{'column':'Destination','path':'$.message[Destination]','datatype':'string'},{'column':'LatencyType','path':'$.message[LatencyType]','datatype':'string'}, {'column':'CorrelationId','path':'$.message[CorrelationId]','datatype':'string'}]'
.create-or-alter table Trident ingestion json mapping 'TridentMapping' '[{'column':'Context','transform' : 'Context'},{'column':'Latency','transform' : 'Latency'},{'column':'TimeStampUtc','transform':'TimeStampUtc'},{'column':'Status','transform':'Status'},{'column':'Source','transform':'Source'},{'column':'Destination','transform':'Destination'},{'column':'LatencyType','transform':'$.LatencyType'}, {'column':'CorrelationId','transform':'CorrelationId'}]'
None of the mapping were able to map the incoming request to the corresponding columns in to the Trident table.
The json payload generated by FileBeat is as follows.
Message Received: { '@timestamp': '2019-07-12T01:43:34.196Z', '@metadata': { 'beat': 'filebeat', 'type': '_doc', 'version': '7.2.0', 'topic': 'trident2' }, 'host': { 'name': 'tridenet-st-az-vm-pragna' }, 'agent': { 'version': '7.2.0', 'type': 'filebeat', 'ephemeral_id': '2fb76a89-2d30-45e2-8ac3-8e47f086bb60', 'hostname': 'tridenet-st-az-vm-pragna', 'id': 'eb1c4b07-75f5-4c0c-bfc8-5a56016760ee' }, 'log': { 'offset': 2801247, 'file': { 'path': '/home/prmoh/trident/CatE2ECSharpLoadGenerator/CatE2ECSharpLoadGen/temp/test.log' } }, 'message': '{\'Context\':\'Trident-AZ-EastUS2-AzurePublicCloud-0ea43e61-f92c-4dc7-bab6-c9bf049d50d1\',\'Latency\':\'39.3731389843734\',\'TimeStampUtc\':\'7/12/19 1:43:34 AM\',\'Status\':\'200\',\'Source\':\'BC5BCA47-A882-4096-BB2D-D76E6C170534\',\'Destination\':\'090556DA-D4FA-764F-A9F1-63614EDA019A\',\'LatencyType\':\'File-Write\',\'CorrelationId\':\'3e8f064a-2477-490a-88fc-3f55b035cfee\'}', 'ecs': { 'version': '1.0.0' } }