Story Behind Ballerina Streams

Mohanadarshan Vivekanandalingam
8 min read · Dec 9, 2018

In my previous post, I discussed stream processing and why it is needed in the integration world. In this post, I focus on Ballerina Streams: how it has evolved over time, its promising capabilities, and its direction.

The Start

As the Ballerina programming language evolved, we identified the need for streaming capabilities in the language. If you are wondering why, please read the previous article.

As the first step, we wanted to bring streaming capabilities to the Ballerina language within a very short time. As some of you are aware, we already had a powerful Complex Event Processing engine built in-house, called Siddhi. After a few rounds of heated discussions, we came up with a streaming query syntax for Ballerina; it was more like a marriage between the Siddhi query syntax and the Ballerina programming syntax.

Siddhi Streaming Query Syntax

from TempStream#window.time(10 min)
select avg(temp) as avgTemp, roomNo, deviceID
insert into AvgTempStream;

Ballerina Streaming Query Syntax

forever {
    from tempStream window time(600000)
    select avg(tempStream.temp) as avgTemp, tempStream.roomNo,
        tempStream.deviceID
    => (AvgTemperature [] values) {
        foreach var value in values {
            avgTempStream.publish(value);
        }
    }
}

Within two months, with two engineers, we were able to integrate the Siddhi CEP engine with the Ballerina language. It was very much a marathon effort. In this approach, when a user writes a streaming query in Ballerina syntax, it is converted into a Siddhi query and pushed into the Siddhi runtime, as shown in the figure below.

Query Flow in Siddhi Based Streaming Implementation

Even though this approach provided the capabilities required for streaming needs, we also identified some areas that we wanted to improve. The key reasons are listed below.

  • Streaming queries, which run in the Siddhi engine, cannot access the variables and functions that live in the Ballerina runtime.
  • The underlying implementation language of Siddhi is not fully compatible with Ballerina.

For these reasons, our direction changed towards implementing the stream processing capabilities from scratch using the Ballerina language itself. This also helps us demonstrate the feature richness of the Ballerina language and serves as an example. The rest of this post discusses the Ballerina Streams implementation, which is written in Ballerina itself.

Streaming Implementation — For Ballerina By Ballerina

It was decided to implement the streaming capabilities for the Ballerina programming language using Ballerina itself. In this approach, we implemented streaming constructs such as windows, aggregators, and group by as reusable models, and used them to build complete queries.

Query Flow in Native Ballerina Streaming Implementation

As shown in the figure above, a streaming query written by a developer goes through several phases at compile time: the lexical analyzer, parser, semantic analyzer, code analyzer, taint analyzer, desugar, and code generator. When the streaming query reaches the desugar phase, the syntactic sugar is removed and the query is rewritten into regular Ballerina first-class constructs, using the pre-built streaming models such as windows and aggregators. The code generator phase then generates code from the desugared form.

Note: If you would like to understand more about the Ballerina compiler and the Ballerina Virtual Machine, please refer to the posts written by Sameera Jayasoma, lead architect of the Ballerina compiler and runtime.
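To give a feel for what "rewriting a query into reusable models" can look like, here is a minimal Python sketch. It is only a conceptual model under my own assumptions, not the Ballerina compiler output: the stage classes `Filter`, `Select`, and `Output`, and the helper `build_pipeline`, are hypothetical names chosen for illustration.

```python
from typing import Any, Callable, Dict, List

Event = Dict[str, Any]


class Output:
    """Terminal stage: hands each result to a user-supplied callback."""

    def __init__(self, callback: Callable[[Event], None]) -> None:
        self.callback = callback

    def process(self, event: Event) -> None:
        self.callback(event)


class Select:
    """Projection stage: builds the output event from the input event."""

    def __init__(self, next_stage: Any, projector: Callable[[Event], Event]) -> None:
        self.next_stage = next_stage
        self.projector = projector

    def process(self, event: Event) -> None:
        self.next_stage.process(self.projector(event))


class Filter:
    """Filter stage: forwards only events that satisfy the condition."""

    def __init__(self, next_stage: Any, condition: Callable[[Event], bool]) -> None:
        self.next_stage = next_stage
        self.condition = condition

    def process(self, event: Event) -> None:
        if self.condition(event):
            self.next_stage.process(event)


def build_pipeline(results: List[Event]) -> Filter:
    # Hypothetical lowering of a query shaped like:
    #   from tempStream where temp > 30 select roomNo, temp
    # Stages are wired back to front, so each stage knows its successor.
    output = Output(results.append)
    select = Select(output, lambda e: {"roomNo": e["roomNo"], "temp": e["temp"]})
    return Filter(select, lambda e: e["temp"] > 30)
```

The declarative query stays short, while the chained stage objects carry the actual processing logic.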

As per the above approach, the streaming query written by a user is converted into typical Ballerina code. For example,

The above streaming query is desugared into the Ballerina code below, and execution continues.

As you can see, a simple streaming query written by the developer is converted into a few lines of code. This spares developers from dealing with a large amount of stream processing code and gives them a simple, easily understandable layer.

Ballerina Streaming Capabilities

The Ballerina streaming implementation supports the constructs below. They are sufficient to address most streaming needs in the integration world.

Projection

Projection involves selecting only some of the attributes from the input stream to be inserted into an output stream, renaming attributes, adding constant values to the output stream, and performing logical or mathematical operations.
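As a rough illustration of these four operations, here is a small Python sketch. It models the semantics only and is not Ballerina syntax; the attribute names (`roomNo`, `temp`, `deviceID`) are borrowed from the temperature example earlier in this post, and the Fahrenheit conversion is an assumed example of a mathematical operation.

```python
from typing import Any, Dict


def project(event: Dict[str, Any]) -> Dict[str, Any]:
    """Build an output event from an input event."""
    return {
        "room": event["roomNo"],              # selected and renamed attribute
        "tempF": event["temp"] * 9 / 5 + 32,  # mathematical operation
        "unit": "F",                          # constant value
        # deviceID is not selected, so it is dropped from the output
    }
```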

Example Query

Filter

Filters are included in queries to filter information from input streams based on a specified condition.

Purpose: A filter allows you to separate events that match a specific condition, either as the output or for further processing.
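In Python terms, a filter behaves roughly like the generator below. This is a sketch of the semantics, not of the Ballerina implementation; `filter_events` is a hypothetical helper name.

```python
from typing import Any, Callable, Dict, Iterable, Iterator

Event = Dict[str, Any]


def filter_events(
    events: Iterable[Event], condition: Callable[[Event], bool]
) -> Iterator[Event]:
    """Yield only the events for which the condition holds."""
    for event in events:
        if condition(event):
            yield event
```

Because it is lazy, each event is checked as it arrives rather than after the whole input has been collected.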

Example Query

Window

Windows allow you to capture a subset of events from an input stream, based on a specific criterion, for calculation. Each input stream can have at most one window.

Purpose: To create subsets of events within a stream based on time duration, number of events, etc. for processing. A window can operate in a sliding or tumbling (batch) manner.

At the moment, Ballerina Streaming supports the window implementations below.

  • time
  • timeBatch
  • timeLength
  • length
  • lengthBatch
  • externalTime
  • externalTimeBatch
  • uniqueLength
  • delayWindow
  • sort
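The difference between a sliding and a tumbling (batch) window is easiest to see with count-based windows. The Python sketch below models what I would expect from windows like length and lengthBatch; the class names are hypothetical, and the real windows also deal with event expiry and timestamps, which are left out here.

```python
from collections import deque
from typing import Deque, List, Optional


class LengthWindow:
    """Sliding window: always holds the most recent `size` events."""

    def __init__(self, size: int) -> None:
        self.events: Deque[float] = deque(maxlen=size)

    def add(self, value: float) -> List[float]:
        self.events.append(value)  # the oldest event is evicted when full
        return list(self.events)


class LengthBatchWindow:
    """Tumbling window: emits a batch every `size` events, then resets."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.batch: List[float] = []

    def add(self, value: float) -> Optional[List[float]]:
        self.batch.append(value)
        if len(self.batch) < self.size:
            return None  # batch not complete yet, nothing is emitted
        full, self.batch = self.batch, []
        return full
```

With a size of 2 and inputs 1, 2, 3, the sliding window sees [1], [1, 2], [2, 3], while the batch window emits only [1, 2] and keeps 3 for the next batch.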

Example Query

Aggregate Functions

Aggregate functions perform aggregate calculations in the query. When a window is defined, the aggregation is restricted to the events in that window. If no window is provided, the aggregation covers all events from the start.

The aggregations below are supported at the moment.

  • avg : Calculates the average for a given argument for all the events.
  • sum : Returns the sum of a given argument for all the events.
  • max : Returns the maximum value of a given argument for all the events.
  • min : Returns the minimum value of a given argument for all the events.
  • count : Returns the count of all the events.
  • distinctCount : Returns the count of distinct occurrences for a given argument.
  • maxForever : Stores the maximum value of a given attribute throughout the lifetime of the query, regardless of any preceding windows.
  • minForever : Stores the minimum value of a given attribute throughout the lifetime of the query, regardless of any preceding windows.
  • stdDev : Returns the calculated standard deviation of a given argument for all the events.
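The contrast between window-scoped aggregates and the "forever" variants can be sketched in Python as follows. This is an illustrative model under the assumption of a simple sliding length window; `WindowedStats` is a hypothetical name, not part of Ballerina.

```python
from collections import deque
from typing import Deque, Dict, Optional


class WindowedStats:
    """Tracks avg/max over a sliding window and the max over the query lifetime."""

    def __init__(self, size: int) -> None:
        self.window: Deque[float] = deque(maxlen=size)
        self.max_forever: Optional[float] = None

    def add(self, value: float) -> Dict[str, float]:
        self.window.append(value)
        # maxForever ignores the window: it never forgets an old maximum
        if self.max_forever is None or value > self.max_forever:
            self.max_forever = value
        return {
            "avg": sum(self.window) / len(self.window),  # restricted to the window
            "max": max(self.window),                     # restricted to the window
            "maxForever": self.max_forever,
        }
```

After the values 10, 4, 2 pass through a window of size 2, the windowed max is 4 because 10 has already left the window, while maxForever is still 10.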

Example Query

Group By

The group by clause allows you to group the aggregation based on specified attributes.
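Conceptually, grouping means keeping one aggregate state per distinct key. A minimal Python sketch, assuming events with `roomNo` and `temp` attributes as in the earlier temperature example (the function name is hypothetical):

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def avg_temp_by_room(events: Iterable[Dict[str, Any]]) -> Dict[Any, float]:
    """Average temperature per room, i.e. avg(temp) grouped by roomNo."""
    totals: Dict[Any, List[float]] = defaultdict(lambda: [0.0, 0])
    for event in events:
        acc = totals[event["roomNo"]]  # one accumulator per group key
        acc[0] += event["temp"]
        acc[1] += 1
    return {room: total / count for room, (total, count) in totals.items()}
```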

Sample Query

Having

The having clause allows you to filter events after processing the select statement.
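The key point is the order of evaluation: a filter in the where position sees raw input events, while having sees the output of the select statement, including aggregated values. A small Python sketch of that ordering, with a hypothetical `status` attribute and function name:

```python
from collections import Counter
from typing import Any, Dict, Iterable


def frequent_statuses(events: Iterable[Dict[str, Any]], min_count: int) -> Dict[str, int]:
    """Count events per status, then keep only groups above the threshold."""
    # select status, count(status) ... group by status
    counts = Counter(event["status"] for event in events)
    # having count > min_count: applied to the aggregated result, not the input
    return {status: n for status, n in counts.items() if n > min_count}
```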

Sample Query

Order By

The order by clause allows you to order the aggregated result in ascending and/or descending order based on specified attributes. By default, ordering is ascending. Use the descending keyword to order in descending manner.
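Ordering by several attributes with mixed directions can be modeled in Python with repeated stable sorts. This is a sketch of the semantics only; `order_by` and its `(attribute, descending)` key format are assumptions made for the example.

```python
from typing import Any, Dict, List, Tuple


def order_by(
    rows: List[Dict[str, Any]], keys: List[Tuple[str, bool]]
) -> List[Dict[str, Any]]:
    """Order rows by (attribute, descending) pairs, most significant first."""
    ordered = list(rows)
    # Python's sort is stable, so sorting by the least significant key first
    # and the most significant key last gives a correct multi-key ordering.
    for attribute, descending in reversed(keys):
        ordered.sort(key=lambda row: row[attribute], reverse=descending)
    return ordered
```

For example, `order_by(rows, [("avg", False), ("room", True)])` orders by avg ascending and breaks ties by room descending.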

Sample Query

Stream Joins

Joins allow you to get a combined result from two streams in real-time based on a specified condition.

Purpose: Streams are stateless. Therefore, in order to join two streams, they need to be connected to a window so that there is a pool of events that can be used for joining. Joins also accept conditions to join the appropriate events from each stream.

During the joining process, each incoming event of each stream is matched against all the events in the other stream’s window based on the given condition, and output events are generated for all the matching event pairs.
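The matching process described above can be sketched in Python like this. It is a simplified model that assumes a length window on each side; the class name `WindowedJoin` and the `"left"`/`"right"` labels are hypothetical.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Tuple

Event = Dict[str, Any]


class WindowedJoin:
    """Inner join of two streams, each buffered in its own length window."""

    def __init__(self, size: int, condition: Callable[[Event, Event], bool]) -> None:
        self.windows: Dict[str, Deque[Event]] = {
            "left": deque(maxlen=size),
            "right": deque(maxlen=size),
        }
        self.condition = condition

    def on_event(self, side: str, event: Event) -> List[Tuple[Event, Event]]:
        other = "right" if side == "left" else "left"
        self.windows[side].append(event)  # the event joins its own window
        pairs = []
        # match the incoming event against every event in the other window
        for candidate in self.windows[other]:
            left, right = (event, candidate) if side == "left" else (candidate, event)
            if self.condition(left, right):
                pairs.append((left, right))
        return pairs
```

An event that arrives while the other window is empty produces no output, but it stays in its own window so that later events from the other stream can match it.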

Unidirectional Join Operation

By default, events arriving at either stream can trigger the joining process. However, if you want to control the join execution, you can add the unidirectional keyword next to a stream in the join definition so that only that stream triggers the join operation. Events arriving at the other stream only update that stream's window; they do not trigger the join operation.

Note: The unidirectional keyword cannot be applied to both input streams, because the default behavior already allows both streams to trigger the join operation.

The following operations are supported in a join clause.
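A unidirectional join can be modeled in Python by letting only one side run the matching step. This is a conceptual sketch with hypothetical names (`UnidirectionalJoin`, the `trigger` parameter) and length windows assumed on both sides.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Tuple

Event = Dict[str, Any]


class UnidirectionalJoin:
    """Join in which only events from the `trigger` side produce output."""

    def __init__(
        self, size: int, condition: Callable[[Event, Event], bool], trigger: str
    ) -> None:
        self.windows: Dict[str, Deque[Event]] = {
            "left": deque(maxlen=size),
            "right": deque(maxlen=size),
        }
        self.condition = condition
        self.trigger = trigger  # "left" or "right"

    def on_event(self, side: str, event: Event) -> List[Tuple[Event, Event]]:
        self.windows[side].append(event)
        if side != self.trigger:
            return []  # the non-trigger side only updates its own window
        other = "right" if side == "left" else "left"
        pairs = []
        for candidate in self.windows[other]:
            left, right = (event, candidate) if side == "left" else (candidate, event)
            if self.condition(left, right):
                pairs.append((left, right))
        return pairs
```

With `trigger="left"`, a matching event on the right stream is buffered silently; output appears only when the next left event arrives.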

Inner join (join)

This is the default behavior of a join operation. join is used as the keyword to join both the streams. The output is generated only if there is a matching event in both the streams.

Left outer join

The left outer join operation allows you to join two streams based on a condition. left outer join is used as the keyword to join both the streams. It returns all the events of the left stream even if there are no matching events in the right stream, with null values for the attributes of the right stream.

Right outer join

This is similar to a left outer join. right outer join is used as the keyword to join both the streams. It returns all the events of the right stream even if there are no matching events in the left stream.

Full outer join

The full outer join combines the results of left outer join and right outer join. full outer join is used as the keyword to join both the streams. Here, output events are generated for each incoming event even if there are no matching events in the other stream.
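The outer variants differ from the inner join only in what happens when nothing matches. The Python sketch below shows the left outer case for a single incoming left event, using `None` where the text above says null; the function name is hypothetical and the right stream's window is passed in as a plain list.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

Event = Dict[str, Any]


def left_outer_join(
    left_event: Event,
    right_window: List[Event],
    condition: Callable[[Event, Event], bool],
) -> List[Tuple[Event, Optional[Event]]]:
    """Join one left event against the right window, keeping unmatched events."""
    matches: List[Tuple[Event, Optional[Event]]] = [
        (left_event, right) for right in right_window if condition(left_event, right)
    ]
    # an inner join would return [] here; the outer join pads with None instead
    return matches or [(left_event, None)]
```

A right outer join is the mirror image, and a full outer join applies the padding rule to events arriving on either side.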

Example Query

Table Operations with Stream

Ballerina provides extensive support for working with tables. It provides operations such as create, insert, and delete on in-memory or external storage tables. You can refer to the link below to find more information on this.

In the streaming context, a table is a stored version of a stream or a table of events. Ballerina supports interactively querying the state of the events stored in a table while processing events that arrive through a stream.
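Unlike a window, a table does not expire its rows as new events arrive, so a stream event can be enriched with stored state at any time. Here is a minimal Python sketch in which the table is modeled as a list of rows; the `owner` attribute and the function name are assumptions made for the example.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def enrich_with_table(event: Row, table: List[Row]) -> List[Row]:
    """Join one incoming stream event with the rows stored in a table."""
    return [
        {**event, "owner": row["owner"]}  # copy the event and add the stored attribute
        for row in table
        if row["roomNo"] == event["roomNo"]
    ]
```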

Example Query

Progress So Far

As of now, the Ballerina-based streaming implementation provides almost all of the features (as given in the previous section) required for stream processing. As the next step, we are focusing on the areas below.

  • Patterns and Sequences
  • State Snapshot Persistence

How to Use?

The stream processing capabilities in Ballerina are available as an experimental feature. The current Ballerina distribution ships with both the Siddhi-based and the native Ballerina-based streams implementations. The Ballerina-based implementation is enabled by default from Ballerina release 0.990.1.

If you want to try out the streaming capabilities in Ballerina, run your .bal files with the --experimental flag. For example, to run the Ballerina file abcd.bal, run or compile the code with the command ballerina run --experimental abcd.bal.

Please try it out and let us know your feedback. Use Ballerina release 0.990.1 or above for this.

Thank you…

https://www.flickr.com/photos/jayneandd/4450623309/

Mohanadarshan Vivekanandalingam

Senior Tech Lead, Speaker @ WSO2. Closely works on Stream Processing and Integration