Using Complex Event Processing (CEP) with Microsoft StreamInsight to Analyze Twitter Tweets 3: StreamInsight Programming
Note: This post is part of a series; the overview can be found here: Complex Event Processing with StreamInsight
In this part, we are going to look at the StreamInsight programming model.
What do we need?
- Visual Studio. Using StreamInsight always requires some .NET coding.
- A hosted StreamInsight server. The server can be installed as a Windows service from the StreamInsight executable (about 10 MB).
- The StreamInsight installer executable. It contains the server component and the assemblies that need to be referenced in Visual Studio.
- A SQL Server license. For production use of StreamInsight, we need a valid SQL Server license.
Important to note:
- We do not need a SQL Server instance in order to use StreamInsight.
- StreamInsight is more of a framework than a complete product. There is always some coding involved.
This section explains the artifacts and concepts that need to be understood before we start developing a StreamInsight solution.
A StreamInsight solution always consists of the items listed below. In the demo application we will see all of these again in our code.
- A StreamInsight server. The server usually runs as a Windows service and can host multiple instances, into which we deploy our data sources, sinks and queries.
- A StreamInsight application. The application is a logical container inside the server that holds our artifacts.
- A data source. The source can either be data at rest, e.g. historical data, or moving data from a live data stream. The implementation is the same in both cases.
- A data sink. The place where StreamInsight writes its output.
- A query. The query is written in LINQ.
- A binding. The binding connects the query to a sink.
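To make the roles of these artifacts concrete, here is a minimal Python sketch that models them as plain objects. It is not StreamInsight code: the names `Server`, `Application`, `define_source`, `define_query`, `bind` and `start` are hypothetical, and the query is just a generator expression standing in for a LINQ `where` clause.

```python
from typing import Callable, Dict, Iterable, List

# Hypothetical model of the StreamInsight artifacts; none of these names
# belong to the StreamInsight API.

class Application:
    """Logical container for sources, queries, sinks and bindings."""
    def __init__(self, name: str):
        self.name = name
        self.bindings: List[Callable[[], None]] = []

class Server:
    """Hosts applications (in StreamInsight, usually a Windows service)."""
    def __init__(self) -> None:
        self.applications: Dict[str, Application] = {}

    def create_application(self, name: str) -> Application:
        app = Application(name)
        self.applications[name] = app
        return app

def define_source(items: Iterable[dict]) -> Iterable[dict]:
    # Source: here an in-memory collection (data at rest).
    return items

def define_query(source: Iterable[dict], predicate: Callable[[dict], bool]) -> Iterable[dict]:
    # Query: filters the source, like a LINQ "where" clause.
    return (item for item in source if predicate(item))

def bind(app: Application, query: Iterable[dict], sink: Callable[[dict], None]) -> None:
    # Binding: connects the query to a sink; nothing runs until start().
    def run() -> None:
        for item in query:
            sink(item)
    app.bindings.append(run)

def start(app: Application) -> None:
    for run in app.bindings:
        run()

# Usage: keep only tweets that mention #cep.
server = Server()
app = server.create_application("TwitterAnalysis")
received: List[dict] = []
tweets = define_source([{"text": "hello #cep"}, {"text": "unrelated"}])
bind(app, define_query(tweets, lambda t: "#cep" in t["text"]), received.append)
start(app)
print(received)  # [{'text': 'hello #cep'}]
```

In this sketch the binding only records what should run; data flows once the application is started, which keeps defining the query separate from running it.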
StreamInsight Event Types
The data items that we pass from our .NET code into the StreamInsight engine, and receive back from it, are always instances of generic event types that wrap a POCO payload object. In addition to the payload, these generic types carry temporal information such as start and end times. There are three event types, depending on the nature of the event:
- PointEvent<T>: A point event has a single timestamp. It is used for events that happen at one instant in time and would be visualized as a point.
- IntervalEvent<T>: An interval event has a start time, an end time and therefore a fixed duration greater than 0. Both times are known when the event is created.
- EdgeEvent<T>: Edge events are also defined by a start and an end time. The difference from IntervalEvent<T> is that when the start time passes and the interval begins, we know neither the duration nor the end time of the event yet.
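The three event shapes can be modeled as small Python dataclasses. This is a conceptual sketch, not the .NET types: the class names mirror the StreamInsight ones only for readability, and treating a point event as lasting one microsecond (`TICK`) is an assumption of this sketch.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Generic, Optional, TypeVar

T = TypeVar("T")
TICK = timedelta(microseconds=1)  # assumption: smallest time unit in this sketch

@dataclass(frozen=True)
class PointEvent(Generic[T]):
    start: datetime
    payload: T

    @property
    def end(self) -> datetime:
        # A point event occupies a single instant, modeled as one tick.
        return self.start + TICK

@dataclass(frozen=True)
class IntervalEvent(Generic[T]):
    start: datetime
    end: datetime
    payload: T

    def __post_init__(self) -> None:
        # Start and end are both known when the event is created.
        if self.end <= self.start:
            raise ValueError("interval duration must be greater than 0")

    @property
    def duration(self) -> timedelta:
        return self.end - self.start

@dataclass
class EdgeEvent(Generic[T]):
    start: datetime
    payload: T
    end: Optional[datetime] = None  # unknown until the end edge arrives

    def close(self, end: datetime) -> None:
        if end <= self.start:
            raise ValueError("end edge must come after the start edge")
        self.end = end

    @property
    def duration(self) -> Optional[timedelta]:
        return None if self.end is None else self.end - self.start

t0 = datetime(2013, 1, 1, 12, 0)
tweet = PointEvent(t0, "new tweet")
session = IntervalEvent(t0, t0 + timedelta(minutes=5), "user session")
trend = EdgeEvent(t0, "#cep trending")
print(trend.duration)  # None: the end edge has not arrived yet
trend.close(t0 + timedelta(minutes=30))
print(trend.duration)  # 0:30:00
```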
Besides the data events, there is a special event type: the Current Time Increment (CTI). StreamInsight requires CTI events in every stream it analyzes, and the developer has to define how CTIs are inserted into the stream. A CTI carries a timestamp and tells the engine that no further events with an earlier start time will arrive; inserted at regular intervals, CTIs divide the stream into fixed time windows.
StreamInsight needs CTIs to process the stream for the following reasons:
- Stream processing: A CTI “closes” the current timeframe. For example, if we work with 1-minute CTIs, StreamInsight will process the 12:04 – 12:05 window when the 12:05 CTI arrives. Late events that belong to that window but arrive after the 12:05 CTI will be ignored.
- Stream sync: If we evaluate data from multiple streams, StreamInsight synchronizes the streams based on their respective CTIs.
- Non-events: We can look for missing events within CTI windows. For example, we could use this approach to implement a monitoring solution that regularly checks for a heartbeat signal.
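The following toy simulation, written in Python and unrelated to the engine's actual implementation, shows two of these reasons: each CTI closes the window since the previous CTI, late events are dropped as described above, and an empty window reveals a missing heartbeat (a non-event). The tuple encoding of the stream and the function names `process` and `missing_heartbeats` are hypothetical; stream synchronization is not modeled.

```python
from datetime import datetime
from typing import List, Optional, Tuple

# Toy model, not the StreamInsight engine. A stream item is either
# ("event", timestamp, payload) or ("cti", timestamp).
Window = Tuple[Optional[datetime], datetime, List[str]]

def process(stream: list) -> Tuple[List[Window], List[str]]:
    """Close a window at every CTI; drop events older than the last CTI."""
    last_cti: Optional[datetime] = None
    pending: List[Tuple[datetime, str]] = []
    closed: List[Window] = []
    dropped: List[str] = []
    for item in stream:
        if item[0] == "cti":
            cti_time = item[1]
            # Everything before the CTI belongs to the window that closes now.
            closed.append((last_cti, cti_time, [p for ts, p in pending if ts < cti_time]))
            pending = [(ts, p) for ts, p in pending if ts >= cti_time]
            last_cti = cti_time
        else:
            _, ts, payload = item
            if last_cti is not None and ts < last_cti:
                dropped.append(payload)  # late: its window is already closed
            else:
                pending.append((ts, payload))
    return closed, dropped

def missing_heartbeats(closed: List[Window]) -> List[Tuple[Optional[datetime], datetime]]:
    # Non-event detection: a closed window that contains no heartbeat.
    return [(start, end) for start, end, payloads in closed if not payloads]

def t(minute: int, second: int = 0) -> datetime:
    return datetime(2013, 1, 1, 12, minute, second)

stream = [
    ("event", t(3, 10), "hb-1"),
    ("cti", t(4)),
    ("event", t(4, 30), "hb-2"),
    ("cti", t(5)),
    ("event", t(4, 50), "late"),  # belongs to 12:04-12:05 but arrives after the 12:05 CTI
    ("cti", t(6)),
]
closed, dropped = process(stream)
print(dropped)                          # ['late']
print(len(missing_heartbeats(closed)))  # 1 (the 12:05-12:06 window)
```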
Getting the data in and out of StreamInsight
To connect StreamInsight to data sources or data consumers, we have to write code.
StreamInsight supports two different implementation approaches. The first one is the adapter approach. It has been available since the early versions of StreamInsight and can be used for complex input and output scenarios. As of StreamInsight 2.1, the adapter approach is often called the legacy approach.
The second approach was introduced with StreamInsight 2.1 and relies on existing interfaces from .NET and the Reactive Extensions (Rx). The idea is that if we already have a class that implements IEnumerable<T> or IObservable<T>, we can hook it directly into our processing.
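The difference between the two .NET interfaces is who drives the data flow. The following Python sketch contrasts both styles: iterating a collection corresponds to IEnumerable<T> (pull), while subscribing a callback corresponds to IObservable<T> (push). `pull_all` and `SimpleObservable` are hypothetical helpers, not the .NET or Rx types.

```python
from typing import Callable, Iterable, List

def pull_all(source: Iterable[str]) -> List[str]:
    """PULL (like IEnumerable<T>): the consumer asks for each item."""
    received: List[str] = []
    for item in source:  # the consumer drives the loop
        received.append(item)
    return received

class SimpleObservable:
    """PUSH (like IObservable<T>): the producer calls its subscribers.
    Hypothetical minimal class, not the Rx implementation."""
    def __init__(self) -> None:
        self._observers: List[Callable[[str], None]] = []

    def subscribe(self, on_next: Callable[[str], None]) -> Callable[[], None]:
        self._observers.append(on_next)
        # The returned function plays the role of the IDisposable subscription.
        return lambda: self._observers.remove(on_next)

    def publish(self, item: str) -> None:
        for observer in list(self._observers):  # the producer drives delivery
            observer(item)

history = ["tweet 1", "tweet 2"]
print(pull_all(history))  # ['tweet 1', 'tweet 2']

live = SimpleObservable()
pushed: List[str] = []
unsubscribe = live.subscribe(pushed.append)
live.publish("tweet 3")
unsubscribe()
live.publish("tweet 4")  # not delivered: no subscribers left
print(pushed)  # ['tweet 3']
```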
The Adapter (Legacy) Approach
The adapter approach is pretty straightforward. We implement the IInputAdapter<T> and IOutputAdapter<T> interfaces in our own classes. Then we provide a configuration class and a factory class, and instruct StreamInsight to request an adapter from the factory.
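The adapter, configuration and factory roles can be illustrated with a small Python sketch of the general factory pattern. All class and function names here (`TwitterConfig`, `TwitterInputAdapter`, `TwitterInputAdapterFactory`, `run_engine`) are hypothetical and do not reflect the actual StreamInsight adapter classes; the canned tweet list stands in for a live Twitter feed.

```python
from dataclasses import dataclass
from typing import Iterator, List

# Hypothetical illustration of the adapter/config/factory pattern;
# these are not StreamInsight types.

@dataclass
class TwitterConfig:
    keywords: List[str]  # configuration handed to the factory

class TwitterInputAdapter:
    def __init__(self, config: TwitterConfig, tweets: List[str]):
        self.config = config
        self._tweets = tweets

    def events(self) -> Iterator[str]:
        # A real adapter would read from the live Twitter stream here.
        for text in self._tweets:
            if any(keyword in text for keyword in self.config.keywords):
                yield text

class TwitterInputAdapterFactory:
    def __init__(self, tweets: List[str]):
        self._tweets = tweets

    def create(self, config: TwitterConfig) -> TwitterInputAdapter:
        # The engine asks the factory for an adapter instead of building one itself.
        return TwitterInputAdapter(config, self._tweets)

def run_engine(factory: TwitterInputAdapterFactory, config: TwitterConfig) -> List[str]:
    # Stand-in for the engine: request an adapter from the factory and drain it.
    adapter = factory.create(config)
    return list(adapter.events())

factory = TwitterInputAdapterFactory(["I love #cep", "lunch time", "#cep rocks"])
result = run_engine(factory, TwitterConfig(keywords=["#cep"]))
print(result)  # ['I love #cep', '#cep rocks']
```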
The StreamInsight 2.1 Interface Approach
The interface approach is said to be much easier. Personally, I think it is much harder for a beginner to understand: there are a lot of interfaces, and only once you grasp the purpose of each one does the model start to make sense.
We have three groups of interfaces that we use depending on the nature of our data source:
- IEnumerable<T>/IQueryable<T>: If our data source is a collection.
- IObservable<T>/IQbservable<T>: If our data source is a stream.
- IQStreamable<T>: The interface that StreamInsight itself understands; it describes a data stream.
StreamInsight provides extension methods that let us create an IQStreamable<T> object from any of the other interfaces, which is very convenient. In short, we have to create or convert our data source into an IEnumerable<T> (for PULL) or an IObservable<T> (for PUSH) and can then convert this object into an IQStreamable<T>.
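The idea of converting both PULL and PUSH sources into one common stream can be sketched in Python: two hypothetical helpers, `stream_from_pull` and `stream_from_push`, turn either a collection or a callback-based source into the same sequence of point events and CTIs. This only illustrates the concept; it is not how StreamInsight's extension methods are implemented, and emitting a CTI after every event assumes that timestamps arrive in increasing order.

```python
from datetime import datetime
from typing import Callable, Iterable, List, Tuple

Record = Tuple[datetime, str]      # (timestamp, payload)
Handler = Callable[[tuple], None]  # receives ("event", ts, payload) or ("cti", ts)

def _forward(record: Record, downstream: Handler) -> None:
    timestamp, payload = record
    downstream(("event", timestamp, payload))
    # Assumption: records arrive in increasing timestamp order, so a CTI
    # can safely follow each event.
    downstream(("cti", timestamp))

def stream_from_pull(source: Iterable[Record], downstream: Handler) -> None:
    # PULL: we iterate over the collection ourselves.
    for record in source:
        _forward(record, downstream)

def stream_from_push(subscribe: Callable[[Callable[[Record], None]], None],
                     downstream: Handler) -> None:
    # PUSH: we register a callback and the producer calls it.
    subscribe(lambda record: _forward(record, downstream))

t1, t2 = datetime(2013, 1, 1, 12, 0), datetime(2013, 1, 1, 12, 1)

pulled: List[tuple] = []
stream_from_pull([(t1, "a"), (t2, "b")], pulled.append)

callbacks: List[Callable[[Record], None]] = []
pushed: List[tuple] = []
stream_from_push(callbacks.append, pushed.append)
for callback in callbacks:  # the producer pushes two records
    callback((t1, "a"))
    callback((t2, "b"))

print(pulled == pushed)  # True: both sources end up as the same stream
```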
I will go into the details of all the interfaces in the next chapter.