Note: This post is one of a series, the overview can be found here: Complex Event Processing with StreamInsight
The sample code to the application can be downloaded here:
Writing the Sample Application
In this post we are going to write a sample application from scratch. We start by installing StreamInsight and then jump into Visual Studio and start coding the project. First, we are going to use a library to get access to the Twitter feed. As soon as we have the basics we elaborate on some interesting queries on the Twitter data stream.
Installing StreamInsight
First, we need to download the StreamInsight installer executable. We can find it in the Microsoft Download Center under: http://www.microsoft.com/en-us/download/details.aspx?id=30149
We need the StreamInsight.msi Installers corresponding to your target platform bitness.
During the installation you will be asked if you would like to create a StreamInsight instance. Do so and give it any name you like. In this example I am going to name it “Thomas”.
Creating the Visual Studio Solution
Now, let’s fire up Visual Studio. I am using VS2012 but you can use VS2010 as well. We make it simple and create a new console project. Then we need to add the Microsoft StreamInsight assemblies. Add assembly references to the following files:
- Microsoft.ComplexEventProcessing: Found in the StreamInsight install directory: C:\Program Files\Microsoft StreamInsight 2.1\Bin\
- System.Reactive: Found under .NET 4.0
- System.Reactive.Providers: Found under .NET 4.0
Your References should look like this:
Next, we integrate Twitter into our solution. We use a handy library named Tweetinvi and install it via NuGet Package Manager. Maybe you need to install the NuGet Package Manager in your Visual Studio, if it is not available. Right-click on the References folder in the Solution Explorer and select “Manage NuGet Packages”. Do an online search for “Tweetinvi” and hit “install”.
First we are going to write a data class that will be our payload object. We call it TweetItem and fill it with a couple of properties. For the sake of simplicity, we restrict our item to the following fields: User, Text, CreationDate, Language, Followers, Friends.
In our TweetItem class we provide a constructor that accepts an object that implements the ITweet interface and converts it into a TweetItem. The ITweet interface can be found in the Tweetinvi Library and contains all the fields that we get for each tweet.
Creating the Twitter Observable
Next, we create the class that we use as our data source and feed into StreamInsight. Create a new class called TwitterStream.cs and implement the IObservable<TweetItem> interface. The interface contains a Subscribe() method:
First, we are going to add a list of IObserver<TweetItem> objects. Then we add code to the Subscribe() method that adds the observer parameter to the observer list.
The result of the subscribe method is an object that implements IDisposable. The subscriber can use it to cancel his subscription. The subscription will end when the subscriber disposes the object that he received upon subscription. We implement a small Unsubscriber class that implements IDisposable and keeps references to the observer list and its own observer instance. Upon disposal, it removes its own reference from the observer collection.
Next we need a Start() method that connects to twitter and starts receiving Tweets. In order to receive Tweets from Twitter you will need to register on the Twitter website and get the following four credential keys: UserKey, UserSecret, ConsumerKey, ConsumerSecret. You can start on: https://dev.twitter.com/docs. Once you can login to dev.twitter.com you can create an application and will get your authentication information.
There is a free subscription to Twitter that will give you one percent of all the worldwide Tweets. Once you have your credentials, we can implement the start method:
The start method uses the Token and SimpleStream classes from the Tweetinvi namespace. Note that the SimpleStream class has nothing to do with StreamInsight. In the StartStream() method we provide a lambda expression that points to the OnNewTweet(tweet) method that we will define next. The OnNewTweet() method is used to map the Tweets into TweetItems and forward each Tweet that we receive to all the items in our observer collection:
Finally, we implement the constructor. We initialize the myObservers collection and we call the Start() method. We execute the Start() method on a background thread using the Task Parallel Library (TPL).
In the next post we put it all together and write our first query. Stay tuned!