Microsoft StreamInsight CEP is a very powerful platform for developing complex event processing (CEP) systems. There are several development models that we can follow. In this post we will use the IObservable/IObserver model using .NET Reactive extensions (Rx). Since this will be a real-time application, we will also be using F# async workflows to pull stock data.
F# async workflows are the coolest part of using F# in a real-time application. They allow writing concise code that (1) executes in parallel and (2) exposes to another .NET library with ease.
I won’t go into detail about F# except for the async workflow used in this application. There is a three-part series on using design patterns for F# async workflows; I have used Pattern #3 in this post, since we are using Rx to invoke the workflows. In this design pattern, the worker reports the progress through events—a modified version of AsyncWorker<> code is shown below,
type JobCompletedEventArgs<'T>(job:int, result:'T) = inherit EventArgs() member x.Job with get() = job member x.Result with get() = result type AsyncWorker<'T>(jobs: seq>) = // This declares an F# event that we can raise let allCompleted = new Event<'T[]>() let error = new Event() #if WPF let canceled = new Event() #else let canceled = new Event() #endif let jobCompleted = new Event>() let cancellationCapability = new CancellationTokenSource() /// Start an instance of the work member x.Start() = // Capture the synchronization context to allow us to raise events back on the GUI thread let syncContext = SynchronizationContext.CaptureCurrent() // Mark up the jobs with numbers let jobs = jobs |> Seq.mapi (fun i job -> (job, i+1)) let raiseEventOnGuiThread(evt, args) = syncContext.RaiseEvent evt args let work = Async.Parallel [ for (job,jobNumber) in jobs -> async { let! result = job syncContext.RaiseEvent jobCompleted (new JobCompletedEventArgs<'T>(jobNumber, result)) return result } ] Async.StartWithContinuations ( work, (fun res -> raiseEventOnGuiThread(allCompleted, res)), (fun exn -> raiseEventOnGuiThread(error, exn)), (fun exn -> raiseEventOnGuiThread(canceled, exn)), cancellationCapability.Token) /// Raised when a particular job completes [] member x.JobCompleted = jobCompleted.Publish /// Raised when all jobs complete [] member x.AllCompleted = allCompleted.Publish /// Raised when the composition is cancelled successfully [] member x.Canceled = canceled.Publish /// Raised when the composition exhibits an error [] member x.Error = error.Publish
We have used [] attributes to mark these events for exposing to other .NET CLI languages. Since we are using Rx, we need to have an event that inherits from System.EventArgs–JobCompletedEventArgs does that here. The AsyncWorker is now ready to be used as a library for running parallel code.
Stock Quotes Reader
The Stock Quotes Reader defines a wrapper that performs a request to the server (it would be Yahoo finance here) and pulls the stocks.
type StockAvailableEventArgs(stocks:string[]) = inherit EventArgs() member x.Stocks with get() = stocks type StockQuotesReader(quotes:string) = let stockAvailableEvent = new Event() let httpLines (uri:string) = async { let request = WebRequest.Create uri use! response = Async.FromBeginEnd(request.BeginGetResponse, request.EndGetResponse) use stream = response.GetResponseStream() use reader = new StreamReader(stream) let lines = [ while not reader.EndOfStream do yield reader.ReadLine() ] return lines } // n - name, s - symbol, x - Stock Exchange, l1 - Last Trade, p2 - change in percent, h - high, l - low, o - open, p - previous close, v - volume let yahooUri (quotes:string) = let uri = String.Format("http://finance.yahoo.com/d/quotes.csv?s={0}&f=nsxl1hlopv", quotes) uri member x.GetStocks() = let stocks = [httpLines(yahooUri quotes)] stocks // member x.TestAsync() = // let stocks = httpLines(yahooUri quotes) // Async.RunSynchronously(stocks) member x.PullStocks() = let stocks = x.GetStocks() let worker = new AsyncWorker<_>(stocks) worker.JobCompleted.Add(fun args -> stockAvailableEvent.Trigger(new StockAvailableEventArgs(args.Result |> List.toArray)) ) worker.Start() static member GetAsyncReader(quotes) = let reader = new StockQuotesReader(quotes) let stocks = reader.GetStocks() let worker = new AsyncWorker<_>(stocks) worker [] member x.StockAvailable = stockAvailableEvent.Publish
The above wrapper class does some interesting things:
CEP Client
On the CEP client we will be using the following things:
· Syncfusion WPF GridDataControl – works well with high-speed data changes, keeping CPU usage to a minimum.
· Rx – creates requests and updates the ViewModel bound to the grid.
Application Setup
The WPF application uses a simple MV-VM by defining a StocksViewModel to hold stock data. The Stocks collection is bound to the Syncfusion WPF GridDataControl.
Using Rx to create requests
This real-time application requires real-time data that will be pulled over the wire for every 500 milliseconds. We will be making use of IObservable to create a streaming request and repeat that over a time delay.
private void RealTimeStocks(int delay, string quotes) { var stockReader = new StockQuotesReader(quotes); var stockFeeds = Observable.Defer(() => { stockReader.PullStocks(); var evt = from e in Observable.FromEvent(stockReader, "StockAvailable") select new { Stocks = e.EventArgs.Stocks.ToStockQuotes() }; var delayedEvt = Observable.Return(evt).Delay(TimeSpan.FromMilliseconds(delay)); return delayedEvt; }).Repeat(); var stocks = from s in stockFeeds from t in s select t.Stocks; stocks.SubscribeOnDispatcher().Subscribe((stockQuotes) => { this.AddOrUpdateModel(stockQuotes); }); }
We now have streaming real-time stock data pulled asynchronously over the Web and shown on the Syncfusion GridDataControl.
This also works with Essential Grid Silverlight in out-of-browser (OOB) mode. If you want to get hold of the sample, send me a request.