Monday 4 September 2017

TouK Nussknacker - using Apache Flink made easier for analysts and business

A few weeks ago we (TouK) published our latest open source project, Nussknacker, on GitHub. What is it and why should you care?


First, some history: more than a year ago one of our clients decided it was high time for Real Time Marketing. They had pretty large data streams and lots of ideas for marketing campaigns, so one of the key success factors was the ability to process the data fast. We prepared a POC based on Apache Flink and it turned out to be a really great piece of technology - fast and accurate.

There was just one problem - how to write and customize processes? Flink, like many modern stream processing engines, provides a rich and friendly DSL, both in Java and Scala. But for our client that was not really enough. See, they are an enterprise that does not employ developers - they have many decent, competent analysts who know the business and SQL - but not Java or (Heaven forbid…) Scala.

Nussknacker to the rescue!

So we decided to create a simple process designer for them. It looks more or less like this:


You draw a diagram, fill in the details (like filter expressions - “#input.usageData > 0” or SMS/mail content), then you press the Deploy button and voilà - your brand new process is running on a Flink cluster!

Of course, first somebody (that is, a developer) has to prepare data sources (most probably Kafka topics), design the data model (POJOs or case classes) and implement actions - like sending an email or sending some event to another Kafka topic. But once the data model and external services are defined, an analyst can define and deploy processes all by him/herself.
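
To make the split of work more concrete, here is a minimal plain-Java sketch of what the developer-provided data model and an analyst-provided filter amount to conceptually. It deliberately uses neither Flink nor Nussknacker APIs; the UsageEvent class, its fields and the clientsToNotify helper are hypothetical names made up for illustration, and only the condition itself ("#input.usageData > 0") comes from the text above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

public class FilterSketch {

    // Hypothetical data model prepared by a developer (a plain POJO).
    static final class UsageEvent {
        final String clientId;
        final long usageData;

        UsageEvent(String clientId, long usageData) {
            this.clientId = clientId;
            this.usageData = usageData;
        }
    }

    // What the analyst types into the diagram as "#input.usageData > 0",
    // written here as an ordinary Java predicate.
    static final Predicate<UsageEvent> HAS_USAGE = input -> input.usageData > 0;

    // Applies the filter to a batch of events and returns the matching client ids.
    static List<String> clientsToNotify(List<UsageEvent> events) {
        return events.stream()
                .filter(HAS_USAGE)
                .map(event -> event.clientId)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<UsageEvent> events = Arrays.asList(
                new UsageEvent("alice", 0),
                new UsageEvent("bob", 120));
        System.out.println(clientsToNotify(events));
    }
}
```

In a real deployment the events would arrive as an unbounded stream rather than a list, and the expression would be evaluated by the engine, not compiled into the application.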

More features

Sounds a bit scary to let your users run stream processes from a GUI? A bad filter condition can have serious performance implications if you’re dealing with streams of tens of thousands of events per second... That’s why we let users test their diagrams first - each test case can be generated from a sample of real data coming from e.g. Kafka and then run in a Flink sandbox mini-cluster.

We also have many more features that make working with Nussknacker and Flink easier: subprocesses, versioning, generating PDF documentation, migration between environments and last but certainly not least - integration with InfluxDB/Grafana to provide detailed insight into how a process is doing:


Where can I use it?

What are Nussknacker's use cases? Our main deployments deal with RTM (Real Time Marketing). One of our clients started with RTM and then found out that Nussknacker is also a great choice for real-time fraud detection. Industries we are working with include telcos, banks and media companies. We are also thinking about other possibilities - for example IoT.

Sounds cool?

If you are interested in easy access for semi-technical analysts to streaming data - give Nussknacker a try!

You can find the code at Github:, we also have a nice, Docker-based quickstart:

And if you are coming to Flink Forward next week in Berlin - join me on Wednesday afternoon to hear more -

In the next days/weeks we’ll post more information on the TouK blog, both on Nussknacker architecture and internals and on interesting use cases - stay tuned :)

Thursday 26 November 2015

ScheduledExecutorService leaking memory

Lately we've found yet another memory leak in our application. It wasn't big - it took about a week to fill a 500MB heap, but still...

The culprit was not (entirely) on our side this time... Memory analyzer showed something more or less like this:

Now, after some searching I've found what this ScheduledTaskExecutor did. It scheduled timeout tasks for some jobs. The jobs took more or less time - but they all ended. And when they ended we called ScheduledFuture.cancel - to clean up the executor:

ScheduledTask task = ...

The problem was that some jobs had an almost infinite timeout - like lasting until the year 3000. They were all cancelled after a few seconds but...
It turns out that with default settings that's not enough. Googling around wasn't very helpful; only after debugging did I find THE flag:

setRemoveOnCancelPolicy - defaults to false. Meaning that cancelled tasks do not get removed from the executor's internal queue; they stay there until their delay elapses. My first impression was WTF$%& - why is it false by default?? - but then I learned that the flag was added only in Java 7, so to maintain backwards compatibility...
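
The effect of the flag can be reproduced in isolation. The sketch below is not our production code: the class and method names are made up, and a one-year delay stands in for the "almost infinite" timeouts. It schedules a batch of far-future tasks, cancels each of them immediately and reports how many are still sitting in the executor's queue.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RemoveOnCancelDemo {

    // Schedules `count` far-future timeout tasks, cancels every one of them,
    // and returns the number of tasks still held in the executor's work queue.
    static int queuedAfterCancel(boolean removeOnCancel, int count) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(removeOnCancel);
        try {
            for (int i = 0; i < count; i++) {
                // Stand-in for a timeout that will practically never fire.
                ScheduledFuture<?> timeout = executor.schedule(() -> { }, 365, TimeUnit.DAYS);
                timeout.cancel(false);
            }
            return executor.getQueue().size();
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("default policy: " + queuedAfterCancel(false, 1000));
        System.out.println("removeOnCancel: " + queuedAfterCancel(true, 1000));
    }
}
```

With the default policy all 1000 cancelled tasks remain queued (and reachable by the garbage collector's roots), while with the flag set the queue is empty again.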

So now we initialize our executors like this:

        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
        executor.setRemoveOnCancelPolicy(true);

Saturday 16 February 2013

SortedSet + Joda DateTime == danger

It's been quite a long time since I wrote something on this blog... Two things occurred that made me do this.
Firstly, I'm going to talk at Java Developer's Conference in Cairo and at Booster conference in Bergen next month, so I want to have some content when I put a link on my slides ;)
Secondly, last week I encountered a really weird situation. In fact, it was an endless loop.
It was in a rather critical place of our app and it was on a semi-production environment, so it was quite embarrassing. What's more, the code had been working before, it had been untouched for about half a year, and it had pretty good test coverage. It looked more or less like this (I've left some stuff out, so now it looks too complex for its task):

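// `date` (a threshold in epoch milliseconds) was among the stuff left out of the
// original snippet; it is assumed here to be passed in as a parameter.
def findDates(dates: SortedSet[DateTime], a: List[DateTime], date: Long): (SortedSet[DateTime], List[DateTime]) =
  if (dates.isEmpty || dates.head.getMillis < date) {
    (dates, a)
  } else {
    // loops forever if removing the head leaves the set unchanged
    findDates(dates - dates.head, a :+ dates.head, date)
  }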

Just a simple tail recursion, how can it loop endlessly? It turns out it can. Actually, for some specific data dates - dates.head == dates.
Why? The reason is that DateTime's ordering is not consistent with equals. If you look into the Comparable definition, it says:
It is strongly recommended (though not required) that natural orderings be consistent with equals. This is so because sorted sets (and sorted maps) without explicit comparators behave "strangely" when they are used with elements (or keys) whose natural ordering is inconsistent with equals. In particular, such a sorted set (or sorted map) violates the general contract for set (or map), which is defined in terms of the equals method.
What does this mean? That you should only use sorted collections for classes that satisfy the following: if a.compareTo(b) == 0 then a.equals(b) == true. And in Joda's DateTime javadoc you can read:
Compares this object with the specified object for ascending millisecond instant order. This ordering is inconsistent with equals, as it ignores the Chronology.
And it turns out that this was our case - in our data there were dates that were equal with respect to milliseconds, but in different timezones. What's more, not every pair of such dates leads to disaster. They have to cause some mess in the underlying red-black tree... The solution was to introduce a wrapper (we used it anyway, actually) that defined comparison consistent with equality...
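
The general hazard is easy to reproduce without Joda. The sketch below swaps in java.math.BigDecimal, another well-known class whose compareTo is inconsistent with equals (1.0 and 1.00 compare as equal but are not equal), so it illustrates the same class of problem rather than the exact DateTime bug from our app. The class and method names are made up for the example.

```java
import java.math.BigDecimal;
import java.util.HashSet;
import java.util.Set;
import java.util.TreeSet;

public class InconsistentOrderingDemo {

    // Adds two values that compare as equal (compareTo == 0) but are not equals(),
    // and returns how many elements the given set ends up holding.
    static int sizeAfterAdding(Set<BigDecimal> set) {
        set.add(new BigDecimal("1.0"));
        set.add(new BigDecimal("1.00"));
        return set.size();
    }

    public static void main(String[] args) {
        BigDecimal a = new BigDecimal("1.0");
        BigDecimal b = new BigDecimal("1.00");
        System.out.println("compareTo: " + a.compareTo(b)); // 0
        System.out.println("equals:    " + a.equals(b));    // false

        // The sorted set trusts compareTo, the hash set trusts equals/hashCode.
        System.out.println("TreeSet size: " + sizeAfterAdding(new TreeSet<>())); // 1
        System.out.println("HashSet size: " + sizeAfterAdding(new HashSet<>())); // 2
    }
}
```

The two set implementations disagree about whether the values are "the same" element, which is exactly the kind of disagreement that code mixing equals-based and ordering-based reasoning can trip over.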

Thursday 31 March 2011

Testing ServiceMix 4.3 services with PaxExam

In the project that I'm currently working on we are developing a set of extensions for ServiceMix - more or less some monitoring stuff. It's pretty simple using SMX ExchangeListeners - you implement the interface, register it via Spring DM and voila - you have everything you need. But how are you going to test it? Of course, we write unit tests, and functional tests, but all of them are leaving NMR/OSGi stuff behind - you just don't include osgi-beans.xml's.
This leaves us with untested code - my baaaaad ;)

Fortunately, help is at hand - it's PaxExam. It's a framework for testing OSGi services in various containers - most notably Felix & Equinox. But making it play nicely with the latest & greatest ServiceMix 4.3 was not so easy - so I want to share how I did it. I made a sample project, it's on my github

It consists of 2 modules: first contains service & listener under test, and the second one contains The Test.

I won't go into details about the service bundle - it's just a cxf-bc endpoint passing invocations to a camel router, which sets the output message. The listener is also very simple. The point is that we have two of the most widely used SMX components, with OSGi packaging and OSGi-config configuration.

This leaves us with the itests module. Its core is BaseIntegrationTest, which contains the Pax Exam configuration and some utility methods. The Pax Exam configuration was the most difficult part for me. The reason is that ServiceMix consists of many, many dependent projects, each of them having its own release cycle, and Pax Exam has its own.
Anyway, here is what works for me (note - it'll work with sun jdk 1.6 only I think).

A JUnit test (which is of course no unit test btw) using Pax Exam has to have a static configuration method, which tells the testing framework how to prepare the container - which one, which parameters, which bundles to install. It looks like this:

So, what should go into configuration method?
First, we have to set up system packages. We have to exclude StAX API and some SOAP APIs from system packages:

The system property "" tells PaxRunner where to look for system packages. I copied them from SMX distro - except that I had to add two Sun packages containing Schema and JAXP implementations - otherwise XPath & schema failed to work. Still, to make SchemaFactory work I had to manually add system property specifying implementation class.

Next thing is rather standard PaxExam configuration: OSGi container, logging profile, spring and spring DM setup (note - you have to use 1.2 version, not the latest one):

Two notes here. A profile is a set of bundles predefined to be used in PaxExam. Also note that we don't use System.setProperty, but PaxExam DSL expression. The reason is that the OSGi container is run separately, so we have to distinguish system properties used to start it, and properties that will be accessible to bundles inside it.

Then comes the Felix install configuration. I wanted to mimic SMX configuration conventions (files in the etc dir, ending with .cfg), so I copied the felixInstall configuration and added appropriate system properties. Note that we set the start level to 2. In Karaf the default start level is 60, in PaxRunner - it's 5. But I decided to stay with the Pax way - we don't have so many bundles that we need such fine-grained configuration.

Now, the Karaf configuration. It's also pretty standard, just remember that we need Aries blueprint in version 0.2-incubation, and not 0.1 (as PaxExam has it) or the latest 0.3

Now, time for Servicemix components configuration. We'll use nice PaxExam feature - feature scan. This means that we can specify Karaf features package, and PaxExam will install appropriate bundles. So, we choose servicemix features, specify servicemix-camel, camel-nmr and servicemix-cxf-bc, and ...?
And it turns out that servicemix-cxf-bc has unresolved dependencies :/ The reason is that it was (at least it seems so ;)) tested with full distro, which contains also activemq-blueprint feature. Since we don't want activemq for our test setup, we'll install required bundles manually:

Now, everything is ready for the final step - installation of services bundle:
mavenBundle("pl.touk.smx4", "paxExamSample-service")

To make testing easier I also created a few util methods. Two interesting ones are: getBean and sendMessage.

getBean uses ServiceTracker to get a Spring/Blueprint bean registered as an OSGi service. Note that registration is done asynchronously - that's why we need Thread.sleep for ServiceTracker.

sendMessage wraps interaction with NMR - it creates an inOut exchange, populates it and sends it synchronously via a channel.

Well... this setup is not particularly easy, but once we get through it, the tests look quite nice:

This sends a test message down the NMR, checks that the output is ok, and that it was caught by the Listener.

What's left to do?
You can see some nasty explicit dependencies in configuration - should be taken from maven. Next goal would also be to run SOAPUI tests for testing end to end interaction. But let's leave it for next entry.

To sum up: I think PaxExam is a great tool, although the setup is not so easy. Tests setup is quite long - actually it's faster to build the service bundle and run update on Servicemix to see changes. That's why I would rather recommend using PaxExam for regression tests, run on Hudson (whooops, Jenkins ;))

Sunday 27 March 2011

Activiti - are you ready for BPMN 2.0?

Since the beginning of this year I have been quite involved in a project called Activiti. It's a "light-weight workflow and Business Process Management (BPM) Platform" (according to its creators). The project is led by the JBPM creators - Tom Baeyens and Joram Barrez - and is backed by Alfresco and several other companies. I also contributed some small features :)

Activiti has already made some noise in BPM world - check this, this, or this, and of course InfoQ.

So, what's all the fuss about? What makes Activiti special? Of course - it depends on what you are comparing it with. My experience involves mainly working with an open source BPEL implementation, but I think some points remain valid.
So, here is my list of distinctive features:

BPMN 2.0 support

It took quite a lot of time for this spec to arrive, but it's finally here. The biggest step ahead compared with BPMN 1.x is the execution model - no more BPEL, you can use the same diagram for modelling and execution, and it has a proper XSD schema! Of course, it won't solve all round-trip headaches, but I think it's quite an important improvement.
The XML describing a BPMN 2.0 process consists of node definitions and transition definitions:
Activiti is one of the first BPM engines offering BPMN 2.0 support. Currently not all nodes are supported, but the list includes:

  • exclusive and parallel gateways
  • timer boundary and intermediate events (timer start event almost ready)
  • various tasks: script, user, service, manual, rules, receive
  • error events and handling them
  • subprocesses (both embedded or not)

Not all of these nodes are fully defined in the spec - e.g. it does not describe what a service task invocation should look like. Therefore, Activiti comes with a set of custom extensions. They are meant to be as non-intrusive as possible - to make processes more portable. Among the most commonly used are the ones for describing service task behaviour:

Another useful extension makes it possible to associate an HTML form and a candidate user with a user task:

One of nice features of BPMN 2.0 is also providing xml schema for describing process diagram - aka Diagram Interchange. This enables good engines (such as Activiti ;)) to generate process diagram just on the basis of xml definition - which makes importing processes modelled in some external tool much easier. It looks like this:

Maybe not too beautiful, but usable.

Goal of supporting full BPM cycle
Do you (still) believe that future tools for creating business processes will allow users to get rid of developers? I do not... Unless, of course, business people learn how to code ;) Otherwise what we'll be left with are some nice zero-coding tools which look great for 15 minutes (or even 2 hours if they're expensive enough) but after running into real-life problems will demand extensive hacking.
Activiti pursues a different goal, and proposes developing a process cycle layer.
Key points of this proposition are:
  • zero coding solutions won't work
  • analysts are needed to model the process, developers are needed to create executable processes, and operations are needed to deploy and monitor them
  • each of these groups has its own set of tools which they're familiar with
  • so let's not reinvent the wheel but encourage them to collaborate while using their own tools
So, how to achieve this? By creating another web application, of course ;) Its name is Activiti Cycle and it's meant to encourage collaboration between business, developers and operations, while allowing each of them to use their own, specific tools in their daily job. It's more like a federated repository of BPM assets, such as Visio Diagrams, BPMN process definitions, maven process projects, deployment packages and so on. These artifacts can be linked, commented and tracked by various process stakeholders and also transformed.

Easy to embed and extend, also by quasi-REST API
One of the biggest pains of BPEL-based solutions is that they force you to integrate with the rest of your app using webservices. Fortunately, this is no longer the case. You can embed the Activiti engine in your (for example) Spring application just by importing a few jars and configuring it as any other Spring component:
This is of course great for running processes that handle some Java tasks. What about user tasks? Activiti comes with decent webapps for handling human tasks and monitoring process state:

Activiti Explorer - screen shows the list of tasks for a given user:
Activiti Probe - screen shows monitoring process instance:
But what if you want to/have to use some other frontend technology? Webapps that I mentioned before are really thin clients - all logic is hidden behind Activiti's quasi-REST API (I use the word quasi so as not to be beaten by RESTafarians who will surely point out that Activiti API is just RPC over HTTP...). That means you can embed Activiti in your webapps/OSGi container/any other environment and integrate with frontend webapps using handy JSON/HTTP communication. Which looks more or less like this:

Using (de facto) standards
When you create an application using Activiti, chances are high that you already know many (if not all) of the building blocks & techniques:
  • development? Eclipse plugin & maven
  • connecting components together? you can choose: spring or (for JEE6 lovers) CDI
  • testing? just do your normal TDD (you do it, right? ;)) using Activiti JUnit testing utils
Eclipse plugin includes visual modeler, which enables you to draw executable BPMN 2.0 processes, and fill all needed properties:
It uses Diagram Interchange format, so process diagram layout will remain the same when displaying process diagram in other applications.
Testing is also pretty easy, as Activiti comes with good JUnit support. One of the small but important features is the ability to simulate the clock - very handy when dealing with long running tasks.
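
Why a simulated clock matters can be sketched with the JDK alone. The example below is not Activiti's testing API; it uses java.time.Clock (Java 8+), and the Timer class and dueAfter helper are hypothetical names made up for illustration. The point is only that a test can "wait" 30 days by moving the clock instead of sleeping.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class SimulatedClockDemo {

    // Hypothetical stand-in for a long-running timer in a process; not an Activiti class.
    static final class Timer {
        private final Instant dueAt;

        Timer(Clock clock, Duration delay) {
            this.dueAt = clock.instant().plus(delay);
        }

        // The timer is due once the supplied clock has reached the due date.
        boolean isDue(Clock clock) {
            return !clock.instant().isBefore(dueAt);
        }
    }

    // Creates a timer at a fixed start instant, then checks it against a clock
    // that has been moved forward by `elapsed` without any real waiting.
    static boolean dueAfter(Duration timerDelay, Duration elapsed) {
        Clock start = Clock.fixed(Instant.parse("2011-03-27T10:00:00Z"), ZoneOffset.UTC);
        Timer timer = new Timer(start, timerDelay);
        Clock later = Clock.offset(start, elapsed);
        return timer.isDue(later);
    }

    public static void main(String[] args) {
        System.out.println(dueAfter(Duration.ofDays(30), Duration.ofDays(29))); // false
        System.out.println(dueAfter(Duration.ofDays(30), Duration.ofDays(30))); // true
    }
}
```

The same idea applies to process tests: the code under test reads time from a clock the test controls, so a month-long timer can be verified in milliseconds.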

Good integration capabilities
Activiti can be integrated with the three most popular open source integration frameworks:
  • Mule ESB - integration is written by MuleSoft
  • SpringIntegration, contributed by SpringSource
  • last but certainly not least: Apache Camel - which is contributed by TouK ;) - it's still work in progress, but I hope to write a blog post soon about integrating Camel & Activiti
This makes it possible to build processes that are closer to orchestration than to simple workflows containing only (or mostly) human tasks. Each of these integration frameworks comes with a vast collection of adapters using all popular (and not so popular) communication protocols. This allows the process engine to concentrate on the process, and not on the communication details.

I think it's quite an impressive set of features for a product that is less than a year old. And what are Activiti's plans for the future?
Tom Baeyens recently announced that Activiti is going to support some sort of Adaptive Case Management - which is one of the top buzzwords in the process world. Other goals include:
  • asynchronous continuations
  • moving towards full support of BPMN 2.0
  • extending Activiti Cycle - check Bernd Ruecker's screencast showing the Activiti Cycle approach to handling collaboration between analysts, developers and admins - it's quite impressive
As for me, I'm finishing adding support for start timer tasks and hope to post something on Activiti-Camel integration and running Activiti in an OSGi environment soon - especially Apache ServiceMix - so stay tuned.

If you've found Activiti interesting, please start with 10 minutes Getting started guide, and if you know Polish, you can also have a look at my slides from Warsaw JUG presentation

Thanks for reading my first post on this blog - hope you liked it.