Clojure on Apache NiFi

Apache NiFi is a flow-oriented data distribution and processing platform with support for processes, data provenance, queuing between processes and a whole lot of other useful features. Its basic unit of data is the FlowFile, a persistent data structure that holds content plus metadata expressed as attributes (e.g. file name, HTTP request headers, or any other tags).

A workflow is defined as a series of processors that transform incoming flow files and pass them on to other processors via relationships (i.e. transitions). There are already quite a few built-in processors for the most common operations, such as HTTP requests, database operations and common file format conversions, but in a typical data processing application you are very likely to need to implement your own.
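To make the idea of processors and relationships more concrete, here is a toy model in plain Clojure. It does not use the NiFi API at all: the map shape of the flow file, the processor functions and the `connections` table are all hypothetical names chosen for illustration.

```clojure
(require '[clojure.string :as str])

;; Toy model only: plain Clojure data, not the NiFi API.
;; A flow file is an immutable map of attributes plus content.
(def flow-file
  {:attributes {"filename" "report.txt"}
   :content    "hello"})

;; A "processor" here is a function that takes a flow file and returns
;; a pair of [relationship-name new-flow-file].
(defn upcase-processor [ff]
  [:success (update ff :content str/upper-case)])

(defn tag-processor [ff]
  [:success (assoc-in ff [:attributes "processed"] "true")])

(def processors
  {:upcase upcase-processor
   :tag    tag-processor})

;; Hypothetical wiring: [processor relationship] -> next processor.
(def connections
  {[:upcase :success] :tag})

(defn run-flow
  "Pushes a flow file through the graph starting at `start` until a
  processor emits on a relationship that has no outgoing connection."
  [start ff]
  (loop [proc-id start, ff ff]
    (let [[rel out] ((processors proc-id) ff)]
      (if-let [next-id (connections [proc-id rel])]
        (recur next-id out)
        out))))

(run-flow :upcase flow-file)
;; => {:attributes {"filename" "report.txt", "processed" "true"}, :content "HELLO"}
```

Note that the original `flow-file` is untouched after the run; each processor returns a new value, which is the property that makes flow files feel at home in Clojure.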

Now, the only thing that could possibly increase the value of such a tool to us Clojurians is the ability to develop custom processors in Clojure rather than plain Java. Intuitively, it seems reasonable – both run on the JVM, and some NiFi concepts, namely immutable flow files, should fit even better in the Clojure world.

In this post we will present the proof-of-concept toolkit we built for ourselves with exactly that purpose in mind – to develop NiFi processors in Clojure. The toolkit consists of the following projects:

  • clj-nifi – the main library that wraps the NiFi Java API with Clojure-friendly functions we can easily compose.
  • boot-nifi – a Boot plugin with several useful tasks (build nar files, set up and run a local NiFi server).
  • clj-nifi-bundle – a project template so you can quickly bootstrap your development.

So, let's start with our first Clojure NiFi processor. The prerequisites are Java 8 or higher and an installed Boot. First, we’ll create a new project from the template:

boot -d clj-nifi-bundle/boot-template -d seancorfield/boot-new new -t clj-nifi-bundle -n first-nifi-project

If we need a Leiningen project.clj for our other development tools, e.g. Cursive, we can generate it like this:

cd first-nifi-project
boot -d onetom/boot-lein-generate generate

The initial project will contain both clj-nifi and boot-nifi as its dependencies, so the next step is to download Apache NiFi and set it up locally in the project:

boot download-nifi

This will create a new directory called nifi-home in your project directory with a NiFi server in it. It is ignored by git and hg by default, and if you use IntelliJ IDEA you may also want to exclude it from indexing.

The next step is to write our own processor. The template project already contains a skeleton processor, so we can take a peek at that first, just to see what is expected from us.


(def success-relationship
  (relationship :name "success" :description "Success"))

(def new-name-property
  (property :name        "New name"
            :description "Name to rename the file"
            :validators  [StandardValidators/NON_EMPTY_VALIDATOR]
            :required    true))

(gen-class :name ^{Tags                  ["nifi" "clojure" "example"]
                   CapabilityDescription "My Clojure-DSL processor"
                   SeeAlso               []
                   ReadsAttributes       []
                   WritesAttributes      []} first_nifi_project.core.ClojureProcessor
           :extends org.apache.nifi.processor.AbstractProcessor
           :prefix "pf-")

(defn pf-getSupportedPropertyDescriptors [_] [new-name-property])

(defn pf-getRelationships [_] #{success-relationship})

(defn pf-onTrigger [_ ^ProcessContext context ^ProcessSession session]
  (-> (init context session)
      (get-one)
      (put-attribute "filename" "renamed.txt")
      (write "APPENDED SOME CONTENT etc...")
      (transfer success-relationship)))

So, here we have one class called first_nifi_project.core.ClojureProcessor with three methods we must override:

  • getSupportedPropertyDescriptors – with exactly one property – “New name”
  • getRelationships – with exactly one relationship – success
  • onTrigger – with the body of code that does the actual work.

In addition to that, there is a text file called org.apache.nifi.processor.Processor in resources/META-INF/services with one line for each processor we define:


first_nifi_project.core.ClojureProcessor

Most of this boilerplate is just weird Clojure syntax for what NiFi processors in Java have anyway, so we’ll “zoom in” on the important part that makes the difference:


(-> (init context session)                    ;create a scope object
    (get-one)                                 ;get one flow file from session
    (put-attribute "filename" "renamed.txt")  ;change the file attribute to rename the file
    (write "APPENDED SOME CONTENT etc...")    ;append some text content
    (transfer success-relationship))          ;pass the file on to the next processor

The first thing that we introduce is the scope object. Basically, it’s just a map that wraps a ProcessSession, a ProcessContext and the current version of the FlowFile. All clj-nifi manipulation functions take a scope object as their first argument, possibly followed by other arguments, and return a scope object that represents the changed state.

Its only purpose is to bridge the gap between NiFi's interface, which handles immutable data through a session object, and the typical Clojure interface that allows easy use of the thread-first macro to compose functions. Specifically, in this example we first create the scope map, then pass it as the first (and only) argument to get-one. The result of get-one is another scope map with a flow file added to it, and it is passed as the first argument to put-attribute along with two other arguments. Its result is yet another scope with a changed flow file, and so on, all the way until the moment when we transfer to another processor.
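The threading pattern itself can be tried out without NiFi. The sketch below is a toy model, not the clj-nifi implementation: the starred function names, the map keys (`:context`, `:session`, `:file`, `:transferred-to`) and the fake session are all assumptions made up for illustration. It only shows how each step receives a scope map and returns a new one.

```clojure
;; Toy model only: these are NOT the clj-nifi implementations.
;; The scope is a plain map; every step returns a new map.
(defn init* [context session]
  {:context context :session session})

(defn get-one* [scope]
  ;; Pretend the session hands us the first queued flow file.
  (assoc scope :file (first (get-in scope [:session :queue]))))

(defn put-attribute* [scope k v]
  (assoc-in scope [:file :attributes k] v))

(defn write* [scope text]
  ;; Appends text to the content of the current flow file.
  (update-in scope [:file :content] str text))

(defn transfer* [scope rel]
  ;; Records where the flow file was sent.
  (assoc scope :transferred-to rel))

;; A fake session: just a map with a queue of flow files.
(def session
  {:queue [{:attributes {"filename" "a.txt"} :content "abc"}]})

(def result
  (-> (init* {} session)
      (get-one*)
      (put-attribute* "filename" "renamed.txt")
      (write* " + more")
      (transfer* "success")))

(:file result)
;; => {:attributes {"filename" "renamed.txt"}, :content "abc + more"}
```

Because every function takes the scope first and returns a scope, the steps compose with `->` in any order that makes sense, and the flow file still sitting in `session` is never modified.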

We can change the code as much as we desire, but right now we’ll first verify that the setup works by running the default example:

boot run

This will first build the application, then build a nar package (by running a Maven plugin through the boot-mvn plugin we wrote about in one of our previous posts), and then run the local NiFi distribution we just downloaded. It might take a moment or two, but eventually you should be able to access the dashboard at http://localhost:8080/nifi and try to add our new processor.

The one remaining thing to do is test all this. We can create a GetFile processor, connect its output to our ClojureProcessor, and connect the ClojureProcessor's output to a PutFile processor.

Now, if everything works, whenever we put a file in the directory from which GetFile takes its inputs, we’ll get the same file with “APPENDED SOME CONTENT etc…” appended to it and renamed to renamed.txt, regardless of how we configured the “New name” property of our processor. Making the new file name depend on the processor property is left as an exercise for the reader.

That's it: we validated our assumption that Clojure development on NiFi is in fact both possible and perfectly idiomatic. If you would like to know more about it – stay tuned. We’ll post more examples in our following posts.

Written by Goran Jovic

Software developer and founder of Big Solutions. He has been working with Clojure since 2010 on a number of projects, usually involving Big Data processing and analytics. His current focus is the application of Clojure-fu to IoT integration.
