Microsoft StreamInsight CEP is a powerful platform for developing complex event processing (CEP) systems, and it supports several development models. In this post we will use the IObservable/IObserver model with the .NET Reactive Extensions (Rx). Since this will be a real-time application, we will also use F# async workflows to pull stock data.
F# async workflows are the coolest part of using F# in a real-time application. They let us write concise code that (1) executes in parallel and (2) can be exposed to other .NET libraries with ease.
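As a minimal sketch of the "executes in parallel" point, the snippet below composes three workflows with Async.Parallel and waits for all of them. The slowSquare function is a hypothetical stand-in for a slow request and is not part of the application.

```fsharp
// Hypothetical stand-in for a slow request: waits briefly, then returns n squared.
let slowSquare (n: int) =
    async {
        do! Async.Sleep 100
        return n * n
    }

// Async.Parallel combines the workflows into one that runs them concurrently
// and returns the results as an array in input order.
let squares =
    [ 1; 2; 3 ]
    |> List.map slowSquare
    |> Async.Parallel
    |> Async.RunSynchronously

printfn "%A" squares // prints [|1; 4; 9|]
```

Async.RunSynchronously blocks the calling thread, which is acceptable in a console sketch but not on a GUI thread; that is why the worker in this post reports results through events instead.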
I won’t go into detail about F# except for the async workflow used in this application. There is a three-part series on using design patterns for F# async workflows; I have used Pattern #3 in this post, since we are using Rx to invoke the workflows. In this design pattern, the worker reports progress through events. A modified version of the AsyncWorker<> code is shown below:
open System
open System.Threading

// Event payload: the 1-based job number and that job's result
type JobCompletedEventArgs<'T>(job:int, result:'T) =
    inherit EventArgs()
    member x.Job with get() = job
    member x.Result with get() = result

type AsyncWorker<'T>(jobs: seq<Async<'T>>) =
    // This declares an F# event that we can raise
    let allCompleted = new Event<'T[]>()
    let error = new Event<System.Exception>()
#if WPF
    let canceled = new Event<OperationCanceledException>()
#else
    let canceled = new Event<OperationCanceledException>()
#endif
    let jobCompleted = new Event<JobCompletedEventArgs<'T>>()
    let cancellationCapability = new CancellationTokenSource()

    /// Start an instance of the work
    member x.Start() =
        // Capture the synchronization context to allow us to raise events back on the GUI thread.
        // CaptureCurrent and RaiseEvent are extension members on SynchronizationContext
        // defined in the design-pattern series (not shown here).
        let syncContext = SynchronizationContext.CaptureCurrent()
        // Mark up the jobs with numbers
        let jobs = jobs |> Seq.mapi (fun i job -> (job, i+1))
        let raiseEventOnGuiThread(evt, args) = syncContext.RaiseEvent evt args
        let work =
            Async.Parallel
                [ for (job,jobNumber) in jobs ->
                    async { let! result = job
                            syncContext.RaiseEvent jobCompleted (new JobCompletedEventArgs<'T>(jobNumber, result))
                            return result } ]
        Async.StartWithContinuations
            ( work,
              (fun res -> raiseEventOnGuiThread(allCompleted, res)),
              (fun exn -> raiseEventOnGuiThread(error, exn)),
              (fun exn -> raiseEventOnGuiThread(canceled, exn)),
              cancellationCapability.Token)

    /// Raised when a particular job completes
    [<CLIEvent>]
    member x.JobCompleted = jobCompleted.Publish
    /// Raised when all jobs complete
    [<CLIEvent>]
    member x.AllCompleted = allCompleted.Publish
    /// Raised when the composition is cancelled successfully
    [<CLIEvent>]
    member x.Canceled = canceled.Publish
    /// Raised when the composition exhibits an error
    [<CLIEvent>]
    member x.Error = error.Publish
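We have used the [<CLIEvent>] attribute on each published event so that it compiles to a standard .NET event that other .NET languages can subscribe to.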
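To illustrate the event-publishing pattern the worker relies on, here is a small self-contained sketch. The Ticker and TickEventArgs types are hypothetical and exist only for this example; they are not part of the application.

```fsharp
open System

// Hypothetical payload type for this sketch
type TickEventArgs(count: int) =
    inherit EventArgs()
    member x.Count = count

type Ticker() =
    // The private Event object is what we trigger
    let tick = new Event<TickEventArgs>()
    let mutable count = 0

    /// Raises the Tick event with an increasing counter
    member x.Fire() =
        count <- count + 1
        tick.Trigger(TickEventArgs(count))

    /// Published view of the event; CLIEvent makes it a regular .NET event
    [<CLIEvent>]
    member x.Tick = tick.Publish

let ticker = Ticker()
let seen = ResizeArray<int>()

// From F#, handlers are attached with Add
ticker.Tick.Add(fun args -> seen.Add args.Count)
ticker.Fire()
ticker.Fire()

printfn "%A" (List.ofSeq seen) // prints [1; 2]
```

From C#, the same event would be consumed with the usual += syntax or, as later in this post, by name through Rx.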
Stock Quotes Reader
The Stock Quotes Reader defines a wrapper that sends a request to the server (Yahoo Finance in this case) and pulls the stock quotes.
open System
open System.IO
open System.Net

// Event payload: the raw CSV lines returned by the server
type StockAvailableEventArgs(stocks:string[]) =
    inherit EventArgs()
    member x.Stocks with get() = stocks

type StockQuotesReader(quotes:string) =
    let stockAvailableEvent = new Event<StockAvailableEventArgs>()

    // Requests the URI asynchronously and returns the response body as a list of lines
    let httpLines (uri:string) =
        async { let request = WebRequest.Create uri
                use! response = Async.FromBeginEnd(request.BeginGetResponse, request.EndGetResponse)
                use stream = response.GetResponseStream()
                use reader = new StreamReader(stream)
                let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ]
                return lines }

    // n - name, s - symbol, x - Stock Exchange, l1 - Last Trade, p2 - change in percent, h - high, l - low, o - open, p - previous close, v - volume
    // (p2 is listed for reference only; the format string below does not request it)
    let yahooUri (quotes:string) =
        let uri = String.Format("http://finance.yahoo.com/d/quotes.csv?s={0}&f=nsxl1hlopv", quotes)
        uri

    // Wraps the single request in a list so it can be handed to AsyncWorker
    member x.GetStocks() =
        let stocks = [httpLines(yahooUri quotes)]
        stocks

    // member x.TestAsync() =
    //     let stocks = httpLines(yahooUri quotes)
    //     Async.RunSynchronously(stocks)

    // Starts the request and raises StockAvailable when it completes
    member x.PullStocks() =
        let stocks = x.GetStocks()
        let worker = new AsyncWorker<_>(stocks)
        worker.JobCompleted.Add(fun args ->
            stockAvailableEvent.Trigger(new StockAvailableEventArgs(args.Result |> List.toArray))
        )
        worker.Start()

    static member GetAsyncReader(quotes) =
        let reader = new StockQuotesReader(quotes)
        let stocks = reader.GetStocks()
        let worker = new AsyncWorker<_>(stocks)
        worker

    [<CLIEvent>]
    member x.StockAvailable = stockAvailableEvent.Publish
The above wrapper class does some interesting things:
- It has an async block that returns the lines of data read from the response stream.
- PullStocks creates the async requests and raises the StockAvailable event whenever an async job completes.
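The lines carried by StockAvailable are raw CSV text. As a rough sketch of how one such line might be turned into a typed value, the snippet below assumes the fields arrive in the order requested by f=nsxl1hlopv and that no field contains an embedded comma. The StockQuote record, the tryParseQuote function, and the sample line are all hypothetical; the post's own conversion code is not shown.

```fsharp
open System
open System.Globalization

// Hypothetical record mirroring the nsxl1hlopv field order
type StockQuote =
    { Name: string; Symbol: string; Exchange: string
      LastTrade: decimal; High: decimal; Low: decimal
      Open: decimal; PreviousClose: decimal; Volume: int64 }

// Strips whitespace and the surrounding double quotes from a text field
let unquote (s: string) = s.Trim().Trim('"')

let parseDecimal (s: string) =
    Decimal.Parse(s.Trim(), CultureInfo.InvariantCulture)

// Returns None when the line does not have exactly nine well-formed fields.
// Assumption: no field contains an embedded comma.
let tryParseQuote (line: string) =
    match line.Split(',') with
    | [| n; s; x; l1; h; l; o; p; v |] ->
        try
            Some { Name = unquote n
                   Symbol = unquote s
                   Exchange = unquote x
                   LastTrade = parseDecimal l1
                   High = parseDecimal h
                   Low = parseDecimal l
                   Open = parseDecimal o
                   PreviousClose = parseDecimal p
                   Volume = Int64.Parse(v.Trim(), CultureInfo.InvariantCulture) }
        with
        | :? FormatException -> None
    | _ -> None

// Made-up sample line, for illustration only
let sample = "\"Example Corp\",\"EXM\",\"TestEx\",10.50,11.00,10.00,10.25,10.40,12345"
printfn "%A" (tryParseQuote sample)
```

Returning an option keeps malformed lines from throwing inside the event handler; the caller can simply skip the None cases.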
CEP Client
On the CEP client we will use the following:
- Syncfusion WPF GridDataControl: works well with high-speed data changes, keeping CPU usage to a minimum.
- Rx: creates requests and updates the ViewModel bound to the grid.
Application Setup
The WPF application uses a simple MVVM (Model-View-ViewModel) setup, defining a StocksViewModel to hold stock data. The Stocks collection is bound to the Syncfusion WPF GridDataControl.
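The view model itself is not listed in this post. As a rough sketch of the add-or-update idea, written in F# to match the listings above, the snippet below keeps one row per symbol and updates it in place. The StockRow type and the AddOrUpdate signature are assumptions made for illustration, and a real WPF row would also implement INotifyPropertyChanged so the grid sees in-place changes.

```fsharp
open System.Collections.Generic
open System.Collections.ObjectModel

// Hypothetical row type; LastTrade is settable so a row can be updated in place
type StockRow(symbol: string, lastTrade: decimal) =
    member val Symbol = symbol
    member val LastTrade = lastTrade with get, set

type StocksViewModel() =
    // Collection a grid would bind to
    let stocks = ObservableCollection<StockRow>()
    // Index from symbol to row, so updates avoid scanning the collection
    let bySymbol = Dictionary<string, StockRow>()

    member x.Stocks = stocks

    /// Updates the row for a known symbol, or appends a new row
    member x.AddOrUpdate(symbol: string, lastTrade: decimal) =
        match bySymbol.TryGetValue symbol with
        | true, row -> row.LastTrade <- lastTrade
        | _ ->
            let row = StockRow(symbol, lastTrade)
            bySymbol.[symbol] <- row
            stocks.Add row

let vm = StocksViewModel()
vm.AddOrUpdate("AAA", 1.0m)
vm.AddOrUpdate("AAA", 2.0m) // same symbol: updates the existing row
vm.AddOrUpdate("BBB", 3.0m) // new symbol: appends a row
printfn "%d rows, first price %M" vm.Stocks.Count vm.Stocks.[0].LastTrade
```

Updating rows in place rather than rebuilding the collection keeps the number of change notifications small when quotes arrive frequently.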
Using Rx to create requests
This real-time application requires real-time data that will be pulled over the wire every 500 milliseconds. We will use IObservable to create a streaming request and repeat it after a time delay.
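private void RealTimeStocks(int delay, string quotes)
{
    var stockReader = new StockQuotesReader(quotes);
    // Defer runs this block on every subscription, so each Repeat issues a fresh request
    var stockFeeds = Observable.Defer(() =>
    {
        stockReader.PullStocks();
        // ToStockQuotes() (not shown) converts the raw CSV lines into quote objects
        var evt = from e in Observable.FromEvent<StockAvailableEventArgs>(stockReader, "StockAvailable")
                  select new { Stocks = e.EventArgs.Stocks.ToStockQuotes() };
        // Hand out the event stream after the delay, then complete so Repeat starts the next round
        var delayedEvt = Observable.Return(evt).Delay(TimeSpan.FromMilliseconds(delay));
        return delayedEvt;
    }).Repeat();
    // Flatten the stream of event streams into a single stream of quotes
    var stocks = from s in stockFeeds
                 from t in s
                 select t.Stocks;
    stocks.SubscribeOnDispatcher().Subscribe((stockQuotes) =>
    {
        this.AddOrUpdateModel(stockQuotes);
    });
}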
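The pull, wait, repeat cycle can also be pictured without Rx. The sketch below is a plain F# async loop, not the Rx pipeline used in the application; pollEvery, fakePull, and the rounds limit are hypothetical names introduced only so the example is self-contained and terminates.

```fsharp
// Calls pull, hands the result to onNext, waits delayMs, and repeats.
// The rounds limit exists only so this sketch terminates.
let pollEvery (delayMs: int) (rounds: int) (pull: unit -> 'T) (onNext: 'T -> unit) =
    async {
        for _round in 1 .. rounds do
            onNext (pull ())
            do! Async.Sleep delayMs
    }

// Hypothetical stand-in for a quote request: returns an increasing counter
let counter = ref 0
let fakePull () =
    counter.Value <- counter.Value + 1
    counter.Value

let received = ResizeArray<int>()

pollEvery 50 3 fakePull (fun value -> received.Add value)
|> Async.RunSynchronously

printfn "%A" (List.ofSeq received) // prints [1; 2; 3]
```

The Rx version expresses the same cycle declaratively, which makes it easier to compose with other streams and to move the subscription onto the dispatcher.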
We now have streaming real-time stock data pulled asynchronously over the Web and shown on the Syncfusion GridDataControl.
This also works with Essential Grid Silverlight in out-of-browser (OOB) mode. If you want to get hold of the sample, send me a request.