Using Microsoft Reactive Extensions to orchestrate time-bound data retrieval

Microsoft Reactive Extensions (usually referred to simply as Rx) is a library for orchestrating and synchronising asynchronous operations. It’s based on a beautiful mathematical duality between IEnumerable/IEnumerator and their new counterparts (included in .NET 4), IObservable/IObserver. Documentation is unfortunately somewhat scarce, and beyond the clichéd dictionary-suggest and drag-and-drop examples it’s quite hard to find sample code. As ever, though, the best way to learn something is to try to use it to solve a real-world problem.

Essentially, you can think of observables as push-based collections. Instead of you pulling items from an enumerable (e.g. with a foreach loop), the observable pushes the data at you.
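To make the pull/push contrast concrete, here is a minimal sketch that uses only the IObservable/IObserver interfaces from the .NET 4 base class library, with no Rx assemblies. The class names (PushedNumbers, CollectingObserver, PushVsPull) are made up for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical observable: pushes a fixed set of numbers at whoever subscribes.
public sealed class PushedNumbers : IObservable<int>
{
    private readonly int[] _values;

    public PushedNumbers(params int[] values) { _values = values; }

    public IDisposable Subscribe(IObserver<int> observer)
    {
        // The producer drives the iteration and calls into the consumer.
        foreach (var value in _values) observer.OnNext(value);
        observer.OnCompleted();
        return new NoopDisposable();
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Hypothetical observer: records everything that is pushed at it.
public sealed class CollectingObserver : IObserver<int>
{
    public readonly List<int> Received = new List<int>();
    public bool Completed;

    public void OnNext(int value) { Received.Add(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { Completed = true; }
}

public static class PushVsPull
{
    public static void Main()
    {
        // Pull: the consumer asks the enumerable for each item.
        foreach (var n in new[] { 1, 2, 3 })
            Console.WriteLine("pulled " + n);

        // Push: the observable hands each item to the observer.
        var observer = new CollectingObserver();
        new PushedNumbers(1, 2, 3).Subscribe(observer);
        Console.WriteLine("pushed " + string.Join(", ", observer.Received));
    }
}
```

Rx itself supplies ready-made observables and operators, so in practice you rarely implement these interfaces by hand. The sketch is only meant to show which side drives the iteration.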

A little background

One of my company’s websites displays statistics on various pages. The queries are dynamic enough and numerous enough that it is not practical to pre-calculate and cache all the results every few minutes, so instead we operate on an 80/20 rule. That is, 80% of our website views occur on 20% of the pages (usually new content on the homepage, or content that is newly linked to from other popular sites). Therefore we cache the result of each database query in memcached for a few minutes, the cache key being a hashcode of the SQL query (that’s a simplification – we actually serialize the LINQ expression tree, but that’s for another blog post).
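As a rough illustration of the simplified scheme (a key derived from the SQL text), the sketch below hashes the query with SHA-256 and hex-encodes the digest. The helper name, the "stats:" prefix and the choice of SHA-256 are assumptions for this example rather than what our production code does. A cryptographic hash is used here because string.GetHashCode() is not guaranteed to be stable across processes or runtime versions.

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

public static class CacheKeys
{
    // Hypothetical helper: derives a stable cache key from the query text.
    public static string ForQuery(string sql)
    {
        using (var sha = SHA256.Create())
        {
            byte[] hash = sha.ComputeHash(Encoding.UTF8.GetBytes(sql));
            // "stats:" plus 64 hex characters, well inside memcached's
            // 250-byte key limit.
            return "stats:" + BitConverter.ToString(hash)
                                          .Replace("-", "")
                                          .ToLowerInvariant();
        }
    }

    public static void Main()
    {
        // Illustrative query text only.
        Console.WriteLine(ForQuery("SELECT COUNT(*) FROM Views WHERE PageId = 123"));
    }
}
```

Identical query text always maps to the same key, so every request for the same statistics hits the same cache entry.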

Sometimes uncached statistics take a while to retrieve, depending on database load and latency. Since our primary concern is total page load time and responsiveness, we simply abort the request and hide the statistics from the page if they are not retrieved within a fixed amount of time. The initial implementation of this simply aborted the thread once the timeout had elapsed. Unfortunately this solution has a big problem.

The death spiral

The trouble with aborting the thread is that if a database operation times out, the result never makes it into the cache. This means the next time the page is hit another cache miss occurs and the SQL database gets hit again. Since this query is identical to the first it will probably also time out. The database load keeps increasing because it is repeatedly being hit with the same query whilst the result is never cached.

The requirements in brief

The basic logic we need is therefore as follows:

  • Page view generates request for data.
  • Cache is checked for a specific key.
    • On cache timeout/error – cancel operation. Don’t hit SQL because if the cache is down it’s better to display the pages without the statistics and avoid hammering the SQL server.
    • On cache hit – return data.
    • On cache miss – request data from SQL.
      • On SQL timeout – hide the control but continue fetching the data in the background and place it into the cache when it finally returns.
      • On SQL success – return data, enter it into the cache.
      • On SQL error – abort, hide control.
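The decision list above can be written down as a small pure function, which makes it easy to check that every branch is covered. This is only a sketch of the policy. It performs no I/O, and the enum and method names are invented for illustration.

```csharp
using System;

public enum CacheOutcome { Hit, Miss, TimeoutOrError }
public enum SqlOutcome { Success, Timeout, Error }

[Flags]
public enum Actions
{
    None = 0,
    ShowData = 1,
    HideControl = 2,
    QuerySql = 4,
    FillCache = 8,          // cache the result straight away
    FillCacheWhenDone = 16  // keep fetching in the background, cache later
}

public static class RetrievalPolicy
{
    // The sql argument only matters when the cache reports a miss.
    public static Actions Decide(CacheOutcome cache, SqlOutcome sql)
    {
        switch (cache)
        {
            case CacheOutcome.Hit:
                return Actions.ShowData;
            case CacheOutcome.TimeoutOrError:
                // Cache is unhealthy: never fall through to SQL.
                return Actions.HideControl;
            default: // CacheOutcome.Miss
                switch (sql)
                {
                    case SqlOutcome.Success:
                        return Actions.QuerySql | Actions.ShowData | Actions.FillCache;
                    case SqlOutcome.Timeout:
                        return Actions.QuerySql | Actions.HideControl | Actions.FillCacheWhenDone;
                    default: // SqlOutcome.Error
                        return Actions.QuerySql | Actions.HideControl;
                }
        }
    }

    public static void Main()
    {
        Console.WriteLine(Decide(CacheOutcome.Miss, SqlOutcome.Timeout));
    }
}
```

The interesting branch is the SQL timeout: the control is hidden for this request, but the result is still destined for the cache once the query finally returns.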

The problem

Trying to write this logic using threading, locks and traditional synchronisation constructs is difficult, bug-prone, and results in horrific spaghetti-code.

Reactive Extensions to the rescue

Reactive Extensions provides us with a much nicer way to deal with these kinds of asynchronous operations.

To keep the example simple, I’ll use a console application instead of a web app, and simulate the cache and SQL database. I’ll also forget about using the SQL query as the cache key and use an entity id instead. In order to run this example you will need to have the Reactive Extensions assemblies installed, which can be downloaded from DevLabs.

The example makes use of a number of extension methods provided by Rx:

  • Defer – This defers an operation until an observable is subscribed to.
  • Return – This creates an observable that returns a single result.
  • Timeout – Causes an observable to signal a TimeoutException if no result arrives within a specified time. Note that although this means the subscription is disposed and no further results will be yielded, the underlying operation will continue to run. This is useful when you have side effects that need to occur, in this case placing the result of a long-running SQL query into the cache.
  • Catch – Specifies another observable sequence to continue with when an exception occurs.
  • Take – This is analogous to Take in traditional LINQ: Take(1) yields only the first item and then completes. Remember, though, that unlike First() it does not cause execution of the query and so does not block.
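A small sketch of how Defer, Return, Catch and Take compose is shown below. It assumes the current System.Reactive NuGet package, where these operators live in the System.Reactive.Linq namespace. That differs from the early DevLabs assemblies used in the main listing, so treat the using directives as an assumption. The class and counter names are invented.

```csharp
using System;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package

public static class OperatorSketch
{
    // Counts how many times the deferred factory has actually run.
    public static int Calls;

    public static IObservable<string> Build()
    {
        // Defer: nothing inside this lambda runs until someone subscribes.
        var source = Observable.Defer(() =>
        {
            Calls++;
            // Simulate a failed lookup.
            return Observable.Throw<string>(new InvalidOperationException("miss"));
        });

        return source
            // Catch: on this exception type, continue with another sequence.
            .Catch<string, InvalidOperationException>(
                ex => Observable.Return("fallback"))
            // Take(1): pass on at most one item, then complete.
            .Take(1);
    }

    public static void Main()
    {
        var query = Build();
        Console.WriteLine("Calls before Subscribe: " + Calls);

        query.Subscribe(
            x => Console.WriteLine("Value: " + x),
            ex => Console.WriteLine("Error: " + ex.Message),
            () => Console.WriteLine("Completed."));

        Console.WriteLine("Calls after Subscribe: " + Calls);
    }
}
```

Building the query should leave the counter untouched. Only the Subscribe call triggers the factory, and the observer should then see the fallback value rather than the exception.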

It also makes use of the Subject class. This is a special class that acts as both an observer and an observable. It allows multiple subscriptions to a single stream of events. It may not strictly be necessary in this example, but I have found that introducing subjects helps to avoid the easy mistake of subscribing twice to an observable and causing two lots of side effects to occur.
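The double-subscription pitfall can be sketched as follows. As with any code written against the current System.Reactive NuGet package, the namespaces (System.Reactive.Linq and System.Reactive.Subjects) are an assumption and differ from the early DevLabs assemblies. The class and method names are hypothetical.

```csharp
using System;
using System.Reactive.Linq;     // assumes the System.Reactive NuGet package
using System.Reactive.Subjects;

public static class SubjectSketch
{
    // Incremented every time the "expensive" deferred work runs.
    public static int SideEffects;

    static IObservable<int> Source()
    {
        return Observable.Defer(() =>
        {
            SideEffects++;
            return Observable.Return(42);
        });
    }

    // Each direct subscription re-runs the deferred work.
    public static int SubscribeTwiceDirectly()
    {
        SideEffects = 0;
        var source = Source();
        source.Subscribe(_ => { });
        source.Subscribe(_ => { });
        return SideEffects;
    }

    // Both observers listen to the subject; the source is subscribed once.
    public static int SubscribeTwiceViaSubject()
    {
        SideEffects = 0;
        var subject = new Subject<int>();
        subject.Subscribe(_ => { });
        subject.Subscribe(_ => { });
        Source().Subscribe(subject);
        return SideEffects;
    }

    public static void Main()
    {
        Console.WriteLine("Direct: " + SubscribeTwiceDirectly());
        Console.WriteLine("Via subject: " + SubscribeTwiceViaSubject());
    }
}
```

Note that observers must be attached to the subject before it is connected to the source. A plain Subject does not replay values to late subscribers.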

Without further ado, the code. You will need to add project references to System.CoreEx and System.Reactive.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace CacheExample
{
    public class CacheMissException : ApplicationException
    {

    }

    // Represents the entity we are trying to retrieve from the cache
    // or database
    public class ResultEntity
    {
        public ResultEntity(string value)
        {
            Value = value;
        }

        public string Value { get; set; }
    }

    public interface IResultRepository
    {
        ResultEntity GetResultById(int id);
    }

    public class DatabaseRepository : IResultRepository
    {
        public ResultEntity GetResultById(int id)
        {
            Console.WriteLine("Retrieving results from database...");

            // Increase the following wait time to simulate a
            // database timeout.
            Thread.Sleep(150);

            // Note that this code is still executed even if the
            // observer is disposed.
            // This, conveniently, allows for "side-effects".
            // In this case we could put the result into the
            // cache so the next user gets a cache hit!
            Console.WriteLine("Retrieved result from database.");
            return new ResultEntity("Database Result");
        }
    }

    public class CacheRepository : IResultRepository
    {
        public ResultEntity GetResultById(int id)
        {
            Console.WriteLine("Retrieving result from cache...");

            // Increase the following value to simulate a cache timeout.
            Thread.Sleep(20);

            //Uncomment the next line to simulate a cache miss
            //throw new CacheMissException();

            Console.WriteLine("Retrieved result from cache!");
            return new ResultEntity("Cached Result");
        }
    }

    class Program
    {
        static readonly IResultRepository cacheRepository =
            new CacheRepository();
        static readonly IResultRepository databaseRepository =
            new DatabaseRepository();

        static void Main(string[] cmdLineParams)
        {
            int id = 123;
            var cacheTimeout = TimeSpan.FromMilliseconds(50);
            var databaseTimeout = TimeSpan.FromMilliseconds(200);

            var cacheObservable = Observable.Defer(
                        ()=>Observable.Return(
                              cacheRepository.GetResultById(id)));
            var databaseObservable = Observable.Defer(
                        ()=>Observable.Return(
                              databaseRepository.GetResultById(id)));

            // Try to retrieve the result from the cache, failing over
            // to the DB in case of a cache miss.
            var cacheFailover = (cacheObservable
                .Timeout(cacheTimeout))
                .Catch<ResultEntity, CacheMissException>(
                    (x) =>
                        {
                        Console.WriteLine("Cache miss. Attempting to " +
                                    "retrieve from database.");
                        return databaseObservable
                                 .Timeout(databaseTimeout);
                        }
                )
                .Catch<ResultEntity, TimeoutException>(
                    (x) =>
                    {
                        // This handler sees timeouts from both the cache
                        // and the database lookup.
                        Console.WriteLine("Timed out retrieving result. " +
                                    "Giving up.");
                        return Observable.Empty<ResultEntity>();
                    }
                );

            var result = new Subject<ResultEntity>();
            result.Take(1).Subscribe(
                  x=> Console.WriteLine("SUCCESS: Result: " + x.Value),
                  x=> Console.WriteLine("FAILURE: Exception!"),
                  () => Console.WriteLine("Sequence finished."));

            cacheFailover.Subscribe(result);

            Console.WriteLine("Press any key to exit.");
            Console.ReadKey();
        }
    }
}
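The listing above only prints a message where the cache write would go. The sketch below fills in that side effect with an in-memory dictionary standing in for memcached, to show a timed-out query still populating the cache. It is a variation rather than the code above: it assumes the current System.Reactive NuGet package, swaps in Observable.Start to run the query on a thread-pool thread, and uses Wait() to block for the outcome so the behaviour is easy to observe from a console app. All names and timings are illustrative.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package
using System.Threading;

public static class BackgroundFillSketch
{
    // Stand-in for memcached.
    public static readonly ConcurrentDictionary<int, string> Cache =
        new ConcurrentDictionary<int, string>();

    static string SlowQuery(int id)
    {
        Thread.Sleep(300); // simulated slow database
        string value = "Database Result " + id;
        // Side effect: runs to completion even after the caller timed out.
        Cache[id] = value;
        return value;
    }

    // Returns the value, or null when the caller should hide the control.
    public static string Fetch(int id, TimeSpan timeout)
    {
        string cached;
        if (Cache.TryGetValue(id, out cached)) return cached;

        return Observable
            // Observable.Start runs the query on a background thread;
            // disposing the subscription does not cancel that work.
            .Defer(() => Observable.Start(() => SlowQuery(id)))
            .Timeout(timeout)
            .Catch<string, TimeoutException>(
                _ => Observable.Return<string>(null))
            .Wait();
    }

    public static void Main()
    {
        var timeout = TimeSpan.FromMilliseconds(50);
        Console.WriteLine(Fetch(123, timeout) ?? "(hidden: timed out)");

        Thread.Sleep(600); // give the background query time to finish
        Console.WriteLine(Fetch(123, timeout) ?? "(hidden: timed out)");
    }
}
```

With these timings the first call should give up after roughly 50 ms, while the second call, made after the background query has finished, should be served from the cache. That is exactly the behaviour that breaks the death spiral described earlier.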

I would recommend playing around with the code. Experiment with adjusting the timeouts and uncommenting the lines with notes by them to see what happens in different scenarios. If you haven’t used Rx before, wrapping your head around observables can take a while. I would thoroughly recommend taking some time to watch the various Channel 9 videos.
