Tuesday, March 15, 2011

Asynchrony in C# 5: Dataflow Async Logger Sample

Check out this (very simple) code examples for TPL Dataflow.

Suppose you are developing an Async Logger to register application events to different sinks or log writers.

The logger architecture would be as follow:

image

Note how blocks can be composed to achieved desired behavior. The BufferBlock<T> is the pool of log entries to be process whereas linked ActionBlock<TInput> represent the log writers or sinks.

The previous composition would allows only one ActionBlock to consume entries at a time.

Implementation code would be something similar to (add reference to System.Threading.Tasks.Dataflow.dll in %User Documents%\Microsoft Visual Studio Async CTP\Documentation):

TPL Dataflow Logger
var bufferBlock = new BufferBlock<Tuple<LogLevel, string>>();

ActionBlock<Tuple<LogLevel, string>> infoLogger =
    new ActionBlock<Tuple<LogLevel, string>>(
        e => Console.WriteLine("Info: {0}", e.Item2));

ActionBlock<Tuple<LogLevel, string>> errorLogger =
    new ActionBlock<Tuple<LogLevel, string>>(
        e => Console.WriteLine("Error: {0}", e.Item2));

bufferBlock.LinkTo(infoLogger, e => (e.Item1 & LogLevel.Info) != LogLevel.None);
bufferBlock.LinkTo(errorLogger, e => (e.Item1 & LogLevel.Error) != LogLevel.None);

bufferBlock.Post(new Tuple<LogLevel, string>(LogLevel.Info, "info message"));
bufferBlock.Post(new Tuple<LogLevel, string>(LogLevel.Error, "error message"));

Note the filter applied to each link (in this case, the Logging Level selects the writer used). We can specify message filters using Predicate functions on each link.

Now, the previous sample is useless for a Logger since Logging Level is not exclusive (thus, several writers could be used to process a single message).

Let´s use a Broadcast<T> buffer instead of a BufferBlock<T>.

Broadcast Logger
var bufferBlock = new BroadcastBlock<Tuple<LogLevel, string>>(
    e => new Tuple<LogLevel, string>(e.Item1, e.Item2));

ActionBlock<Tuple<LogLevel, string>> infoLogger =
    new ActionBlock<Tuple<LogLevel, string>>(
        e => Console.WriteLine("Info: {0}", e.Item2));

ActionBlock<Tuple<LogLevel, string>> errorLogger =
    new ActionBlock<Tuple<LogLevel, string>>(
        e => Console.WriteLine("Error: {0}", e.Item2));

ActionBlock<Tuple<LogLevel, string>> allLogger =
    new ActionBlock<Tuple<LogLevel, string>>(
    e => Console.WriteLine("All: {0}", e.Item2));

bufferBlock.LinkTo(infoLogger, e => (e.Item1 & LogLevel.Info) != LogLevel.None);
bufferBlock.LinkTo(errorLogger, e => (e.Item1 & LogLevel.Error) != LogLevel.None);
bufferBlock.LinkTo(allLogger, e => (e.Item1 & LogLevel.All) != LogLevel.None);

bufferBlock.Post(new Tuple<LogLevel, string>(LogLevel.Info, "info message"));
bufferBlock.Post(new Tuple<LogLevel, string>(LogLevel.Error, "error message"));

As this block copies the message to all its outputs, we need to define the copy function in the block constructor. In this case we create a new Tuple, but you can always use the Identity function if passing the same reference to every output.

Try both scenarios and compare the results.

No comments: