JOLT does not implement data (value) transformation; its stock transforms only restructure the data.
See the official documentation:
https://github.com/bazaarvoice/jolt#stock-transforms
Stock Transforms
The Stock transforms are:
shift : copy data from the input tree and put it in the output tree
default : apply default values to the tree
remove : remove data from the tree
sort : sort the Map key values alphabetically ( for debugging and human readability )
cardinality : "fix" the cardinality of input data. E.g., the "urls" element is usually a List, but if there is only one, then it is a String
Currently, all the Stock transforms just affect the "structure" of the data.
To do data manipulation, you will need to write Java code.
If you write your Java "data manipulation" code to implement the Transform interface, then you can insert your code in the transform chain.
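As a rough illustration of what such a custom step could look like, here is a minimal Java sketch. The `Transform` interface below is a local stand-in that only mirrors the single-method shape of Jolt's `com.bazaarvoice.jolt.Transform`, so the example compiles without the Jolt jar. The class name `TimestampFormatTransform`, the UTC time zone, and the assumption that `timeStamp` holds epoch milliseconds are all illustrative choices, not something taken from the Jolt documentation.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.TimeZone;

// Local stand-in mirroring the shape of com.bazaarvoice.jolt.Transform.
// In a real project, implement the Jolt interface instead of this one.
interface Transform {
    Object transform(Object input);
}

// Hypothetical custom step: rewrites an epoch-millisecond "timeStamp" as text.
final class TimestampFormatTransform implements Transform {
    private static final String PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";

    @Override
    @SuppressWarnings("unchecked")
    public Object transform(Object input) {
        if (!(input instanceof Map)) {
            return input; // leave non-object input untouched
        }
        // Copy the map so the caller's tree is not modified.
        Map<String, Object> out = new LinkedHashMap<>((Map<String, Object>) input);
        Object ts = out.get("timeStamp");
        if (ts instanceof Number) {
            SimpleDateFormat fmt = new SimpleDateFormat(PATTERN);
            fmt.setTimeZone(TimeZone.getTimeZone("UTC")); // assumption: UTC output
            out.put("timeStamp", fmt.format(new Date(((Number) ts).longValue())));
        }
        return out;
    }
}
```

With the real Jolt interface, a class like this would, as far as I know, be referenced from the chain spec by its fully qualified class name in the "operation" field, and the jar would have to be on the classpath.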
So, to complete your task I see two main variants:
V1:
Use the following sequence of processors:
EvaluateJsonPath -> UpdateAttribute -> AttributesToJSON
In EvaluateJsonPath, define one attribute per field, using JsonPath expressions such as $.name, $.timeStamp, ...
In UpdateAttribute, convert the format of timeStamp and define the new attribute:
attribute  | value/expression
-----------|------------------------------------------------
timeStamp  | ${timeStamp:format('yyyy-MM-dd HH:mm:ss.SSS')}
created_ts | ${now():format('yyyy-MM-dd HH:mm:ss.SSS')}
In AttributesToJSON, set Attributes List to the attributes that should be written as a JSON object into the flow file content.
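To see what the format pattern above produces for a concrete value, here is a small Java sketch. It assumes that the pattern follows `java.text.SimpleDateFormat` conventions and that timeStamp holds epoch milliseconds. The class and method names are made up for illustration, and the explicit time zone argument is an assumption added so that the result does not depend on the machine's default zone.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical helper showing the effect of the date pattern on an epoch value.
final class FormatPatternDemo {
    static String format(long epochMillis, String pattern, String zoneId) {
        SimpleDateFormat fmt = new SimpleDateFormat(pattern);
        // Pin the zone explicitly; otherwise the JVM default zone is used.
        fmt.setTimeZone(TimeZone.getTimeZone(zoneId));
        return fmt.format(new Date(epochMillis));
    }
}
```

For example, `FormatPatternDemo.format(1420058163264L, "yyyy-MM-dd HH:mm:ss.SSS", "GMT")` returns `2014-12-31 20:36:03.264`. If the flow runs on servers in different time zones, it is worth checking whether the formatted value should be pinned to a fixed zone in the same way.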
V2: Use the ExecuteScript processor with the following Groovy code:
    import groovy.json.JsonSlurper
    import groovy.json.JsonBuilder
    // needed for the "as StreamCallback" cast below
    import org.apache.nifi.processor.io.StreamCallback

    def ff = session.get()
    if (!ff) return

    ff = session.write(ff, { rawIn, rawOut ->
        // wrap the raw streams in a reader and a writer
        rawIn.withReader("UTF-8") { reader ->
            rawOut.withWriter("UTF-8") { writer ->
                // parse the content into a Map
                def json = new JsonSlurper().parse(reader)
                // change/set values (timeStamp is expected to be epoch milliseconds)
                json.timeStamp = new Date(json.timeStamp as Long).format('yyyy-MM-dd HH:mm:ss.SSS')
                json.created_ts = new Date().format('yyyy-MM-dd HH:mm:ss.SSS')
                // write the changed object to the writer
                new JsonBuilder(json).writeTo(writer)
            }
        }
    } as StreamCallback)
    session.transfer(ff, REL_SUCCESS)