MassTransit v3 Update

It has been nearly six months since the first alpha of MassTransit v3 was released, and a lot of progress has been made. It turns out that rewriting an entire code base takes time and attention. Nonetheless, the new architecture is working out wonderfully and the code is nicely separated by concerns.

So, about the TPL…

Make no mistake, the TPL (introduced in .NET 4.0), followed by the addition of async/await (in C# 5), has made the creation of asynchronous code clean and concise. That being said, knowing the exact behavior of the language constructs, and how the compiler translates the keywords, is very important. Add to the mix the fact that many third-party assemblies were not designed for asynchronous invocation, and the result can be quite a mess. However, it's not that bad.

At every layer, MassTransit has been built around the TPL, leveraging async/await for the best performance, and providing pipe and filter composition at every possible extension point. The middleware injection is extensive, and new filters can be created easily to support many advanced use cases such as rate limiting, concurrency restriction, and asynchronous transactions. More filters will be coming, but an initial release has to happen at some point…

Some New Features

The past six months have not been entirely about stabilization of the code. There have been several tasty new features added as well.

External Message Data Storage

Big messages are inevitable, and big messages can really clog up the works, making small messages suffer. And there are some brokers that just can't deal with big messages (cough, Azure Service Bus, cough) at all. To support the transfer of big messages (those messages with large byte arrays or strings), MassTransit now has the ability to send and receive message data outside of the message body.

A couple of standard repositories are available (in-memory, and file system) with more to come, including Azure Blob Storage and perhaps Amazon S3.

_repository = new FileSystemMessageDataRepository(dataDirectory);

When sending a message, during message construction the repository is used to store the message data, which returns an address which is written to the message property.

string data = "Some really long string (or byte array)";
var message = new BigMessage
{
    Body = await _repository.PutString(data)
};
await endpoint.Send(message);
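
For reference, the BigMessage contract used above is not shown in the post itself. A minimal sketch, assuming the standard MessageData<T> property type used by the message data feature:

public class BigMessage
{
    // MessageData<T> stores only a reference (address) in the message body;
    // the actual content lives in the external repository
    public MessageData<string> Body { get; set; }
}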

Like all of MassTransit, the repository is async aware, so the Put is awaited. Then, the message is sent and the reference to the message data is saved in the message body. Reading the message data is as easy as decorating the consumer with the message data behavior, and then just using the property directly.

x.ReceiveEndpoint("my_queue", e =>
{
    e.UseMessageData<BigMessage>(_repository);

    e.Handle<BigMessage>(context =>
    {
        string body = await context.Message.Body.Value;
        Console.WriteLine(body);
    });
}

Big message consumers need not be aware of the external storage implementation in use. The consumer only needs to await on the message data property, and the resulting content (either a stream, or the byte[] or string) will be returned asynchronously.

Message Transformation

To support external message data, a mechanism for modifying the properties of a message as it passes through the consume pipeline was required. To that end, MassTransit now has the ability to specify a message transform.

To specify a message transform, add a transform to the receive endpoint configuration.

x.ReceiveEndpoint("my_queue", e =>
{
    e.Transform<A>(t =>
    {
        t.Set(p => p.Second, context => "World");
    });
    e.Handle<A>(context => Console.WriteLine(context.Message.Second));
});

In the example above, the transform is applied to any A message type, and the Second property has the value “World” for any subsequent message filters, including any consumers, handlers, or sagas.

By using the Set method, the original A message is not modified. A new version of the message is created that contains the new property value. This is in contrast to the Replace method, which changes the original message property.
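
For example, a mutating transform might look like the following (a sketch, assuming Replace mirrors the Set signature shown above):

e.Transform<A>(t =>
{
    // unlike Set, Replace mutates the original message instance
    t.Replace(p => p.Second, context => "World");
});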

Instead of defining a message transform inline, a separate transform specification class can be created. There are many reasons to do this, including separation of concerns, and in practice it has proven very useful.

class MessageATransform :
    ConsumeTransformSpecification<A>
{
    public MessageATransform()
    {
        Set(x => x.First, context => "Hello");
        Set(x => x.Second, context => "World");
    }
}

The transform is then applied to the receive endpoint.

x.ReceiveEndpoint("my_queue", x =>
{
    x.UseTransform<A>(x => x.Get<MessageATransform>());        
    x.Handle<A>(context => Console.WriteLine(context.Message.Second));
});

Simplified Saga Repository

To make creating new saga repositories easy, the actual behavior required by a new saga repository implementation has been reduced to two methods. The repository has also been redesigned to support composition and middleware, as well as full async operation, making it a clean and consistent implementation — on par with every other type of message consumer.
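
As a rough sketch, the repository surface looks something like the following (simplified and illustrative; the exact generic constraints and pipe types come from the MassTransit pipeline):

public interface ISagaRepository<TSaga>
    where TSaga : class, ISaga
{
    // send a message to a saga instance resolved by correlation identifier
    Task Send<T>(ConsumeContext<T> context, ISagaPolicy<TSaga, T> policy,
        IPipe<SagaConsumeContext<TSaga, T>> next)
        where T : class;

    // send a message to saga instances resolved by a saga query
    Task SendQuery<T>(SagaQueryConsumeContext<TSaga, T> context, ISagaPolicy<TSaga, T> policy,
        IPipe<SagaConsumeContext<TSaga, T>> next)
        where T : class;
}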

There is probably some tuning and adjustments yet to be addressed, but it’s super sweet so far.

What’s Left?

There are a few more things to wrap up before making MassTransit v3 ready for the primary NuGet feed (it’s currently hidden behind the pre-release flag). The exception handling pipeline needs to be well tested and verified, including adding context to the messages in error queues. Really, it’s just a lot of exception and sad-path testing at this point. The majority of the functionality is working very well, including Azure Service Bus.

If you are ambitious and ready to get started with the latest and greatest, I highly recommend pulling down the most recent pre-release packages and taking them for a spin. There are a couple of complete samples that demonstrate how to use MassTransit in a variety of scenarios.

Sample-RequestResponse

A complete request/response example, leveraging the IRequestClient to encapsulate the configuration and endpoint mapping, keeping the requestor code clean and simple. Source Code

Sample-Courier

A complete sample for using Courier, the Routing Slip implementation that is included with MassTransit. Examples of how to create and execute routing slips, track the routing slip events, and orchestrate those events using Automatonymous are included, along with using SQLite as a saga repository for the state machine instances. Source Code

Documentation

Okay, the documentation still needs a lot of work, but it's coming along. If you're a great writer, more help is always welcome on this part.

Stay Tuned

This was meant as an interim update, just to give a status on the development of MassTransit. The initial feedback and encouragement have been great: the much simpler API, the overall design of the message pipeline, and the Azure Service Bus support have all been well received. It's feedback from developers that helps determine when it is ready for stable release, so test drive the alphas and keep the feedback coming!

MassTransit 3 API Changes

When MassTransit 3 was announced, it was also stated that many of the APIs have changed. This post covers some of the most important data types, describing how they are used as well as how they compare to the previous version. In addition, the conceptual changes in MassTransit 3 will also be shared.

The Bus and Receive Endpoints

In MassTransit 3, a bus is a logical construct which connects to one or more hosts via a single message transport, either RabbitMQ or Azure Service Bus. A bus may include multiple receive endpoints, each of which is connected to a separate queue.

Previous versions of MassTransit only supported a single queue per service bus (which was configured using the ReceiveFrom method).

To support the configuration of receive endpoints, as well as advanced configuration specific to each transport, bus creation has changed. The shortest code to create a bus is shown below.

var hostAddress = new Uri("rabbitmq://localhost/test_virtual_host");

IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x =>
{
    x.Host(hostAddress, h =>
    {
        h.Username("test");
        h.Password("password");
    });
});

Previous versions of MassTransit created a service bus using ServiceBusFactory.New.

The bus created above has no receive endpoints. However, if a request is sent to an endpoint using the bus's address as the return address, a temporary queue is created to receive the response. With RabbitMQ, the bus's queue name is assigned by the broker; with Azure Service Bus, the name can be specified or dynamically generated.

HINT: Don’t write a log message to display the bus’s address! Merely accessing the address property results in the creation of the temporary queue! How’s that for an awesome side effect! This should probably change to Task<Uri> GetBusAddress() to signal the impact of using it!

IBus (so long IServiceBus)

In MassTransit 3, IBus refers to a bus instance. A bus includes methods to obtain send endpoints, as well as publish messages.

Previously IServiceBus was used, but there were significant changes to the method signatures with dangerous implications from improper use (many methods now return a Task and execute asynchronously). Therefore, changing the interface to IBus was deemed the best approach to avoid confusing bugs when upgrading existing services.

public interface IBus
{
    // the address of the bus, which is dynamically created and exclusive
    // to the bus instance
    Uri Address { get; }

    // return a send endpoint for the specified address
    Task<ISendEndpoint> GetSendEndpoint(Uri address);

    // publish a message on the bus, using the publish conventions of the
    // underlying transport (8+ overloads for customizing)
    Task Publish<T>(T message, CancellationToken cancellationToken);
}

Starting the Bus

Once a bus is created, an IBusControl interface is returned. The IBusControl interface includes IBus and adds the Start method.

public interface IBusControl :
    IBus
{
    // start the bus, as well as any configured receive endpoints
    Task<BusHandle> Start(CancellationToken cancellationToken);
}

When the application is ready to start the bus, the Start method should be called. This method returns a BusHandle which should be retained until the bus needs to be stopped.

BusHandle busHandle = await bus.Start(cancellationToken);

// application runs, then it's time to stop the service

await busHandle.Stop();

Consumers

A consumer is a class that handles (or in this case, consumes) one or more message types. When a message is read from a queue, an instance of the consumer is created using the configured consumer factory.

The lifecycle of the consumer is managed entirely by the consumer factory. This gives control over the construction and disposal of the consumer to the consumer factory. Integrations are included with MassTransit for most of the popular dependency injection containers (StructureMap, Autofac, Unity, Ninject, Castle Windsor).

Consumer Message Handlers

The consumer declares the handled message types via the IConsumer interface. An example consumer of two message types, A and B, is shown below.

class AbConsumer :
    IConsumer<A>,
    IConsumer<B>
{
    public async Task Consume(ConsumeContext<A> context)
    {
    }

    public async Task Consume(ConsumeContext<B> context)
    {
    }
}

Previously, the Consumes<T>.All or Consumes<T>.Context interfaces were used to specify consumers. This was changed to simplify the consumer class definition — the original syntax was clever but not very discoverable.

The Consume method is asynchronous, and returns a Task that is awaited before acknowledging the message. If the consumer runs to completion (task status of RanToCompletion), the message is acknowledged and removed from the queue. If the consumer faults (such as throwing an exception, resulting in a task status of Faulted or Canceled), the message is nack'd and remains on the queue.

Receive Endpoints

Within a bus, zero or more receiving endpoints can be declared. Each receiving endpoint should have a different queue name, and can also specify the host on which the receive endpoint is to be connected. To configure a receive endpoint for the consumer above, see the example below.

IBusControl bus = Bus.Factory.CreateUsingRabbitMq(x =>
{
    var host = x.Host("rabbitmq://localhost/test_virtual_host", h =>
    {
        h.Username("testuser");
        h.Password("password");
    });

    // declare the receive endpoint on the host
    x.ReceiveEndpoint(host, "consumer_queue", e =>
    {
        // configure the consumer using the default constructor
        // consumer factory.
        e.Consumer<AbConsumer>();
    });
});
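
Other consumer factory overloads exist as well; for example, a delegate can be supplied inside the receive endpoint configuration, giving explicit control over construction (a minimal sketch; the container integrations provide equivalent factories that resolve the consumer from the container):

// delegate consumer factory: a new consumer instance is created per message
e.Consumer(() => new AbConsumer());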

Sending a message to the endpoint

To send a message to the receiving endpoint, as a quick example, the bus would be used to get the send endpoint, and then a message would be sent.

var address = new Uri("rabbitmq://localhost/test_virtual_host/consumer_queue");
ISendEndpoint consumerEndpoint = await bus.GetSendEndpoint(address);
var message = new A { Value = "Hello, World."};
await consumerEndpoint.Send(message);

The message type, A, is a simple object with properties:

public class A
{
    public string Value { get; set; }
}

Consuming the Message

Inside the consumer, the Consume method handles the message. All messages are delivered within a ConsumeContext<T> that is specific to the message and consumer. For example, if the consumer handled the message and then published an event it would use the context to publish the event.

public async Task Consume(ConsumeContext<A> context)
{
    string value = context.Message.Value;
    await Console.Out.WriteLineAsync(value);

    var thisJustHappened = new AHandled(value);
    // this is async, but read below why we don't have to await it
    context.Publish(thisJustHappened);
}

In the consumer above, the value of the message is written (using the async IO methods) to a file (in this case, just the console — I’m not actually sure if the console is open for async i/o, but roll with it). Then, an event is published using the context. The published event is also just a class.

public class AHandled
{
    public AHandled(string value)
    {
        Value = value;
    }

    public string Value { get; private set; }
}

Now, the publish could have been awaited — which completes once the broker acknowledges that the publish has been written to the queue. However, the context is delegating the Publish call, and therefore is able to capture the Task returned and keep track of it inside the consumer context. The message will not be acknowledged until all pending tasks have completed. So the consumer could publish a dozen events without awaiting each one (which would be silly, honestly – since they’re all async) and the framework will handle awaiting until all of the messages have been published and then acknowledge the message being consumed.

Sending endpoints work the same way.

public async Task Consume(ConsumeContext<A> context)
{
    var sendEndpoint = await context.GetSendEndpoint(_serviceAddress);
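    // Send is intentionally not awaited; the consume context captures the returned Task (see below)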
    sendEndpoint.Send(someCommand);
}

In the above example, the Task returned from sendEndpoint.Send is captured by the consume context (by inserting an interceptor in front of the ISendEndpoint interface) and awaited by the consumer before acknowledging the message.

More to Come

This is just a short introduction to some of the API changes, to make it easy to migrate applications to the new version.

It’s important to remember that if JSON or XML serialization is being used, there is complete interoperability between services using MassTransit 2.x and MassTransit 3. So there is no need to update every service simultaneously — services can be updated as needed.

There will be more content, as well as updated documentation, as MassTransit becomes ready for general use. Until then, enjoy the alpha bits and share your feedback on the changes!

MassTransit ]|[

This article describes an upcoming release of MassTransit. This release is still under development and will be pushed in various pre-release forms before the final release is finished. Early feedback is valuable, and you are encouraged to check out the early bits, but recognize that they are just that – early bits. There will be bugs and features that are not yet complete.

Background

MassTransit was started in 2007 after a very exciting ALT.NET meeting in Austin, TX. I met a lot of great, smart people with a lot of great ideas on how to make the .NET community a better place for software developers. It was shortly after that Dru Sellers and I started working on a messaging library for use with MSMQ to solve problems within our respective jobs. I had actually been using MSMQ since the late 90’s from C++, but was starting to make moves into C# and needed a managed solution.

Originally NServiceBus was considered, but while it was open source, it was not accepting contributions at the time and was specific to less than stellar projects like Spring.NET. Having reviewed the source code, Dru and I came to the conclusion that we could start a new project to address our requirements using frameworks with which we were both more familiar. Since that time, NServiceBus has seen tremendous growth and is now a commercial product from Particular Software (started by Udi Dahan).

In the past seven years, MassTransit has evolved as requirements for messaging in distributed systems changed. What started as an abstraction layer on top of MSMQ became a comprehensive platform for creating distributed applications, including support for sagas (okay, we can call them process monitors if you prefer), message scheduling (via Quartz), and a powerful routing slip implementation that has been used as the underpinnings of a modern scaled distributed interoperability platform. Each of these features is great standalone, but when combined they make possible some really incredible system design options that are reliable, scalable, and supportable.

In with the New

There have been many changes in the .NET framework since version 2.0 was released. The runtime and language (C#) have both evolved, adding powerful new features. The biggest changes appeared in .NET 4 with the introduction of the TPL. The new threading support has made asynchronous programming easier to understand and has greater performance with lower overhead than previous operating system threading models.

In addition, the big puffy cloud has become mainstream and many applications are being built cloud-first, leading to the use of push messaging systems. When support for RabbitMQ was added, it took a while to get it right and make it cloud-friendly. The first incarnation followed the same pull model that was used for MSMQ, which led to services that were expensive to run under the cloud's pay-as-you-go model. Refining the transport to support push via AMQP brought the request charges down significantly, but started to identify limitations in the underlying transport model around which MassTransit was built.

Along with the cloud came Azure and the Azure Service Bus. For many .NET developers, it's the easiest option for systems being deployed to the cloud. Cloud services are cost effective ways to host applications, but having to run a RabbitMQ cluster in IaaS can be expensive. Support for Azure Service Bus has been limited to an external library due to the difference in messaging topology, particularly the topic/queue structure that's provided on top of SQL Service Broker. There is also the on-premises Service Bus option, although why somebody would run that over RabbitMQ eludes me.

RabbitMQ also got a lot more awesome, with many improvements in broker-to-client communication, including notifications to clients as the topology of cluster nodes changes and messages flow through the broker. The client library for .NET has gone from a limited, basic functionality library to include extensive support for the new features. This again identified some seams in MassTransit that were just not clean and made it difficult to take advantage of the new features.

The term CQRS, or Command Query Responsibility Segregation, became a lot more common, and the separation of reads from writes became obvious as more functional programming styles became commonplace. This is also true in messaging systems. For example, in RabbitMQ you send messages to exchanges and read messages from queues: two completely separate constructs, each tuned to its side of the message conversation.

So What’s New With MassTransit 3

MassTransit v3 is a modern, enterprise-grade framework for creating reliable, scalable, and supportable distributed applications. Version 3 leverages the latest .NET framework features and supports current message brokers including RabbitMQ and Azure Service Bus (additional support for Event Hubs, as well as on-premise Service Bus will be available as well).

Each message broker will be fully supported at the API level, allowing the distinct advantages of each broker to be utilized. The configuration API for endpoints and consumers is specialized for each transport, while consumers remain agnostic to the transport implementation details. This retains the ability to transplant consumers from one messaging system to another without a rewrite, while still taking advantage of transport-specific features. Message consumers have access to transport-specific parameters and settings as well, in case more specific per-message controls are needed.

Most modern frameworks provide middleware extension points, and MassTransit 3 is no exception. Full end-to-end middleware injection points, from the transport, post serialization, to handlers and consumers and everything in between, make MassTransit 3 the most customizable and extensible messaging framework. Every middleware component is asynchronous, ensuring maximum thread utilization, and owns its scope, allowing full in/out processing and exception handling. All of this without any overhead that impacts message processing throughput. Traditional observation-style notification points are also available, including pre/post/fault consumer message handling, as well as pre/post/fault send message handling, either by a specific inbound message type or consumer type, outbound message type or endpoint address, or generic inbound and outbound message paths.

A new MassTransit Host, which was part of the original releases but was extracted to become Topshelf, is now included. And with it comes an easy "create an assembly and hit F5" approach to building autonomous services with MassTransit. All of the setup, including separate receive endpoints for each consumer, is handled by convention to make getting started easier than ever. And those services are production ready with full logging support.

Courier, the routing slip implementation for MassTransit, as well as Quartz integration are fully optimized for MassTransit 3. Both scheduling and courier are built into MassTransit, along with full container integration.

Out with the Old

Seven years of evolution in a framework means a lot of old code, much of it solving problems that no longer exist. To keep MassTransit 3 clean and tight, some things had to go.

MSMQ is no longer supported. There was extensive code in MassTransit supporting features that MSMQ did not support out of the box. Modern message brokers have full support for topic- and/or exchange-based routing, and emulating that on MSMQ was complex and error prone. Rather than try to keep MSMQ alive, everything related to supporting publish/subscribe on MSMQ is gone, deleted, left in the archives of Git history. While a send/receive-only transport for MSMQ may be added at some future point, developers can continue to use the 2.x versions for MSMQ applications.

Magnum is no longer a reference. Neither is Stact. Both of these poorly maintained libraries have been removed from MassTransit. A lot of the features in those libraries were there to compensate for limited .NET framework features. With .NET 4.5, those extensions are no longer required. In fact, there are many code design changes that make generic support easier and more understandable, eliminating the dynamic code generation that was so heavily leveraged in previous versions of MassTransit. This should lead to faster code execution and reduced process startup time.

Of course, with no Magnum, there is no Magnum state machine. For applications built today, Automatonymous (docs) should be used anyway. It's a better, more flexible, and more extensible state machine than the Magnum implementation.

A lot of legacy interfaces that were never recommended for use, as well as tiny features that were difficult to support and rarely used, are gone. Some interfaces may remain to make it easier to port v2 applications to v3, but those will be for compatibility only (and, in fact, packaged in a separate MassTransit.Compatibility assembly).

Timelines

The first early bits of MassTransit 3 should be available on NuGet soon. This includes support for Azure Service Bus and RabbitMQ. I’ll follow up the bits with some articles on how to get started with the new configuration API, and how to get your first message producers and consumers communicating. Details on the default conventions for RabbitMQ and Azure Service Bus will also be published (RabbitMQ is unchanged and 100% compatible with v2.x services).

Automatonymous, Courier, and Quartz integration will also have pre-release versions available that are compatible with the new MassTransit 3 bits.

In Closing

Many more developers are being pushed into building decoupled message-based applications, and those developers need a clean and modern API that aligns closely to the .NET 4.5 framework. This is even more important as we move into vNext, with .NET now being open source. A fully vNext compatible version of MassTransit is definitely coming as the vNext release approaches. MassTransit 3 is being built to target those developers, as well as those who are maxing out with MassTransit v2.x and need better asynchronous support and extensibility for their applications.

Watch for the early bits announcement soon!

The Fooidity Switchyard

Yesterday, I announced Fooidity, a new open source project that simplifies separating code deployments from code release by allowing code paths to be switched dynamically without restarting the application. Continuing with the rollout, today I'm announcing a new web property set up to help developers use Fooidity right away.

Fooidity Switchyard

The Fooidity Switchyard is a cloud service hosted in Azure that maintains the state of code features. Developers can sign into the site using their GitHub credentials, and then create organizations and applications which can be switched.

Note that the site is in an early state and uptime is not guaranteed at this point. It's close, and once SSL and redundant cloud service instances are set up, it should be ready for general production use.

Getting Started

To see how features can be toggled in an application, use Visual Studio to create a new console application. Once created, use NuGet to add the following packages: Topshelf and Fooidity.Client.Autofac. The new service will be using an Autofac container.

Once the project is created, open the Program.cs file and paste the contents of the gist: https://gist.github.com/phatboyg/dceaf56120f5382cdfa7

Creating an Organization

The Fooidity Client is a set of components that use SignalR to communicate with the Fooidity Switchyard. The Autofac integration makes it easy to configure the client using idioms consistent with the Autofac container.

Visit http://api.fooidity.com/ and create an account (you’ll need to enter your email address, but GitHub authentication is used). Then, create an organization on the Organizations tab:

Create Organization

Once an organization is created, create an Application:

Create Application

Enter a name for the application and select an organization:

Create Application Form

Once created, create an application key:

Create Application Key

Then copy the application key and put it into the ApplicationKey configuration section of the container setup.

Compile and run the application, and it should start writing False to the console. Refresh the application in the web site and use the Enable/Disable links to change the state of the feature.

Toggle Feature

As the feature is toggled, the console output changes to reflect the state of the feature.

More To Come

The above shows how you can easily integrate feature toggles into your application and modify them at run time. More samples are coming, so stay tuned!

You can also check out the source code at https://github.com/phatboyg/Fooidity, which includes the Switchyard.

Announcing Fooidity

Background

To meet ever increasing customer demands, development teams are now working concurrently to add value to their applications. This can be difficult in a software-as-a-service (SaaS) environment, where a single codebase is continuously delivered to production. When multiple teams are working independently on the same code base, synchronizing feature completion and testing can be difficult, resulting in idle teams waiting on other teams to finish their features. In a large code base, having long-lived branches in source control can result in difficult merges, so it's often necessary to have multiple teams actively committing incomplete or partially tested code to the same development branch.

If you have been paying attention to what is hot and happening in application development, you may have heard about feature toggles. A feature toggle controls what features are available in an application. There are several implementations already available on GitHub (such as nToggle and NFeature); however, an unfortunate thing seen over and over is the preponderance of the if keyword. These conditional code blocks clutter the code and result in nested code blocks that would preferably be avoided.

Introducing Fooidity

Fooidity is a new open source project that simplifies separating code deployments from code release by allowing code paths to be switched dynamically without restarting the application. By using switches tied to features, new code can be delivered to production without changing the existing behavior. Once the development team is ready to deliver the feature to one or more customers, the feature can be enabled, allowing the switches to flow into the new behavior.

Using Fooidity

To use Fooidity, use NuGet to add the Fooidity package to your .NET project. Fooidity requires .NET 4.5.

Code Features

In Fooidity, a code feature represents code that is new to the application. This can be a new feature under development, an enhancement to an existing feature, or even a simple bug fix. All code that changes the behavior of an application currently in use by customers is a code feature. The CodeFeature interface is used to denote a type that identifies the code feature itself, declared using a .NET struct.

public struct Feature_NewScreen :
    CodeFeature
{
}

Because C# is a statically typed language, Fooidity embraces the type system by allowing code features to be defined using types. Rather than using strings, which make it hard to track down usage, or enums, which result in a single type in which every feature must be defined, using a type makes it easy for developers to find usages in code with a simple key combination in Visual Studio.

Code Switches

To selectively execute code based on a feature’s state, a switch is used. There are several switches available in Fooidity which can be used depending upon the requirements of the application.

In most situations, releasing a new feature to a subset of customers prior to making the feature available to all users is preferred. Not only does this limit the impact to customers in cases where the new behavior has unexpected side effects, but it allows the development team to more closely monitor the new behavior as it is enabled.

Fooidity has the concept of a contextual switch which allows a feature to be enabled for a subset of customers, users, etc. For instance, the userId could be used to enable a feature for a specific user. A customerId might be used to enable a feature for one or more customers.

To define a context, the developer creates a context type:

public interface ICustomerContext
{
    Guid CustomerId { get; }
}

A provider is then needed to obtain the context:

public class CustomerContextProvider :
    IContextProvider<ICustomerContext>
{
}

The context is then converted to a string key that can be used to identify the context in the feature state:

public class CustomerContextKeyProvider :
    IContextKeyProvider<ICustomerContext>
{
    public string GetKey(ICustomerContext context)
    {
        return context.CustomerId.ToString();
    }
}

The switch uses the context of the request (be it a web request, a message consumer in MassTransit, or any other scoped application operation) to determine if the feature is enabled for that context.

Dependency Injection Container Support

As a long-time user of dependency injection containers in application architectures, I consider a feature toggle implementation with strong container support a must. The ability to dynamically select multiple implementations from a container, including the use of context to aid in implementation selection, makes it easy to control the activation of new application behavior.

To this end, the real power of Fooidity comes into play when used with a container, such as Autofac, StructureMap, or Castle Windsor. Using a container with Fooidity makes it easy to support many advanced scenarios, such as switch evaluation tracking, switch state caching, and conditional resolution of class dependencies based on feature state.

For instance, two versions of an interface implementation can be created, and the container can resolve either the new or old version depending upon the state of a feature:

builder.RegisterSwitchedType<BugFix1337, IReader, FileReaderV2, FileReader>();

If the BugFix1337 feature is enabled, the FileReaderV2 class will be resolved from the container. If disabled, the original FileReader will be resolved instead. Both classes implement the IReader interface, making the switch transparent. This allows the deployment of the bug fix without changing the behavior of the application until the feature is enabled. The code can be deployed at any time, without requiring the development team to monitor post deployment to ensure the bug fix is working. The team can enable the feature when they are ready to monitor the fix, to ensure the new behavior is as expected.
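
Resolving the reader is then just a normal container resolution; which implementation comes back depends on the feature state. A sketch, assuming the registration above on a standard Autofac ContainerBuilder:

var builder = new ContainerBuilder();
builder.RegisterSwitchedType<BugFix1337, IReader, FileReaderV2, FileReader>();

IContainer container = builder.Build();

// the feature state, not the calling code, determines which implementation is returned
IReader reader = container.Resolve<IReader>();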

More To Come

The announcement of Fooidity is just the first in a series of features that are coming very soon. One feature of note is an entire management portal hosted in Azure that can be used by any developer to track and manage their feature states. A client assembly, hosted in the developer’s application, connects to the service (using SignalR) and obtains the feature state, including real-time updates which are pushed directly to the client. This will make it super easy to get started.

The NuGet packages are already available (search for Fooidity), and the entire source code is available on GitHub.

Separating Concerns – Part 2: Services

In the previous article on Separation of Concerns, libraries were explained as a way to decompose an application into separate sets of functions, resulting in code that is easier to maintain and has higher cohesion. This article continues the subject, explaining how applications can benefit from using services and what differentiates them from libraries.

Services

A service is a set of related functions usable by multiple applications and accessible through one or more public interfaces. What differentiates a service from a library is the way an application uses it. In the case of a library, functions run in the execution context of the calling application, inheriting the application’s environment, which includes the process, thread, and user context. A service, however, runs in its own execution context and has its own set of policies that govern how it can be used.

Due to framework limitations and/or the complexity of asynchronous programming, many applications invoke services synchronously — leaving the developer with the impression that the service has "taken control" of the execution context. In reality, the calling application is only blocked while waiting for the service to respond. And since the application is blocked, it behaves like a library function call, at least from the perspective of the debugger. Service invocation, however, involves a complex exchange of information between the calling application's process and the service's process. This exchange is even further complicated by the fact that the application and service processes may exist on separate machines, be written using different languages, and be running on different operating systems. Also, it's important to recognize that at any time during service invocation, the operating system, the process, the service, or the network can (and eventually will) fail.

Service Interface

The blocking scenario above pertains to applications that invoke services using a remote procedure call (or, RPC) form of inter-process communication. Examples include a web browser using a REST implementation over HTTP, a mobile client using a SOAP implementation (also over HTTP), and an intranet application using a binary implementation over TCP. A single service can support one or more of these implementations simultaneously, making the service available to a broader range of applications.

Another form of inter-process communication is message passing, where the application sends a message to the service. Message passing is inherently asynchronous; the application can send the message and continue processing without waiting for the service to complete. There are many advantages to using asynchronous message passing instead of synchronous RPC, but an asynchronous programming model is also more complex for developers. Messages can also be passed using a durable message store, making it easier to recover from failure without losing service requests and/or commands. Message passing also eliminates temporal coupling, preventing the application from being dependent upon the availability of the service.

And the advantage is… what?

Given the complexity of a service, particularly in comparison to a library, why would anyone choose to deal with it?

Quite simply, with a service, the implementation details of the public interface are encapsulated within the service itself and do not become dependencies of the calling application. And since the application is only calling the public interface, the only dependency added to the application is the service interface. The application does not inherit the dependencies of the service, as those dependencies are private implementation details. For this reason alone, when a function has a dependency on another service or when a function depends upon dynamic data, it is better to create a service, encapsulating those dependencies and enabling the service to be managed separately from the applications using it.

For example, a domain name validation function requires a list of valid domain names. However, the current list of valid domain names is constantly changing. If domain name validation were implemented as a library, the application would also be responsible for maintaining the list of valid domain names. Rather than adding these additional requirements to the application, a service is used to validate the domain name instead.

So, encapsulating dependencies while retaining the ability to reuse functionality is the key benefit of a service. The benefits of a library are also benefits of a service, including high cohesion, as long as the service focuses on a single concern or responsibility.

In the next installment, frameworks will be explained.

CRUD is Not a Service

Introduction

Many systems implement CRUD (create, read, update, and delete) using a repository pattern. An entity is loaded using a Get method, some business layer logic makes changes to the entity, and ultimately saved using a Put method. This exact pattern is replicated with as many names as there are examples, including EntityManager, EntityDataContext, EntityStorage, etc. In fact, the pattern itself has been completely generalized by libraries such as NHibernate, which provides an ISession interface for performing simple CRUD operations (yes, there are many additional advanced features that make NHibernate much more useful than just a simple CRUD library, but that’s not the point).

A significant weakness of the repository pattern is ensuring that an entity’s state is valid. When an entity is saved, it is important that the contents are valid. If the contents are invalid, such as the order total not equaling the sum of the order items, the resulting inconsistency can spread throughout the application causing additional, and perhaps greater inconsistencies.

Most of the time, developers using the repository pattern define classes for entities with properties for the attributes of the entity. And in most cases, the properties allow both reads and writes – making it possible for the entity to become invalid. The repository pattern also does not allow intent to be either expressed or captured as changes are made.

In order to properly validate an entity, the validation logic may need access to additional resources. For example, adding an activity to an itinerary may need to verify that there are seats available for a dining activity. Beyond simple validation, adding the activity to an itinerary may need to allocate an available seat to the itinerary's owner. Subsequently, removing the activity would require that the previously allocated seat be released so that it is available to others.

As systems evolve, this type of behavior gets added to the repository class, checking for property changes in the Put method and invoking services when changes are found. The more property changes that require external systems to be notified, the more complex the Put method becomes resulting in a greater number of dependencies in the repository class.

Don't even ask what happens when the save fails due to a transaction timeout or deadlock in the storage system after the external services have already been invoked for the property changes. And don't simply suggest that invoking the services after the save is complete is the right answer, because then what happens when the services cannot be invoked due to a network or availability issue?

Command Query Separation

Command Query Separation (or CQS) is a pattern that separates commands (typically write operations) from queries (including reads and operations that do not change data). By separating the concerns of reading and writing, it is possible to tune a system to meet specific scaling requirements without generalizing for both operations.

The following provides an example of how CQS can be used to implement CRUD operations. With each operation, an approach is presented that allows for separation of concerns as well as an implementation that can scale reads and writes separately.

Create

Consider, for example, a dining reservation system for a local restaurant. The Reservation service exposes an API to schedule a reservation where the client specifies the date, number of guests, and the desired seating time. When called, the service checks availability and either adds the reservation to the calendar, or fails and notifies the caller that the requested time is not available. If the reservation is added, a table is reserved for the guests and the table is removed from the available seating. Once all available seating is reserved, the service will no longer accept reservations.

The scheduling API above is an example of a command. A command tells a service to perform an operation. The service is responsible for implementing the command’s behavior, and is also the ultimate authority as to whether the command can be completed.

From the perspective of the command’s initiator, the contract is well defined. Submit the required arguments (date, time, and the number of guests), and observe one of the two possible outcomes (scheduled, or unavailable). As long as there are available seats at the requested time, the reservation should succeed. If the command fails due to lack of availability, the initiator can choose to adjust the arguments (such as requesting a later time, or selecting a different date) and resubmit the command, or it can decide instead to try another time to check if an opening becomes available.
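
As a sketch, the command and its two possible outcomes might be expressed as simple message contracts (the names and members here are illustrative, not from an actual service):

public interface ScheduleReservation
{
    DateTime Date { get; }
    TimeSpan SeatingTime { get; }
    int GuestCount { get; }
}

public interface ReservationScheduled
{
    string ReservationNumber { get; }
}

public interface ReservationUnavailable
{
    string Reason { get; }
}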

Read

In order to give the initiator a chance of successfully scheduling a reservation, it's important that the reservation system's constraints are available, so that initiators are able to submit reservations that will be accepted. This can be done many ways, but one way would be to expose the availability through a separate service.

For example, an application may display the restaurant’s availability to the user so that the user can select a time. At the minimum, having access to the restaurant’s days and hours of operation would allow the user to know when the restaurant is open. However, the restaurant may only take reservations in the evening and on weekends. To convey this information to the application and the user, the availability service may supply more detailed availability including ranges of time and the seating availability for each range.

The additional information provided by the availability service enables the application to determine in advance if a reservation will be accepted. If there is no seating available at a particular date and time, the application can assume that submitting a reservation for the date and time will fail. The application is not prevented from submitting the reservation, but it is likely that the reservation will fail.

Update

Plans change, and likewise the reservation service needs to be able to make changes to reservations. Depending upon the type of change, however, the service needs to follow different behaviors.

For example, if the reservation time changes, the service would need to determine if there was sufficient capacity at the new time for the reservation. On the other hand, if the number of guests increased, the service would need to ensure there was either sufficient seating at the already assigned table or if a larger table was available at the same time. A simple change, such as updating the name on the reservation, might not require any checks at all – unless the new name is identified as a VIP, in which case a check for upgraded tables or perhaps special menu selections would be performed to ensure that the VIP is treated to the best possible service.

As the above examples clearly show, an update is not just a save operation. An update is a general term given to a wide variety of changes that may be applied to a reservation. Since the behavior of each change is different, each change should also be handled differently. A command should be created to define the contract for each change, and each command should be explicitly named to describe the change (UpdateReservationName, ChangeReservationGuests, ChangeReservationTime).

While the update has changed from a single “write my object” operation to three separate commands, it is now easier to reason about the behavior of each command. If a new reservation time is requested, the initiator could check the published availability information and predetermine if the time slot is available. The initiator is not prevented from sending the command based on this information, but the likelihood of success is greater.
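
Each such command carries only the data its change requires; for example (again, illustrative contracts mirroring two of the commands named above):

public interface ChangeReservationTime
{
    string ReservationNumber { get; }
    DateTime NewTime { get; }
}

public interface ChangeReservationGuests
{
    string ReservationNumber { get; }
    int GuestCount { get; }
}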

Aggregate Roots and Scoping

An aggregate root is a form of transactional boundary (defined in Domain-Driven Design by Eric Evans) which defines the scope of an operation and its related data. For example, if the reservation service managed a list of guests with each reservation, the reservation would be the aggregate root and the list of guests would be contained within the reservation. This means that the addition or removal of a guest would be performed by or with the aggregate root. In practice, such as with a relational database, adding a guest to the reservation would not involve simply inserting into a ReservationGuest table, but actually loading the reservation and adding a guest. The reservation is the root entity, and the guests are a related or child entity.

The reason for this is that a reservation should be treated as a whole and not a set of related entities. If the system has a rule that a reservation cannot exceed eight guests, and guests are arbitrarily added outside of the reservation, the logic to validate the number of guests ends up in multiple places (just read this as cut-n-paste, which makes it quite obvious why it is a bad thing). Keeping the validation logic as part of the reservation makes the rules easier to discover and understand compared to having validation logic spread across the service.
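
A sketch of the reservation as an aggregate root, keeping the guest rule inside the entity (the eight-guest limit is taken from the example above):

public class Guest
{
    public string Name { get; set; }
}

public class Reservation
{
    readonly List<Guest> _guests = new List<Guest>();

    public void AddGuest(Guest guest)
    {
        // the rule lives with the aggregate root, not scattered across callers
        if (_guests.Count >= 8)
            throw new InvalidOperationException("A reservation cannot exceed eight guests");

        _guests.Add(guest);
    }
}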

Delete

Continuing with the example, it’s likely that a guest may cancel a reservation. Plans change, and the service should be able to remove a reservation that is no longer required.

To support canceling a reservation, the service may provide an additional API to cancel a reservation using the reservation number. When the command is received, the service would look up the reservation. If the reservation exists, the reservation would be marked as canceled and removed from the schedule – making the table available for scheduling by other patrons. If the reservation does not exist, the command would fail but the failure does not have any other effects. If the reservation existed but was already canceled, the command could be acknowledged as already canceled (there is no need to cancel a canceled reservation, but not failing ensures that the command is processed idempotently).

The fact that the reservation existed does not change, so it is important that the history of the reservation is retained. While the service could simply delete the reservation from storage, the stakeholders may want to keep a history of reservations for future use, such as marketing or promotional events, or to follow up to solicit feedback as to why the reservation was canceled.

Auditing

When a command is executed, such as adding an activity to an itinerary, it is important to retain an audit trail of changes. This audit trail is important in case the contents of the itinerary are disputed. For example, a customer may argue that they did not add a particular activity or that an activity is missing. Without an audit trail, it would be impossible to determine the contents of an itinerary at a previous point in time or who made any changes to the itinerary.

Retaining a history of commands executed on the itinerary along with preventing itinerary changes outside of the available commands can provide a reliable audit trail should the need arise. Additionally, ensuring that each command includes the user who initiated the command along with timestamps indicating when the command was initiated and executed can provide a chronological view of the changes made to the entity.

To summarize a statement commonly made by Greg Young, “So you have an audit trail, how do you know it’s right?”

By retaining every successful command, it is possible to rebuild the state of a reservation. In fact, in an event-sourced model, the events resulting from those commands are used to determine the current state. There are use cases for each approach, so if you have a highly event-based model, event sourcing may be worth consideration.

Bonus: Transferring Data Between Systems

In many organizations, separate test and production systems are used so that integrators and developers can test software or configuration changes prior to deploying them on production. For example, an integrator may configure a new customer on the test system prior to moving that configuration into production. More often than not, this transfer is performed using simple CRUD operations – typically behind the facade of an “import/export” link.

A disadvantage of using bulk CRUD operations when transferring configuration between systems is that the system itself is not a participant in the data import process.

Using Commands to Transfer Data

Rather than transfer data at the entity level, the data in the source system should be used to generate a sequence of commands that can be executed on the target system. Those commands could include references to the original commands executed on the source system, along with the time those commands were originally executed and the initiating user details. Retaining this information may be crucial as changes are deployed, ensuring that the user performing the transfer is not made responsible for the actual changes performed by another user.

Conclusion

The use of commands to perform the creation, updating, and deleting of data has clear advantages over simple data access layer operations. Change tracking, auditing, and validation are critical to ensure that data is valid. As with most technical choices, whether or not this level of validation is required depends upon your requirements. In my experience, more often than not, this level of detail is required, as auditing and change tracking eventually make their way into the backlog.

Implementing Routing Slip with MassTransit

This article introduces MassTransit.Courier, a new project that implements the routing slip pattern on top of MassTransit, a free, open-source, and lightweight message bus for the .NET platform.

Introduction

When sagas were originally conceived in MassTransit, they were inspired by an excerpt from Chapter 5 of the book SOA Patterns by Arnon Rotem-Gal-Oz. Over the past few months, the community has debated how the use of the word saga has led to confusion, and how the early implementations included in both NServiceBus and MassTransit do not actually align with the original paper, published in 1987 by Princeton University and written by Hector Garcia-Molina and Kenneth Salem, in which the term was coined.

With MassTransit Courier, the intent is to provide a mechanism for creating and executing distributed transactions with fault compensation that can be used alongside the existing MassTransit sagas for monitoring and recovery.

Background

Over the past few years building distributed systems using MassTransit, a pattern I consistently see repeated is the orchestration of multiple services into a single business transaction. Using the existing MassTransit saga support to manage the state of the transaction, the actual processing steps are created as autonomous services that are invoked by the saga using command messages. Command completion is observed using an event or response message by the saga, at which point the next processing step is invoked. When the saga has invoked the final service, the business transaction is complete.

As the processing required within a business transaction changes with evolving business requirements, a new version of the saga is required that includes the newly created processing steps. Knowledge of the new services becomes part of the saga, as well as the logic to identify which services need to be invoked for each transaction. The saga becomes rich with knowledge, and with great knowledge comes great responsibility (after all, knowledge is power, right?). Now, instead of only orchestrating the transaction, the saga is responsible for identifying which services to invoke based on the content of the transaction. Another concern was the level of database contention on the saga tables. With every service invocation being initiated by the saga, combined with the saga observing service events and responses, the saga tables get very busy.

Beyond the complexity of increasing saga responsibilities, more recently the business has requested the ability to selectively route a message through a series of services based on the content of the message. In addition to being able to dynamically route messages, the business needs to allow new services to be created and added to the inventory of available services. And this should be possible without modifying a central control point that dispatches messages to each service.

Like most things in computer science, this problem has already been solved.

The Routing Slip Pattern

A routing slip specifies a sequence of processing steps for a message. As each processing step completes, the routing slip is forwarded to the next step. When all the processing steps have completed, the routing slip is complete.

A key advantage of using a routing slip is that it allows the processing steps to vary for each message. Depending upon the content of the message, the routing slip creator can selectively add processing steps to the routing slip. This dynamic behavior is in contrast to the more explicit behavior of a state machine or sequential workflow that is statically defined (either through the use of code, a DSL, or something like Windows Workflow).

MassTransit Courier

MassTransit Courier is a framework that implements the routing slip pattern. Leveraging a durable messaging transport and the advanced saga features of MassTransit, MassTransit Courier provides a powerful set of components to simplify the use of routing slips in distributed applications. Combining the routing slip pattern with a state machine such as Automatonymous results in a reliable, recoverable, and supportable approach for coordinating and monitoring message processing across multiple services.

In addition to the basic routing slip pattern, MassTransit Courier also supports compensation, which allows processing steps to store process-related data so that reversible operations can be undone, using either a traditional rollback mechanism or an offsetting operation. For example, a processing step that holds a seat for a patron could release the held seat when compensated.

MassTransit Courier is free software and is covered by the same open source license as MassTransit (Apache 2.0). You can install MassTransit.Courier into your existing solution using NuGet.

Activities

In MassTransit Courier, an Activity refers to a processing step that can be added to a routing slip. To create an activity, create a class that implements the Activity interface.

public class DownloadImageActivity :
    Activity<DownloadImageArguments, DownloadImageLog>
{
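    // the Execute and Compensate methods, shown in the sections
    // that follow, complete this implementation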
}

The Activity interface is generic with two arguments. The first argument specifies the activity’s input type and the second argument specifies the activity’s log type. In the example shown above, DownloadImageArguments is the input type and DownloadImageLog is the log type. Both arguments must be interface types so that the implementations can be dynamically created.
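
Although the two interfaces are not shown here, their members can be inferred from how they are used below; a minimal sketch might look like this:

public interface DownloadImageArguments
{
    Uri ImageUri { get; }
    string WorkPath { get; }
}

public interface DownloadImageLog
{
    string ImageSavePath { get; }
}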

Implementing an Activity

An activity must implement two interface methods, Execute and Compensate. The Execute method is called while the routing slip is executing activities and the Compensate method is called when a routing slip faults and needs to be compensated.

When the Execute method is called, an execution argument is passed containing the activity arguments, the routing slip TrackingNumber, and methods to mark the activity as completed or faulted. The actual routing slip message, as well as any details of the underlying infrastructure, are excluded from the execution argument to prevent coupling between the activity and the implementation. An example Execute method is shown below.

public ExecutionResult Execute(Execution<DownloadImageArguments> execution)
{
    DownloadImageArguments args = execution.Arguments;
    string imageSavePath = Path.Combine(args.WorkPath,
        execution.TrackingNumber.ToString());

    _httpClient.GetAndSave(args.ImageUri, imageSavePath);

    return execution.Completed(new DownloadImageLogImpl(imageSavePath));
}

Once activity processing is complete, the activity returns an ExecutionResult to the host. If the activity executes successfully, the activity can elect to store compensation data in an activity log which is passed to the Completed method on the execution argument. If the activity chooses not to store any compensation data, the activity log argument is not required. In addition to compensation data, the activity can add or modify variables stored in the routing slip for use by subsequent activities.

In the example above, the activity creates an instance of a private class that implements the DownloadImageLog interface and stores the log information in the object properties. The object is then passed to the Completed method for storage in the routing slip before sending the routing slip to the next activity.
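
A minimal version of that private class might look like the following (the class body is an assumption, consistent with how the log is used above):

class DownloadImageLogImpl :
    DownloadImageLog
{
    public DownloadImageLogImpl(string imageSavePath)
    {
        ImageSavePath = imageSavePath;
    }

    public string ImageSavePath { get; private set; }
}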

When an activity fails, the Compensate method is called for previously executed activities in the routing slip that stored compensation data. If an activity does not store any compensation data, the Compensate method is never called. The compensation method for the example above is shown below.

public CompensationResult Compensate(Compensation<DownloadImageLog> compensation)
{
    DownloadImageLog log = compensation.Log;
    File.Delete(log.ImageSavePath);

    return compensation.Compensated();
}

Using the activity log data, the activity compensates by removing the downloaded image from the work directory. Once the activity has compensated the previous execution, it returns a CompensationResult by calling the Compensated method. If the compensating actions could not be performed (either via logic or an exception) and the inability to compensate results in a failure state, the Failed method can be used instead, optionally specifying an Exception.
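
For example, the compensation above could guard against a failed delete (a sketch reworking the same activity; the choice of IOException is illustrative):

public CompensationResult Compensate(Compensation<DownloadImageLog> compensation)
{
    try
    {
        File.Delete(compensation.Log.ImageSavePath);

        return compensation.Compensated();
    }
    catch (IOException exception)
    {
        // the downloaded image could not be removed, so report
        // the compensation itself as failed
        return compensation.Failed(exception);
    }
}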

Building a Routing Slip

Developers are discouraged from directly implementing the RoutingSlip message type and should instead use a RoutingSlipBuilder to create a routing slip. The RoutingSlipBuilder encapsulates the creation of the routing slip and includes methods to add activities, activity logs, and variables to the routing slip. For example, to create a routing slip with two activities and an additional variable, a developer would write:

var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", new Uri("rabbitmq://localhost/execute_downloadimage"), new
    {
        ImageUri = new Uri("http://images.google.com/someImage.jpg")
    });
builder.AddActivity("FilterImage", new Uri("rabbitmq://localhost/execute_filterimage"));
builder.AddVariable("WorkPath", @"\\dfs\work");

var routingSlip = builder.Build();

Each activity requires a name for display purposes and a URI specifying the execution address. The execution address is where the routing slip should be sent to execute the activity. For each activity, arguments can be specified that are stored and presented to the activity via the activity arguments interface type specified by the first argument of the Activity interface. The activities added to the routing slip are combined into an Itinerary, which is the list of activities to be executed, and stored in the routing slip.

Managing the inventory of available activities, as well as their names and execution addresses, is the responsibility of the application and is not part of MassTransit Courier. Since activities are application specific, and the business logic to determine which activities to execute and in what order is part of the application domain, the details are left to the application developer.
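
For illustration, that application-level decision might look something like this (a hypothetical sketch; ImageRequest and its properties are invented for the example):

public RoutingSlip CreateRoutingSlip(ImageRequest request)
{
    var builder = new RoutingSlipBuilder(NewId.NextGuid());

    builder.AddActivity("DownloadImage",
        new Uri("rabbitmq://localhost/execute_downloadimage"), new
        {
            ImageUri = request.ImageUri
        });

    // only add the filter step when the request calls for it
    if (request.ApplyFilter)
        builder.AddActivity("FilterImage",
            new Uri("rabbitmq://localhost/execute_filterimage"));

    builder.AddVariable("WorkPath", @"\\dfs\work");

    return builder.Build();
}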

Once built, the routing slip is executed, which sends it to the first activity’s execute URI. To make it easy and to ensure that source information is included, an extension method to IServiceBus is available, the usage of which is shown below.

bus.Execute(routingSlip); // pretty exciting, eh?

It should be pointed out that if the URI for the first activity is invalid or cannot be reached, an exception will be thrown by the Execute method.

Hosting Activities in MassTransit

To host an activity in a MassTransit service bus instance, the configuration namespace has been extended to include two additional subscription methods (thanks to the power of extension methods and a flexible configuration syntax, no changes to MassTransit were required). Shown below is the configuration used to host an activity.

var executeUri = new Uri("rabbitmq://localhost/execute_example");
var compensateUri = new Uri("rabbitmq://localhost/compensate_example");

IServiceBus compensateBus = ServiceBusFactory.New(x =>
    {
        x.ReceiveFrom(compensateUri);
        x.Subscribe(s => s.CompensateActivityHost<ExampleActivity, ExampleLog>(
            _ => new ExampleActivity()));
    });

IServiceBus executeBus = ServiceBusFactory.New(x =>
    {
        x.ReceiveFrom(executeUri);
        x.Subscribe(s => s.ExecuteActivityHost<ExampleActivity, ExampleArguments>(
            compensateUri,
            _ => new ExampleActivity()));
    });

In the above example two service bus instances are created, each with its own input queue. For execution, the routing slip is sent to the execution URI, and for compensation the routing slip is sent to the compensation URI. The actual URIs used are up to the application developer; the example merely shows the recommended approach so that the two addresses are easily distinguished. The URIs must be different!

Monitoring Routing Slips

During routing slip execution, events are published when the routing slip completes or faults. Every event message includes the TrackingNumber as well as a Timestamp (in UTC, of course) indicating when the event occurred:

  • RoutingSlipCompleted
  • RoutingSlipFaulted
  • RoutingSlipCompensationFailed

Additional events are published for each activity, including:

  • RoutingSlipActivityCompleted
  • RoutingSlipActivityFaulted
  • RoutingSlipActivityCompensated
  • RoutingSlipActivityCompensationFailed

By observing these events, an application can monitor and track the state of a routing slip. To maintain the current state, an Automatonymous state machine could be created. To maintain history, events could be stored in a database and then queried using the TrackingNumber of the RoutingSlip.
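
As a simple sketch, a handler subscription could record completions (the queue name is arbitrary, and a real monitor would likely feed a state machine or a database rather than the console):

IServiceBus observerBus = ServiceBusFactory.New(x =>
    {
        x.ReceiveFrom(new Uri("rabbitmq://localhost/routing_slip_observer"));
        x.Subscribe(s => s.Handler<RoutingSlipCompleted>(message =>
            Console.WriteLine("{0} completed at {1}", message.TrackingNumber,
                message.Timestamp)));
    });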

Wrapping Up

MassTransit Courier is a great way to compose dynamic processing steps into a routing slip that can be executed, monitored, and compensated in the event of a fault. When used in combination with the existing saga features of MassTransit, it is possible to coordinate a distributed set of services into a reliable and supportable system.