
Execute Script Processor

When I first came across this requirement, I looked into the JoltTransformJSON processor. It lets us modify or delete JSON attributes and append new attributes to the flow file, but as far as I could tell it only let me insert static attributes, not dynamic ones.

After a long search I found the ExecuteScript processor, which lets us write a Python script to achieve the requirement.

ExecuteScript:

ExecuteScript allows us to modify the incoming flow file and its content. In this blog I will explain how to do it.

ExecuteScript gives the script access to a few standard objects such as session, context, and log.

We use the InputStreamCallback and OutputStreamCallback interfaces to read the incoming flow file content, modify it, and write it back.

ExecuteScript supports multiple script engines, such as Python (run through Jython), Groovy, ECMAScript, Ruby, Lua, and Clojure.

I’m using Python, which I’m familiar with.

Requirement:

I have a requirement to store each customer's name along with the suggested stock details (name, suggested stocks, and suggestion date) in HBase. From the Kafka producer I receive all the stocks suggested for a customer as a single comma-separated string.

Example of incoming data: 

{"user": "rajesh", "sug_date": "12/31/2020, 15:11:27", "sug_stocks": "GOOGL, MSFT"}

{"user": "rohit", "sug_date": "12/31/2020, 16:02:15", "sug_stocks": "MRF, AMZN, TSLA"}

We need to produce output like the following.

Required output:

{"GOOGL": "", "sug_date": "12/31/2020, 15:11:27", " MSFT": "", "user": "rajesh"}

{"MRF": "", " AMZN": "", "sug_date": "12/31/2020, 16:02:15", "user": "rohit", " TSLA": ""}

 

Comparing the input with the required output: each stock in the sug_stocks value (for example GOOGL and MSFT) becomes a key with an empty string as its value, and the sug_stocks attribute itself is removed.
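The transformation described above can be sketched as a plain Python function that runs outside NiFi. This is only an illustration: the function name stocks_to_keys and the strip parameter are hypothetical and are not part of the flow described in this post.

```python
import json


def stocks_to_keys(record, field="sug_stocks", strip=False):
    """Return a copy of record where each comma-separated stock is its own key."""
    result = dict(record)              # work on a copy, leave the input untouched
    stocks = result.pop(field)         # remove the original attribute
    for name in str(stocks).split(","):
        # With strip=False the space after each comma is kept (" MSFT"),
        # which matches the required output shown above.
        key = name.strip() if strip else name
        result[key] = ""
    return result


sample = {"user": "rajesh", "sug_date": "12/31/2020, 15:11:27", "sug_stocks": "GOOGL, MSFT"}
print(json.dumps(stocks_to_keys(sample)))
```

Passing strip=True removes the leading space from keys such as " MSFT", if the space is not wanted in the stored column names.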

Publishing input data:

The code snippet below builds two customer objects with suggested stocks and publishes each one as a Kafka message to the test topic.

 

# kafka_producer.py
import json
from datetime import datetime

from kafka import KafkaProducer


def connect_kafka_producer():
    """Return a connected KafkaProducer, or None if the connection fails."""
    try:
        return KafkaProducer(bootstrap_servers=["localhost:9092"], max_request_size=3222000000)
    except Exception as ex:
        print("Exception while connecting Kafka")
        print(str(ex))
        return None


def publish_msg(key, value):
    producer_obj = connect_kafka_producer()
    if producer_obj is None:
        return  # connection failed; the error was already printed
    # Kafka expects bytes for both the key and the value
    key_bytes = bytes(key, encoding="utf-8")
    value_bytes = bytes(value, encoding="utf-8")
    topic_name = "test"
    producer_obj.send(topic_name, key=key_bytes, value=value_bytes)
    producer_obj.flush()  # wait until the message has actually been sent


if __name__ == "__main__":
    person_obj1 = {"user": "rohit", "sug_date": datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
                   "sug_stocks": "MRF, AMZN, TSLA"}
    person_obj2 = {"user": "rajesh", "sug_date": datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
                   "sug_stocks": "GOOGL, MSFT"}
    publish_msg("person", json.dumps(person_obj1))
    publish_msg("person", json.dumps(person_obj2))


 

Running the script above publishes the messages to Kafka. Next, we need to read those messages on the consumer side.

Here I’m attaching my NiFi flow, and I will explain each processor that I used.

ConsumeKafka:

Used to read the messages that we published with the Kafka producer. Here we need to set a few configurations.

Kafka broker: we need to provide the Kafka broker host and port.

Topic names: I used test on the producer side, so I need to use the same topic to read the published messages.

 

We received the 2 messages that we sent from the producer.

EvaluateJsonPath:

Here I’m adding one property, sug_stocks, which matches the field we send from the producer side, so that its value becomes accessible as a flow file attribute and can be manipulated. We can see it under the attributes section of the next processor.
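To make this step concrete, here is a rough Python equivalent of what EvaluateJsonPath does when the extracted value is written to a flow file attribute. It is a sketch under assumptions: the JSONPath expression $.sug_stocks is assumed (the exact property value is not shown in this post), the helper evaluate_json_path is hypothetical, and it only handles simple top-level "$.field" paths.

```python
import json


def evaluate_json_path(content, properties):
    """Mimic EvaluateJsonPath for simple top-level '$.field' paths only."""
    doc = json.loads(content)
    attributes = {}
    for attr_name, path in properties.items():
        if not path.startswith("$."):
            raise ValueError("only '$.field' paths are supported in this sketch")
        field = path[2:]
        if field in doc:
            # flow file attributes are always strings
            attributes[attr_name] = str(doc[field])
    return attributes


content = '{"user": "rohit", "sug_date": "12/31/2020, 16:02:15", "sug_stocks": "MRF, AMZN, TSLA"}'
# property name -> assumed JSONPath expression
attrs = evaluate_json_path(content, {"sug_stocks": "$.sug_stocks"})
print(attrs)
```

The flow file content itself is left unchanged by this step; only an attribute is added alongside it.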

 

ExecuteScript:

In the ExecuteScript configuration we set a few properties, such as the script engine, the script path, and the virtual environment path under the module directory.

In the script we need to modify the JSON attribute sug_stocks: read its value and turn each stock in it into a key.

Input for the script:

 

Output after script execution:

We need to insert the above output into our HBase table.

Here is the script:

import json

from org.apache.commons.io import IOUtils
from java.nio.charset import StandardCharsets
from org.apache.nifi.processor.io import InputStreamCallback, OutputStreamCallback


class PyInputStreamCallback(InputStreamCallback):
    """Reads the flow file content and turns each suggested stock into a key."""

    def __init__(self):
        self.json_content = {}

    def process(self, inputStream):
        text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
        self.json_content = json.loads(text)
        # attribute that we need to modify
        categories = self.json_content["sug_stocks"]
        # note: split(",") keeps the space after each comma, e.g. " MSFT"
        for x in str(categories).split(","):
            # insert it as a key
            self.json_content[x] = ""
        # removing sug_stocks attribute
        self.json_content.pop("sug_stocks")


class OutputWrite(OutputStreamCallback):
    """Writes the modified JSON object back as the flow file content."""

    def __init__(self, obj):
        self.obj = obj

    def process(self, outputStream):
        outputStream.write(bytearray(json.dumps(self.obj).encode('utf-8')))


flowfile = session.get()

if flowfile is not None:
    py_is = PyInputStreamCallback()
    session.read(flowfile, py_is)
    # session.write returns the updated flow file; keep the new reference
    flowfile = session.write(flowfile, OutputWrite(py_is.json_content))
    session.transfer(flowfile, REL_SUCCESS)
    session.commit()
    session.close()

 

Note:

While working with ExecuteScript, make sure you always write to and transfer the latest flow file object that you hold (the one returned by the most recent session call). If we write an old flow file object, we can get a "session isn't closed" exception inside the NiFi custom processor.

PutHBaseJSON:

We need to add a few properties to the PutHBaseJSON processor, such as the table and the column family in which to store the data.
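As a rough illustration of how a flat JSON document ends up as an HBase row, the sketch below maps each JSON field to a column in one column family. The helper to_hbase_put, the row id field user, and the column family name cf are assumptions made for this example; they are not taken from the actual processor configuration used in this post, and this is not the processor's own implementation.

```python
import json


def to_hbase_put(content, row_id_field="user", column_family="cf"):
    """Map a flat JSON document to (row_id, {"family:qualifier": value})."""
    doc = json.loads(content)
    row_id = str(doc[row_id_field])
    cells = {
        "%s:%s" % (column_family, name): str(value)
        for name, value in doc.items()
        if name != row_id_field  # assumed: the row id field is not stored as a column
    }
    return row_id, cells


content = '{"GOOGL": "", "sug_date": "12/31/2020, 15:11:27", " MSFT": "", "user": "rajesh"}'
row_id, cells = to_hbase_put(content)
print(row_id, cells)
```

Seen this way, each suggested stock becomes its own column qualifier, which is why the script turns the stock names into keys before this step.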

We can see the output in the HBase customer table.

References:

ConsumeKafka: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-kafka-0-9-nar/1.5.0/org.apache.nifi.processors.kafka.pubsub.ConsumeKafka/

EvaluateJsonPath: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-standard-nar/1.6.0/org.apache.nifi.processors.standard.EvaluateJsonPath/index.html

ExecuteScript: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-scripting-nar/1.5.0/org.apache.nifi.processors.script.ExecuteScript/index.html

PutHBaseJSON: https://nifi.apache.org/docs/nifi-docs/components/org.apache.nifi/nifi-hbase-nar/1.5.0/org.apache.nifi.hbase.PutHBaseJSON/

https://stackoverflow.com/questions/46570210/session-isnt-closed-exception-inside-nifi-custom-processor