When I came across this requirement I first looked into the JoltTransformJSON processor. It lets us modify or delete JSON attributes and append new attributes to the flow file, but only static ones, not dynamic ones.
After a long search I found the ExecuteScript processor, which lets us write a Python script to achieve the requirement.
ExecuteScript:
ExecuteScript allows us to modify the incoming flow file and its content; in this blog I will explain how to do it.
ExecuteScript gives us access to a few standard objects such as the session, the flow file content, and the log.
We use the InputStreamCallback and OutputStreamCallback interfaces to read the incoming flow file content, modify it, and write it back.
ExecuteScript supports multiple script engines: Python (via Jython), Groovy, ECMAScript, Ruby, Lua, and Clojure. I'm using Python, which I'm familiar with.
Requirement :
I have a requirement to store customer names with suggested stock details (name, suggested stocks, and suggestion date) in HBase. From a Kafka producer I receive all the stocks suggested for a customer as a single comma-separated string.
Example of incoming data:
{"user": "rajesh", "sug_date": "12/31/2020, 11:23:03", "sug_stocks": "GOOGL, MSFT"}
{"user": "rohit", "sug_date": "12/31/2020, 16:02:15", "sug_stocks": "MRF, AMZN, TSLA"}
We need to produce output like the one below.
Required output:
{"GOOGL": "", "sug_date": "12/31/2020, 15:11:27", " MSFT": "", "user": "rajesh"}
{"MRF": "", " AMZN": "", "sug_date": "12/31/2020, 16:02:15", "user": "rohit", " TSLA": ""}
Comparing the input and the required output: each value in sug_stocks (GOOGL, MSFT) becomes a key with an empty string as its value, and the sug_stocks attribute itself is removed. (Note that splitting on "," leaves a leading space on every key except the first, e.g. " MSFT".)
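To make the transformation concrete before wiring it into NiFi, here is a minimal pure-Python sketch of the same restructuring (`restructure` is a hypothetical helper name for illustration; the real work happens inside ExecuteScript later):

```python
import json

def restructure(record):
    """Turn each suggested stock into its own key and drop sug_stocks."""
    out = dict(record)
    # split(",") deliberately mirrors the ExecuteScript body, so all keys
    # after the first keep their leading space, e.g. " MSFT"
    for stock in str(out.pop("sug_stocks")).split(","):
        out[stock] = ""
    return out

record = json.loads(
    '{"user": "rajesh", "sug_date": "12/31/2020, 11:23:03", '
    '"sug_stocks": "GOOGL, MSFT"}')
print(restructure(record))
# e.g. {'user': 'rajesh', 'sug_date': '12/31/2020, 11:23:03',
#       'GOOGL': '', ' MSFT': ''}  (key order may vary by interpreter)
```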
Publishing input data:
The code snippet below builds two customer objects with suggested stocks and publishes them as Kafka messages on the test topic.
```python
# kafka_producer.py
import json
from datetime import datetime

from kafka import KafkaProducer


def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=["localhost:9092"],
                                  max_request_size=3222000000)
    except Exception as ex:
        print("Exception while connecting Kafka")
        print(str(ex))
    finally:
        return _producer


def publish_msg(key, value):
    producer_obj = connect_kafka_producer()
    key_bytes = bytes(key, encoding="utf-8")
    value_bytes = bytes(value, encoding="utf-8")
    topic_name = "test"
    producer_obj.send(topic_name, key=key_bytes, value=value_bytes)
    producer_obj.flush()


if __name__ == '__main__':
    person_obj1 = {"user": "rohit",
                   "sug_date": datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
                   "sug_stocks": "MRF, AMZN, TSLA"}
    person_obj2 = {"user": "rajesh",
                   "sug_date": datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
                   "sug_stocks": "GOOGL, MSFT"}
    publish_msg('person', json.dumps(person_obj1))
    publish_msg('person', json.dumps(person_obj2))
```
Running this script publishes both messages to the test topic; on the NiFi side we now need to consume them.
Here is my NiFi flow; I will explain each processor I used.
ConsumeKafka:
Used to read the messages we published with the Kafka producer. A few properties need to be configured:
Kafka Brokers: the Kafka broker host and port.
Topic Name(s): I used test on the producer side, so the same topic name is needed here to read the published messages.
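A sketch of the ConsumeKafka properties for this flow (the broker address and the Group ID value are assumptions for a local setup; the processor requires a Group ID):

```
Kafka Brokers : localhost:9092
Topic Name(s) : test
Group ID      : nifi-consumer    # any consumer-group name; required
Offset Reset  : earliest         # so previously published messages are read
```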
We receive the two messages we sent from the producer.
EvaluateJsonPath:
Here I'm adding one property, sug_stocks, to extract the value we send from the producer side, so it becomes accessible for manipulation. We can see it under the attributes section of the next processor.
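Assuming the property is added as a dynamic property holding a JsonPath expression, with Destination left at flowfile-attribute so the extracted value lands in an attribute, the configuration would look roughly like this:

```
Destination : flowfile-attribute
sug_stocks  : $.sug_stocks    # dynamic property; JsonPath into the message body
```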
In the ExecuteScript configuration we set a few properties, such as the Script Engine, the Script File path, and the virtual environment path under Module Directory.
In the script we need to modify the sug_stocks JSON attribute: read its value and turn each entry into a key.
Input for the script:
Output after script execution:
We need to insert the above output into our HBase table.
Here is the script I used:
```python
import json

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback


class PyInputStreamCallback(InputStreamCallback):
    def __init__(self):
        self.json_content = {}

    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        self.json_content = json.loads(text)
        # attribute that we need to modify
        categories = self.json_content["sug_stocks"]
        for x in str(categories).split(","):
            # insert it as a key
            self.json_content[x] = ""
        # removing the sug_stocks attribute
        self.json_content.pop("sug_stocks")


class OutputWrite(OutputStreamCallback):
    def __init__(self, obj):
        self.obj = obj

    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))


flowfile = session.get()
if flowfile is not None:
    py_is = PyInputStreamCallback()
    session.read(flowfile, py_is)
    # session.write returns the updated flowfile; keep and transfer that one
    flowfile = session.write(flowfile, OutputWrite(py_is.json_content))
    session.transfer(flowfile, REL_SUCCESS)
    # no explicit commit needed: ExecuteScript commits the session itself
```
Note:
While working with ExecuteScript, make sure you transfer the latest flow file object you have: session.write returns a new flow file reference, and transferring a stale one leads to the "session isn't closed" exception discussed in the NiFi custom processor thread linked in the references.
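To illustrate why, here is a tiny mock (`MockSession` and `FlowFile` are stand-ins I invented for this sketch, not the NiFi API): `write` hands back a new object, and that is the one that must be transferred.

```python
class FlowFile(object):
    def __init__(self, version):
        self.version = version


class MockSession(object):
    """Stand-in for a NiFi ProcessSession, showing the reference rule."""
    def write(self, flowfile, callback):
        # NiFi hands back a new, updated flowfile object
        return FlowFile(flowfile.version + 1)


session = MockSession()
flowfile = FlowFile(version=1)
stale = flowfile
flowfile = session.write(flowfile, callback=None)  # always keep the result
assert flowfile.version == 2   # this is the object to transfer
assert stale.version == 1      # transferring this one is the bug
```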
PutHBaseJSON:
We need to set a few properties on the PutHBaseJSON processor, such as the table and the column family under which the data should be stored.
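The properties would look something like this (the controller service, column family name, and row identifier field are my assumptions; adjust them to your HBase setup):

```
HBase Client Service      : HBase_1_1_2_ClientService   # controller service (assumption)
Table Name                : customer
Column Family             : stock                       # assumption; any family in the table
Row Identifier Field Name : user                        # assumption; field used as the row key
```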
We can see the output in the HBase customer table.
References:
ConsumeKafka: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-0-9-nar/1.5.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/
EvaluateJsonPath: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html
ExecuteScript: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.5.0/org.apache.nifi.processors.script.ExecuteScript/index.html
PutHBaseJSON: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-hbase-nar/1.5.0/org.apache.nifi.hbase.PutHBaseJSON/
"Session isn't closed" exception: https://stackoverflow.com/questions/46570210/session-isnt-closed-exception-inside-nifi-custom-processor