In my previous post, I discussed stream processing and why it is needed in the integration world. In this post, I focus mainly on Ballerina Streams: how it has evolved over time, its capabilities, and its future direction.

The Start

As a first step, we wanted to bring streaming capabilities to the Ballerina language within a very short time. As some of you are aware, we already had a powerful in-house Complex Event Processing (CEP) engine called Siddhi. After a few rounds of heated discussions, we came up with a streaming query syntax for Ballerina; it was essentially a marriage between the Siddhi query syntax and the Ballerina programming syntax.

Siddhi Streaming Query Syntax

from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert into AvgTempStream;

Ballerina Streaming Query Syntax

forever {
    from tempStream window time(600000)
    select avg(tempStream.temp) as avgTemp, tempStream.roomNo,
        tempStream.deviceID
    => (AvgTemperature [] values) {
        foreach var value in values {
            avgTempStream.publish(value);
        }
    }
}

Within two months (with two engineers), we integrated the Siddhi CEP engine with the Ballerina language; it was something of a marathon implementation. In this approach, when a user writes a streaming query in Ballerina syntax, it is converted into a Siddhi query and pushed into the Siddhi runtime, as shown in the figure below.

Query Flow in Siddhi Based Streaming Implementation

Even though this approach provided the capabilities required for streaming, we also identified some areas that we wanted to improve. The key reasons were:

  • Streaming queries, which run inside the Siddhi engine, could not access the variables and functions that live in the Ballerina runtime.
  • The underlying implementation language of Siddhi is not fully compatible with Ballerina.

For these reasons, we changed direction and implemented the stream processing capabilities from scratch using the Ballerina language itself. This also helps demonstrate the feature richness of Ballerina and serves as an example of what the language can do. The rest of this post discusses this Ballerina Streams implementation, written in Ballerina itself.

Streaming Implementation — For Ballerina By Ballerina

Query Flow in Native Ballerina Streaming Implementation

As shown in the figure above, a streaming query written by a developer goes through several phases at compile time: the lexical analyzer, parser, semantic analyzer, code analyzer, taint analyzer, desugar, and code generator. When the query reaches the desugar phase, it is rewritten into typical Ballerina first-class constructs; in other words, the syntactic sugar is removed. The desugar phase uses pre-built streaming components, such as windows and aggregators, to do this. The desugared code is then turned into executable code in the code generator phase.

Note: If you would like to learn more about the Ballerina compiler and the Ballerina Virtual Machine, please refer to the posts written by Sameera Jayasoma, lead architect of the Ballerina compiler and runtime.

Following the above approach, a streaming query written by a user is converted into typical Ballerina code. For example,

The above streaming query is desugared into the Ballerina code below, and execution continues from there.

As you can see, a simple streaming query written by the developer is expanded into considerably more code. The query syntax hides this complexity and gives developers a simple, easily understandable layer for stream processing.
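
The gist of desugaring can be illustrated outside Ballerina. The Python analogy below is not the code the Ballerina compiler emits; it is a minimal sketch, assuming a design where each pre-built component (here a filter and a select) holds a function pointer to the next stage, so that the query becomes an ordinary chain of calls. The `Filter` and `Select` class names, the `process` method, and the event fields are hypothetical.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]
NextProcess = Callable[[List[Event]], None]


class Filter:
    """Pre-built 'where' component: forwards only matching events to the next stage."""

    def __init__(self, next_process: NextProcess, condition: Callable[[Event], bool]):
        self.next_process = next_process
        self.condition = condition

    def process(self, events: List[Event]) -> None:
        matched = [e for e in events if self.condition(e)]
        if matched:
            self.next_process(matched)


class Select:
    """Pre-built 'select' component: maps every event to an output event."""

    def __init__(self, next_process: NextProcess, mapper: Callable[[Event], Event]):
        self.next_process = next_process
        self.mapper = mapper

    def process(self, events: List[Event]) -> None:
        self.next_process([self.mapper(e) for e in events])


# Hand-wired equivalent of a query like
#   from tempStream where temp > 30 select roomNo, temp  (results published to alertStream)
# The chain is built back to front because each stage needs a pointer to the next one.
alert_stream: List[Event] = []
select = Select(alert_stream.extend, lambda e: {"roomNo": e["roomNo"], "temp": e["temp"]})
where = Filter(select.process, lambda e: e["temp"] > 30)

# "Subscribing" the first stage to the input stream: each arriving event enters the chain.
for event in [{"roomNo": 1, "temp": 25, "deviceID": "d1"},
              {"roomNo": 2, "temp": 35, "deviceID": "d2"}]:
    where.process([event])

print(alert_stream)  # [{'roomNo': 2, 'temp': 35}]
```

Chaining components through callbacks keeps each one independent of the query it is used in, which is what lets a compiler reuse the same pre-built pieces for many different queries.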

Ballerina Streaming Capabilities

Projection
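
Projection decides which attributes an output event carries: selecting, renaming, or computing attributes from each input event, as the `select ... as ...` part of the queries earlier in this post does. As a minimal Python sketch of the idea (an analogy, not Ballerina syntax; the `project` helper and the event fields are hypothetical):

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


def project(events: List[Event], spec: Dict[str, Callable[[Event], Any]]) -> List[Event]:
    """Build one output event per input event; spec maps output name -> expression."""
    return [{name: expr(e) for name, expr in spec.items()} for e in events]


temp_stream = [
    {"roomNo": 1, "deviceID": "d1", "temp": 25.0},
    {"roomNo": 2, "deviceID": "d2", "temp": 30.0},
]

# Roughly: select roomNo, deviceID, temp * 9 / 5 + 32 as tempF
out = project(temp_stream, {
    "roomNo": lambda e: e["roomNo"],
    "deviceID": lambda e: e["deviceID"],
    "tempF": lambda e: e["temp"] * 9 / 5 + 32,
})
print(out)
```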

Filter

Purpose : A filter lets you select the events that match a specific condition, either to emit them as output or to pass them on for further processing.
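
Conceptually, a filter is a predicate applied to each event as it arrives. A minimal Python sketch, assuming events are plain dictionaries (the `filter_stream` name and fields are hypothetical); a generator is used so events are examined one at a time, as they would be on a stream:

```python
from typing import Any, Callable, Dict, Iterable, Iterator

Event = Dict[str, Any]


def filter_stream(events: Iterable[Event], condition: Callable[[Event], bool]) -> Iterator[Event]:
    """Lazily forward only the events that satisfy the condition; others are dropped."""
    return (e for e in events if condition(e))


temp_stream = [
    {"roomNo": 1, "temp": 28.0},
    {"roomNo": 2, "temp": 35.5},
    {"roomNo": 3, "temp": 41.0},
]

# Roughly: from tempStream where temp > 30 and roomNo != 3
hot = list(filter_stream(temp_stream, lambda e: e["temp"] > 30 and e["roomNo"] != 3))
print(hot)  # [{'roomNo': 2, 'temp': 35.5}]
```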

Window

Purpose : To create subsets of events within a stream, based on time duration, number of events, etc., for processing. A window can operate in a sliding or tumbling (batch) manner.

At the moment, Ballerina Streams supports the following window implementations:

  • time
  • timeBatch
  • timeLength
  • length
  • lengthBatch
  • externalTime
  • externalTimeBatch
  • uniqueLength
  • delayWindow
  • sort
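
To show the difference between sliding and tumbling behaviour, here is a minimal Python sketch of two of the windows listed above. It is an illustration under stated assumptions, not Ballerina's implementation: the sliding window expires events using each event's own `ts` field (which is closer to `externalTime`; a `time` window would normally use the system clock), and the class names and fields are hypothetical.

```python
from collections import deque
from typing import Any, Deque, Dict, List

Event = Dict[str, Any]


class SlidingTimeWindow:
    """Sliding window: keeps events newer than `length_ms` relative to the latest event."""

    def __init__(self, length_ms: int):
        self.length_ms = length_ms
        self.events: Deque[Event] = deque()

    def add(self, event: Event) -> List[Event]:
        self.events.append(event)
        # Expire events that have fallen out of the window (oldest first).
        while self.events and self.events[0]["ts"] <= event["ts"] - self.length_ms:
            self.events.popleft()
        return list(self.events)


class LengthBatchWindow:
    """Tumbling window: emits `size` events together, then starts a fresh batch."""

    def __init__(self, size: int):
        self.size = size
        self.buffer: List[Event] = []

    def add(self, event: Event) -> List[Event]:
        self.buffer.append(event)
        if len(self.buffer) < self.size:
            return []  # batch not complete yet; nothing is emitted
        batch, self.buffer = self.buffer, []
        return batch


sliding = SlidingTimeWindow(length_ms=600_000)  # 10 minutes, as in the queries above
for ts, temp in [(0, 20.0), (300_000, 22.0), (650_000, 24.0)]:
    current = sliding.add({"ts": ts, "temp": temp})
print([e["ts"] for e in current])  # [300000, 650000]

batch = LengthBatchWindow(size=3)
emitted = [batch.add({"ts": i, "temp": float(i)}) for i in range(4)]
print([len(b) for b in emitted])  # [0, 0, 3, 0]
```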

Aggregate Functions

The following aggregate functions are supported at the moment:

  • avg : Calculates the average for a given argument for all the events.
  • sum : Returns the sum of a given argument for all the events.
  • max : Returns the maximum value of a given argument for all the events.
  • min : Returns the minimum value of a given argument for all the events.
  • count : Returns the count of all the events.
  • distinctCount : Returns the count of distinct occurrences for a given argument.
  • maxForever : Stores the maximum value of a given attribute throughout the lifetime of the query, regardless of any windows in front of it.
  • minForever : Stores the minimum value of a given attribute throughout the lifetime of the query, regardless of any windows in front of it.
  • stdDev : Returns the calculated standard deviation of a given argument for all the events.
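
The sketch below shows, in Python, how windowed aggregates differ from the "forever" variants: `max` only sees the events currently in a sliding length window, while `maxForever` remembers every value ever seen. It recomputes everything on each event for clarity (a real engine would update incrementally as events enter and leave), it assumes population standard deviation for `stdDev`, and the class name is hypothetical.

```python
import statistics
from collections import deque
from typing import Deque, Dict


class WindowedAggregates:
    """Aggregates over a sliding `length` window; max_forever ignores window expiry."""

    def __init__(self, length: int):
        self.window: Deque[float] = deque(maxlen=length)  # oldest value drops out automatically
        self.max_forever = float("-inf")

    def add(self, value: float) -> Dict[str, float]:
        self.window.append(value)
        self.max_forever = max(self.max_forever, value)
        values = list(self.window)
        return {
            "avg": sum(values) / len(values),
            "sum": sum(values),
            "count": len(values),
            "min": min(values),
            "max": max(values),
            "distinctCount": len(set(values)),
            "stdDev": statistics.pstdev(values),  # population std dev (assumption)
            "maxForever": self.max_forever,
        }


agg = WindowedAggregates(length=3)
for v in [50.0, 10.0, 20.0, 30.0]:
    result = agg.add(v)

# 50.0 has left the window, so "max" is 30.0 while "maxForever" is still 50.0.
print(result["max"], result["maxForever"])  # 30.0 50.0
```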

Group By
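
Group by splits the events in a window into groups that share the same attribute value and computes the aggregates separately for each group. A minimal Python sketch of `select roomNo, avg(temp) ... group by roomNo` over the events currently in a window (the helper name and fields are hypothetical):

```python
from collections import defaultdict
from typing import Any, Dict, List


def group_avg(events: List[Dict[str, Any]], key: str, field: str) -> Dict[Any, float]:
    """Average `field` separately for each distinct value of `key`."""
    sums: Dict[Any, float] = defaultdict(float)
    counts: Dict[Any, int] = defaultdict(int)
    for e in events:
        sums[e[key]] += e[field]
        counts[e[key]] += 1
    return {k: sums[k] / counts[k] for k in sums}


window = [
    {"roomNo": 1, "temp": 20.0},
    {"roomNo": 2, "temp": 30.0},
    {"roomNo": 1, "temp": 24.0},
]
print(group_avg(window, "roomNo", "temp"))  # {1: 22.0, 2: 30.0}
```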

Having
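
The key point about having is where it applies: a `where` condition filters raw input events, whereas `having` filters the aggregated rows produced after grouping. A minimal Python sketch of `select roomNo, avg(temp) as avgTemp group by roomNo having avgTemp > 25` (helper names and fields are hypothetical):

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def avg_by_room(events: List[Row]) -> List[Row]:
    """Aggregation step: one output row per roomNo with its average temperature."""
    grouped: Dict[Any, List[float]] = defaultdict(list)
    for e in events:
        grouped[e["roomNo"]].append(e["temp"])
    return [{"roomNo": k, "avgTemp": sum(v) / len(v)} for k, v in grouped.items()]


def having(rows: List[Row], condition: Callable[[Row], bool]) -> List[Row]:
    """Filter the aggregated rows (not the original input events)."""
    return [r for r in rows if condition(r)]


window = [
    {"roomNo": 1, "temp": 20.0},
    {"roomNo": 2, "temp": 30.0},
    {"roomNo": 1, "temp": 24.0},
    {"roomNo": 2, "temp": 34.0},
]
print(having(avg_by_room(window), lambda r: r["avgTemp"] > 25))  # [{'roomNo': 2, 'avgTemp': 32.0}]
```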

Order By
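
Ordering only makes sense over a finite set of events, such as the output emitted for a window or batch. A minimal Python sketch of ordering by `avgTemp` descending and then `roomNo` ascending; it relies on Python's sort being stable (the fields are hypothetical):

```python
from typing import Any, Dict, List

rows: List[Dict[str, Any]] = [
    {"roomNo": 3, "avgTemp": 27.5},
    {"roomNo": 1, "avgTemp": 31.0},
    {"roomNo": 2, "avgTemp": 27.5},
]

# Roughly: order by avgTemp descending, roomNo ascending.
# Python's sort is stable, so sort by the secondary key first, then by the primary key.
ordered = sorted(rows, key=lambda r: r["roomNo"])
ordered = sorted(ordered, key=lambda r: r["avgTemp"], reverse=True)
print([r["roomNo"] for r in ordered])  # [1, 2, 3]
```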

Stream Joins

Purpose : Streams are stateless. Therefore, in order to join two streams, they need to be connected to a window so that there is a pool of events that can be used for joining. Joins also accept conditions to join the appropriate events from each stream.

During the joining process, each incoming event of each stream is matched against all the events in the other stream’s window based on the given condition, and output events are generated for all the matching event pairs.

Unidirectional Join Operation

By default, events arriving at either stream can trigger the joining process. However, if you want to control join execution, you can add the unidirectional keyword next to a stream in the join definition so that only that stream triggers the join operation. Events arriving at the other stream then only update that stream’s window and do not trigger the join operation.

Note : The unidirectional keyword cannot be applied to both the input streams because the default behavior already allows both streams to trigger the join operation.
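
The matching and triggering rules above can be sketched in Python. This is a minimal sketch of an inner join, assuming both streams are backed by length windows of the same size; the `WindowedJoin` class, the `"left"`/`"right"` side labels, and the event fields are hypothetical. Every arriving event updates its own window, but it is matched against the other window only if its stream is allowed to trigger the join.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Optional, Tuple

Event = Dict[str, Any]


class WindowedJoin:
    """Inner join of two streams, each backed by a length window."""

    def __init__(self, length: int, on: Callable[[Event, Event], bool],
                 unidirectional: Optional[str] = None):
        self.windows: Dict[str, Deque[Event]] = {
            "left": deque(maxlen=length), "right": deque(maxlen=length)}
        self.on = on
        self.unidirectional = unidirectional  # None (both trigger), "left" or "right"

    def receive(self, side: str, event: Event) -> List[Tuple[Event, Event]]:
        other = "right" if side == "left" else "left"
        matches: List[Tuple[Event, Event]] = []
        if self.unidirectional in (None, side):  # may this stream trigger the join?
            for o in self.windows[other]:
                pair = (event, o) if side == "left" else (o, event)
                if self.on(*pair):
                    matches.append(pair)
        # Every arriving event updates its own stream's window, triggering or not.
        self.windows[side].append(event)
        return matches


def same_room(temp: Event, regulator: Event) -> bool:
    return temp["roomNo"] == regulator["roomNo"]


uni = WindowedJoin(length=10, on=same_room, unidirectional="left")
r1 = uni.receive("left", {"roomNo": 1, "temp": 30})       # right window still empty
r2 = uni.receive("right", {"roomNo": 1, "regulator": "on"})  # right stream cannot trigger
r3 = uni.receive("left", {"roomNo": 1, "temp": 32})       # matches the stored right event

both = WindowedJoin(length=10, on=same_room)
both.receive("left", {"roomNo": 1, "temp": 30})
d2 = both.receive("right", {"roomNo": 1, "regulator": "on"})  # default: right triggers too
print(r1, r2, len(r3), len(d2))  # [] [] 1 1
```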

The following operations are supported in a join clause.

Inner join (join)

This is the default behavior of a join operation. join is used as the keyword to join both the streams. The output is generated only if there is a matching event in both the streams.

Left outer join

The left outer join operation allows you to merge two streams based on a condition. left outer join is used as the keyword to join both the streams. It returns all the events of the left stream, even if there are no matching events in the right stream, using null values for the attributes of the right stream.

Right outer join

This is similar to a left outer join. right outer join is used as the keyword to join both the streams. It returns all the events of the right stream even if there are no matching events in the left stream.

Full outer join

The full outer join combines the results of left outer join and right outer join. full outer join is used as the keyword to join both the streams. Here, output events are generated for each incoming event even if there are no matching events in the other stream.
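
The four join types differ only in what happens to an arriving event that finds no match in the other stream's window. A minimal Python sketch, assuming `None` stands in for the null attributes and the other stream's window is passed in as a plain list; the `join_arrival` function and the `kind` labels are hypothetical:

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Event = Dict[str, Any]
Pair = Tuple[Optional[Event], Optional[Event]]

# Which side's unmatched events are preserved (null-padded) for each join kind.
PRESERVED = {"inner": (), "left": ("left",), "right": ("right",), "full": ("left", "right")}


def join_arrival(side: str, event: Event, other_window: List[Event],
                 on: Callable[[Event, Event], bool], kind: str) -> List[Pair]:
    """Output (left, right) pairs for one arriving event under the given join kind."""
    pairs: List[Pair] = []
    for o in other_window:
        left, right = (event, o) if side == "left" else (o, event)
        if on(left, right):
            pairs.append((left, right))
    if not pairs and side in PRESERVED[kind]:
        # No match: emit the event anyway, with None for the other stream's attributes.
        pairs.append((event, None) if side == "left" else (None, event))
    return pairs


def same_room(left: Event, right: Event) -> bool:
    return left["roomNo"] == right["roomNo"]


left_event = {"roomNo": 1, "temp": 30}
right_window = [{"roomNo": 2, "regulator": "off"}]
results = {kind: join_arrival("left", left_event, right_window, same_room, kind)
           for kind in ("inner", "left", "right", "full")}
print(results)
```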

Table Operations with Stream

In a streaming context, a table is a stored version of a stream, i.e., a table of events. Ballerina lets you query the state of the events stored in a table while processing events that arrive through a stream.
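
A stream-table query can be pictured as a lookup against the table's current contents for each arriving event. The Python sketch below is an analogy under stated assumptions: the table is a plain list of dictionaries, the `on_temp_event` function and fields are hypothetical, and only arriving stream events trigger a lookup. Because the table is read at lookup time, later changes to it are reflected in later results.

```python
from typing import Any, Dict, List

Event = Dict[str, Any]

# A hypothetical in-memory table of stored events: room metadata keyed by roomNo.
room_table: List[Event] = [
    {"roomNo": 1, "building": "A", "maxTemp": 28.0},
    {"roomNo": 2, "building": "B", "maxTemp": 32.0},
]


def on_temp_event(event: Event, table: List[Event]) -> List[Event]:
    """For one arriving stream event, query the table's current state."""
    out = []
    for row in table:
        if row["roomNo"] == event["roomNo"] and event["temp"] > row["maxTemp"]:
            out.append({"roomNo": row["roomNo"], "building": row["building"],
                        "temp": event["temp"]})
    return out


first = on_temp_event({"roomNo": 1, "temp": 30.0}, room_table)
second = on_temp_event({"roomNo": 2, "temp": 30.0}, room_table)
room_table[0]["maxTemp"] = 35.0  # update the stored state
third = on_temp_event({"roomNo": 1, "temp": 30.0}, room_table)
print(first, second, third)  # [{'roomNo': 1, 'building': 'A', 'temp': 30.0}] [] []
```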

Progress So Far

  • Patterns and Sequences
  • State Snapshot Persistence

How to Use?

If you want to try out the streaming capabilities in Ballerina, run or compile your .bal files with the --experimental flag. For example, to run the Ballerina file abcd.bal, use the command ballerina run --experimental abcd.bal.

Please try it out and let us know your feedback. Use Ballerina release 0.990.1 or later.

Thank you…

https://www.flickr.com/photos/jayneandd/4450623309/

Senior Tech Lead, Speaker @ WSO2. Closely works on Stream Processing and Integration