Using Microsoft Reactive Extensions to orchestrate time-bound data retrieval

Microsoft Reactive Extensions (usually referred to simply as Rx) is a library for orchestrating and synchronising asynchronous operations. It’s based on a beautiful mathematical duality between IEnumerable/IEnumerator and their new counterparts (included in .NET 4), IObservable/IObserver. Documentation is unfortunately somewhat scarce, and beyond the clichéd dictionary-suggest and drag-and-drop examples it’s quite hard to find sample code. As ever though, the best way to learn something is to try to use it to solve a real-world problem.

Essentially, you can think of observables as push-based collections. Instead of pulling data from an enumerable (e.g. with a for-each loop), the data is pushed at you by the observable.
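To make the duality concrete, here is a hand-rolled push-based counterpart to Enumerable.Range (purely illustrative – Rx ships Observable.Range and lambda-based Subscribe overloads, so in practice you never write these classes by hand):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// A hand-rolled push-based counterpart to Enumerable.Range, purely to
// illustrate the duality between IEnumerable and IObservable.
class RangeObservable : IObservable<int>
{
    private readonly int start, count;
    public RangeObservable(int start, int count) { this.start = start; this.count = count; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // Values are pushed at the consumer rather than pulled out by it.
        for (int i = start; i < start + count; i++)
            observer.OnNext(i);
        observer.OnCompleted();
        return new NopDisposable();
    }

    private class NopDisposable : IDisposable { public void Dispose() { } }
}

// A simple observer that collects what it is given and prints on completion.
class CollectingObserver : IObserver<int>
{
    public readonly List<int> Received = new List<int>();
    public void OnNext(int value) { Received.Add(value); }
    public void OnCompleted() { Console.WriteLine("Pushed: " + string.Join(", ", Received)); }
    public void OnError(Exception error) { }
}

class PushVsPull
{
    static void Main()
    {
        // Pull: the consumer asks the enumerable for each element in turn.
        foreach (var i in Enumerable.Range(1, 3))
            Console.WriteLine("Pulled " + i);

        // Push: the producer calls back into the consumer.
        new RangeObservable(1, 3).Subscribe(new CollectingObserver());
    }
}
```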

A little background

One of my company’s websites displays statistics on various pages. The queries are dynamic enough and numerous enough that it is not practical to pre-calculate and cache all the results every few minutes, so instead we operate on an 80/20 rule. That is, 80% of our website views occur on 20% of the pages (usually new content on the homepage, or content that is newly linked to from other popular sites). We therefore cache the result of each database query in memcached for a few minutes, the cache key being a hash code of the SQL query (that’s a simplification – we actually serialize the LINQ expression tree, but that’s for another blog post).

Sometimes uncached statistics take a while to retrieve, depending on database load and latency. Since our primary concern is total page load time/responsiveness, we simply abort the request and hide the statistics from the page if they are not retrieved within a fixed amount of time. The initial implementation simply aborted the thread once a certain timeout had elapsed. Unfortunately this solution has a big problem.

The death spiral

The trouble with aborting the thread is that if a database operation times out, the result never makes it into the cache. This means the next time the page is hit another cache miss occurs and the SQL database gets hit again. Since this query is identical to the first it will probably also time out. The database load keeps increasing because it is repeatedly being hit with the same query whilst the result is never cached.

The requirements in brief

The basic logic we need is therefore as follows:

• Page view generates request for data.
• Cache is checked for a specific key.
• On cache timeout/error – cancel operation. Don’t hit SQL because if the cache is down it’s better to display the pages without the statistics and avoid hammering the SQL server.
• On cache hit – return data.
• On cache miss – request data from SQL.
• On SQL timeout – hide the control but continue fetching the data in the background and place it into the cache when it finally returns.
• On SQL success – return data, enter it into the cache.
• On SQL error – abort, hide control.

The problem

Trying to write this logic using threading, locks and traditional synchronisation constructs is difficult, bug-prone, and results in horrific spaghetti-code.

Reactive Extensions to the rescue

Reactive Extensions provides us with a much nicer way to deal with these kinds of asynchronous operations.

To keep the example simple, I’ll use a console application instead of a web-app, and simulate the cache and SQL database. I’ll also forget about using the SQL query as the cache key and use an entity id instead. In order to run this example you will need to have the Reactive Extensions assemblies installed, which can be downloaded from DevLabs.

The example makes use of a number of extension methods provided by Rx:

• Defer – This defers an operation until an observable is subscribed to.
• Return – This creates an observable that returns a single result.
• Timeout – Causes an observable to throw an exception after a specified timeout. Note that although this means the observable is disposed and no further results will be yielded, the underlying operation will continue to run. This is useful when you have side effects that need to occur – in this case, placing the result of a long-running SQL query into the cache.
• Catch – Specifies another observable sequence to continue with when an exception occurs.
• Take – This is analogous to its traditional LINQ counterpart. Remember though that, unlike First(), it does not force execution of the query and so does not block.

It also makes use of the Subject class. This is a special class that acts as both an observer and an observable. It allows multiple subscriptions to a single stream of events. It may not strictly be necessary in this example, but I have found that introducing subjects helps to avoid the easy mistake of subscribing twice to an observable and causing two lots of side-effects to occur.
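The idea behind Subject can be sketched in a few lines of hand-rolled code (a simplification of what Rx’s Subject&lt;T&gt; actually does – it omits error handling and thread safety, and the ActionObserver helper stands in for Rx’s lambda-based Subscribe overloads):

```csharp
using System;
using System.Collections.Generic;

// A stripped-down, hand-rolled Subject: it is an observer (values can be
// pushed into it) and an observable (others can subscribe to it).
class MiniSubject<T> : IObservable<T>, IObserver<T>
{
    private readonly List<IObserver<T>> observers = new List<IObserver<T>>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        observers.Add(observer);
        return new Unsubscriber(observers, observer);
    }

    // Everything pushed into the subject is forwarded to every subscriber,
    // so the underlying stream's side-effects happen only once.
    public void OnNext(T value)      { foreach (var o in observers.ToArray()) o.OnNext(value); }
    public void OnCompleted()        { foreach (var o in observers.ToArray()) o.OnCompleted(); }
    public void OnError(Exception e) { foreach (var o in observers.ToArray()) o.OnError(e); }

    private class Unsubscriber : IDisposable
    {
        private readonly List<IObserver<T>> observers;
        private readonly IObserver<T> observer;
        public Unsubscriber(List<IObserver<T>> observers, IObserver<T> observer)
        {
            this.observers = observers;
            this.observer = observer;
        }
        public void Dispose() { observers.Remove(observer); }
    }
}

// Minimal helper mirroring Rx's lambda-based Subscribe overload.
class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnCompleted() { }
    public void OnError(Exception e) { }
}
```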

Without further ado, the code. You will need to add project references to System.CoreEx and System.Reactive.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace CacheExample
{
    public class CacheMissException : ApplicationException
    {
    }

    // Represents the entity we are trying to retrieve from the cache
    // or database
    public class ResultEntity
    {
        public ResultEntity(string value)
        {
            Value = value;
        }

        public string Value { get; set; }
    }

    public interface IResultRepository
    {
        ResultEntity GetResultById(int id);
    }

    public class DatabaseRepository : IResultRepository
    {
        public ResultEntity GetResultById(int id)
        {
            Console.WriteLine("Retrieving result from database...");

            // Increase the following wait time to simulate a
            // database timeout.
            Thread.Sleep(150);

            // Note that this code is still executed even if the
            // observer is disposed. This, conveniently, allows for
            // "side-effects". In this case we could put the result
            // into the cache so the next user gets a cache hit!
            Console.WriteLine("Retrieved result from database.");
            return new ResultEntity("Database Result");
        }
    }

    public class CacheRepository : IResultRepository
    {
        public ResultEntity GetResultById(int id)
        {
            Console.WriteLine("Retrieving result from cache...");

            // Increase the following value to simulate a cache timeout.
            Thread.Sleep(20);

            // Uncomment the next line to simulate a cache miss.
            //throw new CacheMissException();

            Console.WriteLine("Retrieved result from cache!");
            return new ResultEntity("Cached Result");
        }
    }

    class Program
    {
        static readonly IResultRepository cacheRepository =
            new CacheRepository();
        static readonly IResultRepository databaseRepository =
            new DatabaseRepository();

        static void Main(string[] cmdLineParams)
        {
            int id = 123;
            var cacheTimeout = TimeSpan.FromMilliseconds(50);
            var databaseTimeout = TimeSpan.FromMilliseconds(200);

            var cacheObservable = Observable.Defer(
                () => Observable.Return(
                    cacheRepository.GetResultById(id)));
            var databaseObservable = Observable.Defer(
                () => Observable.Return(
                    databaseRepository.GetResultById(id)));

            // Try to retrieve the result from the cache, failing over
            // to the database in case of a cache miss.
            var cacheFailover = cacheObservable
                .Timeout(cacheTimeout)
                .Catch<ResultEntity, CacheMissException>(
                    x =>
                    {
                        Console.WriteLine("Cache miss. Attempting to retrieve from database.");
                        return databaseObservable
                            .Timeout(databaseTimeout);
                    })
                .Catch<ResultEntity, TimeoutException>(
                    x =>
                    {
                        Console.WriteLine("Timed out retrieving result. Giving up.");
                        return Observable.Empty<ResultEntity>();
                    });

            var result = new Subject<ResultEntity>();
            result.Take(1).Subscribe(
                x => Console.WriteLine("SUCCESS: Result: " + x.Value),
                x => Console.WriteLine("FAILURE: Exception!"),
                () => Console.WriteLine("Sequence finished."));

            cacheFailover.Subscribe(result);

            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }
    }
}



I would recommend playing around with the code. Experiment with adjusting the timeouts and uncommenting the lines with notes by them to see what happens in different scenarios. If you haven’t used Rx before, wrapping your head around observables can take a while. I would thoroughly recommend taking some time to watch the various Channel 9 videos.


Town Crier – An open-source e-mail templating engine for .NET

In medieval times, town criers were the primary means of making announcements to a community. Nowadays a man with a bell is a very imaginative – but not particularly practical – means of communication.

One common scenario, especially in the business world, is the need to send out an email to a large number of people. Of course a big anonymous email lacks the friendliness of the local loud-mouthed peasant and so we try to personalise the emails with individuals’ names etc.

I suspect most .NET developers have come across this problem at some point in their career. It generally leads to a lot of messy string concatenation and trying to manhandle System.Net.Mail.SmtpClient into doing what you want. With text-based emails this is ugly; when HTML is involved it becomes a world of pain.

Town Crier is a project I have been working on to simplify this scenario. The basic workflow for sending a templated e-mail is as follows:

1. Create an email template.
This can be either a plain-text or HTML file (or both). Tokens to be replaced are written like this: {%= customersname %}

Sample email templates:

2. Write some very simple code in the CLR language of your choice, in this case C#:
var factory = new MergedEmailFactory(new TemplateParser());

var tokenValues = new Dictionary<string, string>
{
    {"name", "Joe Bloggs"},
    {"age", "21"}
};

MailMessage message = factory
    .WithTokenValues(tokenValues)
    .WithSubject("Test Subject")
    .WithHtmlBodyFromFile(@"templates\sample-email.html")
    .WithPlainTextBodyFromFile(@"templates\sample-email.txt")
    .Create();

var from = new MailAddress("sender@test.com", "Automated Emailer");
var to = new MailAddress("recipient@test.com", "Joe Bloggs");
message.From = from;
message.To.Add(to);

var smtpClient = new SmtpClient();
smtpClient.Send(message);


Of course it’s then trivial to loop through rows in a database, populate the dictionary and perform a “mail-merge” programmatically.
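For example, a mail-merge loop might look something like this (the customer list and its fields are hypothetical stand-ins for your database rows; the factory/template API is the one shown above):

```csharp
// Hypothetical list of recipients; in practice these rows would come
// from a database query.
var customers = new[]
{
    new { Name = "Joe Bloggs", Age = "21", Email = "joe@test.com" },
    new { Name = "Jane Doe",   Age = "34", Email = "jane@test.com" }
};

var factory = new MergedEmailFactory(new TemplateParser());
var smtpClient = new SmtpClient();

foreach (var customer in customers)
{
    // Populate the token dictionary from the current row.
    var tokenValues = new Dictionary<string, string>
    {
        { "name", customer.Name },
        { "age", customer.Age }
    };

    var message = factory
        .WithTokenValues(tokenValues)
        .WithSubject("Test Subject")
        .WithHtmlBodyFromFile(@"templates\sample-email.html")
        .WithPlainTextBodyFromFile(@"templates\sample-email.txt")
        .Create();

    message.From = new MailAddress("sender@test.com", "Automated Emailer");
    message.To.Add(new MailAddress(customer.Email, customer.Name));
    smtpClient.Send(message);
}
```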

One final tip – an extension method is included that allows you to save the message to a .eml file:

using (var stream = new FileStream(@"output.eml", FileMode.CreateNew))
{
    message.Save(stream);
}

That’s pretty much it! It’s fairly basic but I’ve found it to be very useful. It’s also my first open-source project so please be nice!

I am releasing it under the GNU Lesser General Public Licence. Go grab the sources at GitHub.

Implementing map-reduce in F#

Introduction

MapReduce is a software paradigm popularised by Google in which we take a set of tuples (key-value pairs), transform (map) them into an intermediate set of key-value pairs, and then perform some aggregation (reduce) operation on the intermediate values to obtain a result set. This is a useful way to express a problem because it yields an obvious way to “divide and conquer” the computation, lending itself to parallel/distributed computing and thus providing a fairly simple way to perform computations on extremely large data sets.
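As a rough single-machine analogy, the same map/group/reduce pipeline can be sketched in C# with LINQ to Objects (illustrative only – real MapReduce distributes each stage across many machines):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// SelectMany plays the "map" role, GroupBy groups the intermediate
// pairs by key, and Sum performs the "reduce".
public class PetCount
{
    public static Dictionary<string, int> MapReduce(Dictionary<string, List<string>> people)
    {
        return people
            .SelectMany(p => p.Value.Select(pet => Tuple.Create(pet, 1))) // map
            .GroupBy(t => t.Item1)                                        // group by intermediate key
            .ToDictionary(g => g.Key, g => g.Sum(t => t.Item2));          // reduce
    }

    static void Main()
    {
        var people = new Dictionary<string, List<string>>
        {
            { "Alice",   new List<string> { "Dog", "Cat" } },
            { "Bob",     new List<string> { "Cat" } },
            { "Charlie", new List<string> { "Mouse", "Cat", "Dog" } },
            { "Dennis",  new List<string>() }
        };

        foreach (var pair in MapReduce(people))
            Console.WriteLine(pair.Key + " : " + pair.Value);
    }
}
```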

It can be quite difficult to grok at first, so I decided to try implementing one of the examples from the MongoDB documentation in F# (if interested, see shell example 2). In this example, we have a list of people and the types of pet each of them has. We wish to calculate the total number of each animal.

The Code

Again, F# proves to be a remarkably succinct language in which to express problems; in this case the built-in syntactic sugar for tuples is a godsend!

UPDATE (25-May-2010) – Controlflow helpfully suggested that I could make my original code somewhat neater by using pattern matching to decompose tuples. I’ve updated the code below with these improvements.

#light

// Simple example of map-reduce in F#
// Counts the total numbers of each animal

// Map function for our problem domain
let mapfunc (k, v) =
    v |> Seq.map (fun pet -> (pet, 1))

// Reduce function for our problem domain
let reducefunc (k, (vs : seq<int>)) =
    let count = vs |> Seq.sum
    k, Seq.ofList [count]

// Performs the map-reduce operation on a given set of input tuples
let mapreduce map reduce (inputs : seq<_*_>) =
    let intermediates = inputs |> Seq.map map |> Seq.concat
    let groupings =
        intermediates
        |> Seq.groupBy fst
        |> Seq.map (fun (x, y) -> x, Seq.map snd y)
    let results = groupings |> Seq.map reduce
    results

// Run the example...
let alice   = ("Alice",   ["Dog"; "Cat"])
let bob     = ("Bob",     ["Cat"])
let charlie = ("Charlie", ["Mouse"; "Cat"; "Dog"])
let dennis  = ("Dennis",  [])

let people = [alice; bob; charlie; dennis]

let results = people |> mapreduce mapfunc reducefunc

for result in results do
    let animal = fst result
    let count = snd result |> Seq.head
    printfn "%s : %d" animal count

printfn "Press any key to exit."

System.Console.ReadKey() |> ignore



This yields the expected results:

Dog : 2

Cat : 3

Mouse : 1

Exercise for the reader

Parallelise this implementation (for a single machine this should be trivial using the Parallel LINQ integration provided in the F# PowerPack).

Modelling heat transfer in F# using 100 lines of code

The aim

Imagine we have a square coaster upon which we place a hot mug of tea. We wish to model the distribution of temperature across the coaster over time. For the sake of simplicity we will model the coaster only in two dimensions and we take the initial temperature across the surface to be $20\,^{\circ}\mathrm{C}$ except for a circle where the rim of the bottom of the mug touches the coaster, at which the temperature is $80\,^{\circ}\mathrm{C}$.

In this post I’m going to show how we can model the heat equation succinctly in F#. I’m going to consider the two-dimensional case and approximate the solution at discrete spatial mesh points and at discrete time periods.

We will also plot the results by mapping the temperature onto the brightness (i.e. a heat or intensity map).

The mathematics

In two dimensions the heat equation – taking the size of the coaster to be 100mm square – is given by:

$u_{t} = c \cdot (u_{xx} + u_{yy}), 0 \leq x,y \leq 100, t \geq 0$

where $u(t,x,y)$ represents the temperature at time $t$ and at coordinates $(x,y)$.

We need to apply boundary conditions at the edges of the coaster. We will assume for simplicity that the temperature along the edges of the coaster remains constant, that is:

$u(t,0,y) = u(t,100,y) = u(t,x,0) = u(t,x,100) = k$

We also need to set our initial conditions:

$u(0,x,y) = 80$ on the rim of the mug, i.e. on the circle $(x-50)^2 + (y-50)^2 = r^2$ with $r = 25$, and $u(0,x,y) = 20$ elsewhere.

To model this in F# we are going to represent the surface of the coaster using a 100×100 matrix (the matrix type is included in the F# PowerPack).

Using the Euler method we can convert our continuous differential equation into a discrete difference equation:

$u_{i,j}^{t+1} = u_{i,j}^{t} + c \cdot \Delta t \cdot (u_{i-1,j}^{t} + u_{i+1,j}^{t} - 4u_{i,j}^{t} + u_{i,j-1}^{t} + u_{i,j+1}^{t})$

for some constant $c$ representing the thermal conductivity of the surface and discrete time step $\Delta t$. Note that $t$ here is a natural number representing discrete time values.
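One caveat worth bearing in mind (a standard stability result for this explicit scheme, not something derived in this post): with unit grid spacing, the forward-Euler update only remains numerically stable when

$c \cdot \Delta t \leq \frac{1}{4}$

so if you push the conductivity or the time step too far, the simulation will blow up rather than smoothly diffuse.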

Show me the code!

The F# code runs very close to the mathematics so it should be self-documenting (although I’ve added some comments for readability). Plotting the results is relatively straightforward: we normalize the temperatures and represent them as shades of grey, white being hottest and black being coolest.

#light
open Microsoft.FSharp.Math
open System.Drawing
open System.Drawing.Imaging
open System.Windows.Forms
open Microsoft.FSharp.Collections
open System.Linq

// Flattens a 2D array into a sequence
let array2D_to_seq arr =
    seq { for i in 0 .. Array2D.length1 arr - 1 do
            for j in 0 .. Array2D.length2 arr - 1 do
              yield arr.[i,j] }

// Finds the maximum value in a matrix
let max_value_in_matrix m =
    m
    |> Matrix.toArray2D
    |> array2D_to_seq
    |> PSeq.max

// Normalizes a matrix so its maximum value is 1
let normalize_matrix m = m * (1.0 / (max_value_in_matrix m))

let mug_diameter = 50.0    // mm
let coaster_length = 100.0 // mm
let tolerance = 5.0        // we're operating on discrete space so the rim of the mug needs some thickness
let num_steps = 1000       // number of iterations to be modelled

// Number of rows and columns in the matrix
let rows = int coaster_length
let cols = int coaster_length

// Equation for a circle of radius r centred on the coaster
let circle r x y =
    (x - coaster_length / 2.0) ** 2.0 + (y - coaster_length / 2.0) ** 2.0 - r ** 2.0

// Initial conditions function: 80 degrees on the rim of the mug, 20 elsewhere
let initialValues (x : int) (y : int) =
    match x, y with
    | (x, y) when circle (mug_diameter / 2.0) (float x) (float y) >= 0.0
               && circle (mug_diameter / 2.0) (float x) (float y) <= tolerance ** 2.0 -> 80.0
    | _ -> 20.0

// Create matrix representing initial conditions
let initialConditions = Matrix.init rows cols initialValues |> normalize_matrix

let c = 0.6                            // Thermal conductivity
let delta_t = ((1.0) / 2.0 * c) / 2.0  // Time interval (chosen small enough for stability)

// Our difference equation
let temp_at x y (o : float) (l : float) (r : float) (t : float) (b : float) =
    o + c * delta_t * (r + l - 4.0 * o + t + b)

// Maps matrix u(t) to u(t+1)
let newMatrix (m : matrix) =
    m |> Matrix.mapi (fun i j temp ->
        match (i, j) with
        | (i, j) when i = 0 || j = 0 || i = rows - 1 || j = cols - 1 -> 0.0 // Boundary conditions
        | _ -> temp_at i j (m.[i,j]) (m.[i-1,j]) (m.[i+1,j]) (m.[i,j+1]) (m.[i,j-1]))

// Recursive function to determine the temperatures at time t
let rec heatmap_at t =
    match t with
    | 0 -> initialConditions
    | _ -> heatmap_at (t - 1) |> newMatrix

// Converts a 2D array of colours into a bitmap, pixel by pixel
let toBitmap (arr : Color[,]) =
    let image = new Bitmap(arr.GetLength(0), arr.GetLength(1), Imaging.PixelFormat.Format24bppRgb)
    for i = 0 to image.Width - 1 do
        for j = 0 to image.Height - 1 do
            image.SetPixel(i, j, arr.[i,j])
    image

// Maps a normalized temperature to a shade of grey
let intensityMap intensity =
    let level = int (intensity * 255.0)
    Color.FromArgb(level, level, level)

let intensities =
    heatmap_at num_steps
    |> normalize_matrix
    |> Matrix.toArray2D
    |> Array2D.map intensityMap

let heatBitmap = intensities |> toBitmap

let form = new Form(
            Text = "F# Heat Map",
            Size = heatBitmap.Size)

let pic_box = new PictureBox(
                BorderStyle = BorderStyle.Fixed3D,
                Image = heatBitmap,
                Size = heatBitmap.Size,
                Dock = DockStyle.Fill,
                SizeMode = PictureBoxSizeMode.StretchImage)

form.Controls.Add(pic_box)

#if INTERACTIVE
form.Show()
#else
Application.Run(form)
#endif


Let’s take it for a spin!

Here I have taken snapshots at discrete times $t = 0, 50, 100, \ldots, 1000$ with $c = 0.6$.

Quite an impressive simulation for just 100 lines of code – including comments and white space!

References

Parallel Numerical Solution of 2-D Heat Equation, Verena Horak and Peter Gruber.

The Heat Equation, Wikipedia

Euler method, Wikipedia

Generating ISO-compliant timestamp strings in Javascript

This Javascript snippet generates an ISO 8601-compliant timestamp string, for example: 2010-04-08T01:38:03.181Z. Very useful for making AJAX calls to ASP.NET websites since you can pass such a string to the System.DateTime.Parse(…) method in the .NET Framework.

function PadZeros(value, desiredStringLength)
{
    var num = value + "";
    while (num.length < desiredStringLength)
    {
        num = "0" + num;
    }
    return num;
}

function ToIsoString(d)
{
    return d.getUTCFullYear() + '-' +
        PadZeros(d.getUTCMonth() + 1, 2) + '-' +
        PadZeros(d.getUTCDate(), 2) + 'T' +
        PadZeros(d.getUTCHours(), 2) + ':' +
        PadZeros(d.getUTCMinutes(), 2) + ':' +
        PadZeros(d.getUTCSeconds(), 2) + '.' +
        PadZeros(d.getUTCMilliseconds(), 3) + 'Z';
}

// Example usage:
var myUtcString = ToIsoString(new Date());
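On the ASP.NET side, the resulting string parses cleanly. A minimal sketch (DateTimeStyles.AdjustToUniversal keeps the parsed value in UTC rather than converting it to the server's local time zone):

```csharp
using System;
using System.Globalization;

class ParseExample
{
    static void Main()
    {
        // A string in the format produced by the ToIsoString snippet above.
        var timestamp = "2010-04-08T01:38:03.181Z";

        // Parse it back into a UTC DateTime on the server.
        var parsed = DateTime.Parse(timestamp,
                                    CultureInfo.InvariantCulture,
                                    DateTimeStyles.AdjustToUniversal);

        Console.WriteLine(parsed.ToString("o"));
    }
}
```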


One-click website deployment using TeamCity, NAnt, Git and Powershell.

Problem

Until recently my organisation relied on FTP to push site updates. Whilst this certainly seems like a simple solution at first glance, it soon becomes unmanageable. To name but a few of the difficulties:

• For large sites re-uploading every file takes a long time, so developers tend to try to push just the individual files relating to their set of changes. Nobody really knows which version of the build is live, files soon get out of sync, and the site collapses.
• If something does go wrong things have to be restored from a backup, which again takes time.
• Tracking down bugs is difficult: it’s nigh-on impossible to correlate the live site to a particular build in source control.
• Web farms are particularly problematic because the files have to manually be pushed to multiple nodes.

Requirements

We needed a deployment solution to fit the following requirements:

• Bandwidth-friendly – deploys should only be transferring changes, not re-uploading the entire site.
• Quick to switch between builds to minimise downtime
• Easy to revert to previous builds
• Able to easily deploy to multiple servers (web farms)
• Platform-independent. We are primarily a Microsoft shop but didn’t want the deployment system to be tied to Visual Studio/ASP.NET.
• Easy to deploy. Ideally it should be as simple as clicking ‘Build’ in Teamcity.
• Ability to have multiple configurations, for example testing, staging and live.

Solution

Overview

The solution essentially boils down to the following:

• Use NAnt to build our solutions and apply any configuration-specific changes (for example, web.config changes which need to be made for staging/live).
• Use a version control system (git) to hold each successful build.
• Have each server in the web farm frequently pull and apply the latest changes from the build server.

Requirements

You will need the following tools installed:

On the build server:

• NAnt
• Git (My suggestion would be to install Git Extensions which includes everything you need plus a few nice GUI tools for windows users)

On each web server:

• Git

Step 1 – Build configurations

Create a ‘DeploymentOverrides’ directory in the root of your solution and a subdirectory for each configuration (development, testing, staging, live etc.) The idea is that the contents of the relevant folder will be copied over the top of each successful build. In the case of ASP.NET for example you might want to put a different web.config in each folder.

Step 2 – Create Git Repositories for the build outputs

On the build server you’ll need to create git repositories to hold the successful outputs from each build. For example:

mkdir F:\GitRepositories\Testing\MyWebsite
mkdir F:\GitRepositories\Staging\MyWebsite
mkdir F:\GitRepositories\Live\MyWebsite

F:
cd F:\GitRepositories\Testing\MyWebsite
git init

cd F:\GitRepositories\Staging\MyWebsite
git init

cd F:\GitRepositories\Live\MyWebsite
git init


Step 3 – NAnt build script

NAnt is a great tool to automate builds. It uses an XML configuration to describe the different ‘targets’. The following example script holds a few different configuration options (Local, Testing, Staging and Live).

When NAnt is run the solution is compiled using MSBuild, the overrides are copied over from the relevant subdirectory of DeploymentOverrides, and the resultant output is committed to its corresponding git repository.

By convention this file should be called default.build and placed in the root of your solution.
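The original script isn’t reproduced here, but as a rough illustration a minimal default.build might look something like the following (all names, paths and the MSBuild location are placeholders):

```xml
<?xml version="1.0"?>
<!-- Minimal sketch of a default.build; project names, paths and
     properties are illustrative placeholders. -->
<project name="MyWebsite" default="testing" basedir=".">

  <!-- Compile the solution with MSBuild -->
  <target name="compile">
    <exec program="C:\Windows\Microsoft.NET\Framework\v4.0.30319\msbuild.exe"
          commandline="MyWebsite.sln /p:Configuration=Release" />
  </target>

  <!-- One target per configuration: set the overrides folder, then deploy -->
  <target name="testing" depends="compile">
    <property name="config" value="Testing" />
    <call target="deploy" />
  </target>

  <target name="live" depends="compile">
    <property name="config" value="Live" />
    <call target="deploy" />
  </target>

  <target name="deploy">
    <!-- Copy the build output into the configuration's git repository -->
    <copy todir="F:\GitRepositories\${config}\MyWebsite" overwrite="true">
      <fileset basedir="MyWebsite\bin\Release" />
    </copy>
    <!-- Copy configuration-specific overrides (e.g. web.config) over the top -->
    <copy todir="F:\GitRepositories\${config}\MyWebsite" overwrite="true">
      <fileset basedir="DeploymentOverrides\${config}" />
    </copy>
    <!-- Commit the result so the web servers can pull it -->
    <exec program="git" workingdir="F:\GitRepositories\${config}\MyWebsite"
          commandline="add -A" />
    <exec program="git" workingdir="F:\GitRepositories\${config}\MyWebsite"
          commandline="commit -m &quot;Automated build&quot;" />
  </target>
</project>
```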

Step 4 – Set up Teamcity (optional)

You will want to create a TeamCity build for each different configuration (Testing, Staging, Live etc.). Be sure to choose NAnt as the runner and set the target appropriately.

You can of course skip this step and simply call NAnt manually from the command line if you wish.

Step 5 – Expose the git repositories

There are various ways to do this, but since we are only ever going to be pulling from the repository, serving it over HTTP is the simplest method and provides some basic security (basic authentication).

I simply created a new IIS website with its root at F:\GitRepositories and enabled basic authentication. Depending on your security requirements you may want to contemplate using SSL or other means of exposing the repository to the web servers (VPN, SCP, Samba etc.).

Step 6 – Clone the git repositories onto the web servers

On the web servers, you will need to clone the appropriate git repository from the build server into the root of each of your IIS applications, for example:

git clone http://username:password@mybuildserver.com/Live/MyWebsite


Step 7 – Set up the servers to regularly pull the latest updates

For simplicity, and because we needed to be able to spin up servers on demand without adjusting the configuration, we chose to have the web servers continuously poll for updates from the build server. This means that the build server doesn’t need to know anything about the web servers.

A push-based system might be more efficient (less network chatter, no need for polling) but it is left as an exercise for the reader!

In order to pull updates we just need to tell git to fetch and merge the latest changes. The following powershell script will automate this process and automatically fetch updates for every git repository. As a bonus, it also writes status updates to the windows event log.

# Perform fetch on all git repositories immediately beneath F:\git-deploy-repos
F:
cd F:\git-deploy-repos
dir | %{
    if (test-path "$_\.git") {
        echo "Performing fetch on: $_"
        cd $_
        git fetch

        # Writing an event to the Windows event log
        $EventLog = New-Object System.Diagnostics.EventLog('Application')
        $EventLog.MachineName = "."
        $EventLog.Source = "Fetch-Updates"

        if ($?) {
            echo "Fetch on $_ completed"
            $EventLog.WriteEntry("Successfully fetched updates for $_", "Information", $EventID)

            echo "Applying changes to dev build: $_"
            git clean -f -d
            git reset --hard head
            git merge origin/master

            if ($?) {
                echo "Changes successfully applied to: $_"
                $EventLog.WriteEntry("Successfully applied updates for build $_", "Information", $EventID)
            } else {
                echo "Failed to apply changes to: $_"
                $EventLog.WriteEntry("Failed to apply updates for build $_", "Error", $EventID)
            }
        } else {
            echo "Fetch on $_ failed"
            $EventLog.WriteEntry("Failed to fetch updates for $_", "Error", $EventID)
        }
        cd ..
    }
}


Set up a Windows scheduled task to run this PowerShell script every few minutes and you are done 🙂

Step 8 – Take it for a spin!

Now all the configuration is done, deployment is as simple as clicking ‘build’ in TeamCity. This will build the solution, copy in any configuration-specific overrides (web.config etc.), and add it to the local git repository. The next time the scheduled task runs on each of the web servers, the changes will be pulled over the network and applied to the live website.

If you need to revert to the previous build you can simply run the following command on each of the web servers.

git reset --hard HEAD^


Of course you can revert to any build you like! Git Extensions really comes into its own in this scenario because it lets you easily visualise the timeline and view changes between different builds. Switching between builds takes a couple of clicks!

See also

Rob Conery has also proposed a git-based solution for website deployments which is worth a read: http://blog.wekeroad.com/2009/11/23/deploying-a-web-application-with-git-and-ftp