Category Archives: MassTransit

Implementing Routing Slip with MassTransit

This article introduces MassTransit.Courier, a new project that implements the routing slip pattern on top of MassTransit, a free, open-source, and lightweight message bus for the .NET platform.

Introduction

When sagas were originally conceived in MassTransit, they were inspired by an excerpt from Chapter 5 in the book SOA Patterns by Arnon Rotem-Gal-Oz. Over the past few months, the community has discussed how the use of the word saga has led to confusion, and how the early implementations in both NServiceBus and MassTransit do not actually align with the original paper in which the term was coined, written by Hector Garcia-Molina and Kenneth Salem and published by Princeton University in 1987.

With MassTransit Courier, the intent is to provide a mechanism for creating and executing distributed transactions with fault compensation that can be used alongside the existing MassTransit sagas for monitoring and recovery.

Background

Over the past few years building distributed systems using MassTransit, a pattern I consistently see repeated is the orchestration of multiple services into a single business transaction. Using the existing MassTransit saga support to manage the state of the transaction, the actual processing steps are created as autonomous services that are invoked by the saga using command messages. The saga observes command completion through an event or response message, at which point it invokes the next processing step. When the saga has invoked the final service, the business transaction is complete.

As the processing required within a business transaction changes with evolving business requirements, a new version of the saga is required that includes the newly created processing steps. Knowledge of the new services becomes part of the saga, as well as the logic to identify which services need to be invoked for each transaction. The saga becomes rich with knowledge, and with great knowledge comes great responsibility (after all, knowledge is power, right?). Now, instead of only orchestrating the transaction, the saga is responsible for identifying which services to invoke based on the content of the transaction. Another concern is the level of database contention on the saga tables. With every service invocation being initiated by the saga, combined with the saga observing service events and responses, the saga tables get very busy.

Beyond the complexity of increasing saga responsibilities, more recently the business has requested the ability to selectively route a message through a series of services based on the content of the message. In addition to being able to dynamically route messages, the business needs to allow new services to be created and added to the inventory of available services. And this should be possible without modifying a central control point that dispatches messages to each service.

Like most things in computer science, this problem has already been solved.

The Routing Slip Pattern

A routing slip specifies a sequence of processing steps for a message. As each processing step completes, the routing slip is forwarded to the next step. When all the processing steps have completed, the routing slip is complete.

A key advantage of using a routing slip is that it allows the processing steps to vary for each message. Depending upon the content of the message, the routing slip creator can selectively add processing steps to the routing slip. This dynamic behavior is in contrast to the more explicit behavior of a state machine or sequential workflow that is statically defined (either through the use of code, a DSL, or something like Windows Workflow).
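To make the idea concrete, here is a minimal in-process sketch of the pattern. It is not the MassTransit Courier API: SketchRoutingSlip, RoutingSlipSketch, and the string-based steps are all made up for illustration, and everything runs in one process rather than being forwarded between services. What it does show is the creator choosing steps based on the message content, and the slip being complete once the itinerary is empty.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical types for illustration only; not the MassTransit Courier API.
public sealed class SketchRoutingSlip
{
    // The itinerary: processing steps still to be executed, in order.
    private readonly Queue<Func<string, string>> _itinerary =
        new Queue<Func<string, string>>();

    public void AddStep(Func<string, string> step)
    {
        _itinerary.Enqueue(step);
    }

    // The routing slip is complete when no steps remain.
    public bool IsComplete
    {
        get { return _itinerary.Count == 0; }
    }

    // Runs the next step and removes it from the itinerary.
    public string ExecuteNext(string message)
    {
        Func<string, string> step = _itinerary.Dequeue();
        return step(message);
    }
}

public static class RoutingSlipSketch
{
    // The creator inspects the message and decides which steps to add.
    public static SketchRoutingSlip CreateFor(string message)
    {
        var slip = new SketchRoutingSlip();
        slip.AddStep(m => m.Trim());
        if (message.Contains("urgent"))
            slip.AddStep(m => m.ToUpperInvariant());
        return slip;
    }

    // Executes steps one after another until the slip is complete.
    public static string Run(string message)
    {
        SketchRoutingSlip slip = CreateFor(message);
        while (!slip.IsComplete)
            message = slip.ExecuteNext(message);
        return message;
    }
}
```

Calling RoutingSlipSketch.Run with a message containing "urgent" runs two steps, while any other message runs only one; no central dispatcher needs to know about the second step.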

MassTransit Courier

MassTransit Courier is a framework that implements the routing slip pattern. Leveraging a durable messaging transport and the advanced saga features of MassTransit, MassTransit Courier provides a powerful set of components to simplify the use of routing slips in distributed applications. Combining the routing slip pattern with a state machine such as Automatonymous results in a reliable, recoverable, and supportable approach for coordinating and monitoring message processing across multiple services.

In addition to the basic routing slip pattern, MassTransit Courier also supports compensations which allow processing steps to store process-related data so that reversible operations can be undone, using either a traditional rollback mechanism or by applying an offsetting operation. For example, a processing step that holds a seat for a patron could release the held seat when compensated.
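The seat example can be sketched in a few lines of plain C#. This is only an in-process illustration under my own assumptions: SeatInventory and CompensationSketch are hypothetical names, not Courier types, and undoing the most recent step first is a choice made for this sketch.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-process sketch; names are illustrative, not the Courier API.
public sealed class SeatInventory
{
    private readonly HashSet<string> _held = new HashSet<string>();

    public void Hold(string seat) { _held.Add(seat); }
    public void Release(string seat) { _held.Remove(seat); }
    public bool IsHeld(string seat) { return _held.Contains(seat); }
}

public static class CompensationSketch
{
    // Holds a seat, then runs a second step. If that step throws, the
    // recorded undo actions run, most recent first (an assumption here).
    public static bool TryReserve(SeatInventory seats, string seat, Action chargePatron)
    {
        var undoLog = new Stack<Action>();
        try
        {
            seats.Hold(seat);
            // Store what is needed to undo this step: the offsetting operation.
            undoLog.Push(() => seats.Release(seat));

            chargePatron(); // may throw to simulate a faulted step
            return true;
        }
        catch (Exception)
        {
            while (undoLog.Count > 0)
                undoLog.Pop()();
            return false;
        }
    }
}
```

The point of the undo log is that the step that held the seat is the only one that knows how to release it, which is the role the activity log plays in a routing slip.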

MassTransit Courier is free software and is covered by the same open source license as MassTransit (Apache 2.0). You can install MassTransit.Courier into your existing solution using NuGet.

Activities

In MassTransit Courier, an Activity refers to a processing step that can be added to a routing slip. To create an activity, create a class that implements the Activity interface.

public class DownloadImageActivity :
    Activity<DownloadImageArguments, DownloadImageLog>
{
}

The Activity interface is generic with two arguments. The first argument specifies the activity’s input type and the second argument specifies the activity’s log type. In the example shown above, DownloadImageArguments is the input type and DownloadImageLog is the log type. Both arguments must be interface types so that the implementations can be dynamically created.

Implementing an Activity

An activity must implement two interface methods, Execute and Compensate. The Execute method is called while the routing slip is executing activities and the Compensate method is called when a routing slip faults and needs to be compensated.

When the Execute method is called, an execution argument is passed containing the activity arguments, the routing slip TrackingNumber, and methods to mark the activity as completed or faulted. The actual routing slip message, as well as any details of the underlying infrastructure, are excluded from the execution argument to prevent coupling between the activity and the implementation. An example Execute method is shown below.

ExecutionResult Execute(Execution<DownloadImageArguments> execution)
{
    DownloadImageArguments args = execution.Arguments;
    string imageSavePath = Path.Combine(args.WorkPath, 
        execution.TrackingNumber.ToString());

    _httpClient.GetAndSave(args.ImageUri, imageSavePath);

    return execution.Completed(new DownloadImageLogImpl(imageSavePath));
}

Once activity processing is complete, the activity returns an ExecutionResult to the host. If the activity executes successfully, the activity can elect to store compensation data in an activity log which is passed to the Completed method on the execution argument. If the activity chooses not to store any compensation data, the activity log argument is not required. In addition to compensation data, the activity can add or modify variables stored in the routing slip for use by subsequent activities.

In the example above, the activity creates an instance of a private class that implements the DownloadImageLog interface and stores the log information in the object properties. The object is then passed to the Completed method for storage in the routing slip before sending the routing slip to the next activity.

When an activity fails, the Compensate method is called for previously executed activities in the routing slip that stored compensation data. If an activity does not store any compensation data, the Compensate method is never called. The compensation method for the example above is shown below.

CompensationResult Compensate(Compensation<DownloadImageLog> compensation)
{
    DownloadImageLog log = compensation.Log;
    File.Delete(log.ImageSavePath);

    return compensation.Compensated();
}

Using the activity log data, the activity compensates by removing the downloaded image from the work directory. Once the activity has compensated the previous execution, it returns a CompensationResult by calling the Compensated method. If the compensating actions could not be performed (either via logic or an exception) and the inability to compensate results in a failure state, the Failed method can be used instead, optionally specifying an Exception.
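The argument and log types are not shown in the examples above. Judging only from the members those examples use (ImageUri, WorkPath, and ImageSavePath), they might look something like the sketch below. The property types are my assumption, and DownloadImageLogImpl is written as a top-level class so the snippet compiles on its own, whereas the text describes it as a private class inside the activity.

```csharp
using System;

// Inferred from the members used in the Execute and Compensate examples;
// the property types are assumptions.
public interface DownloadImageArguments
{
    Uri ImageUri { get; }
    string WorkPath { get; }
}

public interface DownloadImageLog
{
    string ImageSavePath { get; }
}

// Simple implementation of the log interface that carries the
// compensation data (the path of the downloaded file).
public class DownloadImageLogImpl : DownloadImageLog
{
    public DownloadImageLogImpl(string imageSavePath)
    {
        ImageSavePath = imageSavePath;
    }

    public string ImageSavePath { get; private set; }
}
```

Keeping the log down to just the saved path means Compensate has everything it needs to delete the file and nothing else.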

Building a Routing Slip

Developers are discouraged from directly implementing the RoutingSlip message type and should instead use a RoutingSlipBuilder to create a routing slip. The RoutingSlipBuilder encapsulates the creation of the routing slip and includes methods to add activities, activity logs, and variables to the routing slip. For example, to create a routing slip with two activities and an additional variable, a developer would write:

var builder = new RoutingSlipBuilder(NewId.NextGuid());
builder.AddActivity("DownloadImage", "rabbitmq://localhost/execute_downloadimage", new
    {
        ImageUri = new Uri("http://images.google.com/someImage.jpg")
    });
builder.AddActivity("FilterImage", "rabbitmq://localhost/execute_filterimage");
builder.AddVariable("WorkPath", @"\\dfs\work"); // verbatim string keeps the backslashes

var routingSlip = builder.Build();

Each activity requires a name for display purposes and a URI specifying the execution address. The execution address is where the routing slip should be sent to execute the activity. For each activity, arguments can be specified; these are stored and presented to the activity via the activity arguments interface type specified by the first argument of the Activity interface. The activities added to the routing slip are combined into an Itinerary, which is the list of activities to be executed, and stored in the routing slip.

Managing the inventory of available activities, as well as their names and execution addresses, is the responsibility of the application and is not part of the MassTransit Courier. Since activities are application specific, and the business logic to determine which activities to execute and in what order is part of the application domain, the details are left to the application developer.

Once built, the routing slip is executed, which sends it to the first activity’s execute URI. To make it easy and to ensure that source information is included, an extension method to IServiceBus is available, the usage of which is shown below.

bus.Execute(routingSlip); // pretty exciting, eh?

It should be pointed out that if the URI for the first activity is invalid or cannot be reached, an exception will be thrown by the Execute method.

Hosting Activities in MassTransit

To host an activity in a MassTransit service bus instance, the configuration namespace has been extended to include two additional subscription methods (thanks to the power of extension methods and a flexible configuration syntax, no changes to MassTransit were required). Shown below is the configuration used to host an activity.

var executeUri = new Uri("rabbitmq://localhost/execute_example");
var compensateUri = new Uri("rabbitmq://localhost/compensate_example");

IServiceBus compensateBus = ServiceBusFactory.New(x =>
    {
        x.ReceiveFrom(compensateUri);
        x.Subscribe(s => s.CompensateActivityHost<ExampleActivity, ExampleLog>(
            _ => new ExampleActivity()));
    });

IServiceBus executeBus = ServiceBusFactory.New(x =>
    {
        x.ReceiveFrom(executeUri);
        x.Subscribe(s => s.ExecuteActivityHost<ExampleActivity, ExampleArguments>(
            compensateUri,
            _ => new ExampleActivity()));
    });

In the above example, two service bus instances are created, each with its own input queue. For execution, the routing slip is sent to the execution URI, and for compensation the routing slip is sent to the compensation URI. The actual URIs used are up to the application developer; the example merely shows the recommended approach so that the two addresses are easily distinguished. The URIs must be different!

Monitoring Routing Slips

During routing slip execution, events are published when the routing slip completes or faults. Every event message includes the TrackingNumber as well as a Timestamp (in UTC, of course) indicating when the event occurred:

  • RoutingSlipCompleted
  • RoutingSlipFaulted
  • RoutingSlipCompensationFailed

Additional events are published for each activity, including:

  • RoutingSlipActivityCompleted
  • RoutingSlipActivityFaulted
  • RoutingSlipActivityCompensated
  • RoutingSlipActivityCompensationFailed

By observing these events, an application can monitor and track the state of a routing slip. To maintain the current state, an Automatonymous state machine could be created. To maintain history, events could be stored in a database and then queried using the TrackingNumber of the RoutingSlip.
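As a rough sketch of the history idea, the class below keeps observed events in memory and returns them by tracking number. ObservedEvent and RoutingSlipHistory are hypothetical names of my own, not MassTransit types, and a real application would write to a database instead of a list; the only details taken from the events themselves are the TrackingNumber and the Timestamp.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical record of an observed event; not a MassTransit type.
public sealed class ObservedEvent
{
    public ObservedEvent(Guid trackingNumber, DateTime timestamp, string name)
    {
        TrackingNumber = trackingNumber;
        Timestamp = timestamp;
        Name = name;
    }

    public Guid TrackingNumber { get; private set; }
    public DateTime Timestamp { get; private set; }
    public string Name { get; private set; }
}

// In-memory stand-in for a database table of routing slip events.
public sealed class RoutingSlipHistory
{
    private readonly List<ObservedEvent> _events = new List<ObservedEvent>();

    public void Record(ObservedEvent observed)
    {
        _events.Add(observed);
    }

    // Returns the events for one routing slip, oldest first.
    public IList<ObservedEvent> For(Guid trackingNumber)
    {
        return _events
            .Where(e => e.TrackingNumber == trackingNumber)
            .OrderBy(e => e.Timestamp)
            .ToList();
    }
}
```

Sorting by Timestamp on read means the store does not depend on events arriving in order.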

Wrapping Up

MassTransit Courier is a great way to compose dynamic processing steps into a routing slip that can be executed, monitored, and compensated in the event of a fault. When used in combination with the existing saga features of MassTransit, it is possible to coordinate a distributed set of services into a reliable and supportable system.

MassTransit v2.5.3 Now Supports the TPL

As I’ve started to use MassTransit with SignalR, one of the things that annoyed me was the hoops I had to jump through to get a nice asynchronous request from SignalR into MassTransit. There was a lot of plumbing since MT did not support the TPL.

Well, I’ve changed that. With version 2.5.3 (a prerelease version on NuGet), you can now get a really nice clean syntax to return tasks from server-side SignalR hubs (and other calls that expect a Task return value).

Shown above is a SignalR hub that sends a request message off to some service. The LocationResult handler that is added within the closure returns a Task<LocationResult>, which can be returned to SignalR allowing the server-side code to remain asynchronous. If additional work needed to be done to transform the message to another type or perform some type of validation, a .ContinueWith() could be added to the task to return the proper result type.

If the request times out, the task for the handler will be cancelled, so be sure to take that into account.
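Here is a small sketch of the ContinueWith idea using only the TPL itself, with no MassTransit calls. The LocationResult class and its City property are placeholders I made up, and TaskSketch is a hypothetical helper. Passing TaskContinuationOptions.OnlyOnRanToCompletion means the continuation only runs for a successful response; if the request task was cancelled (as on a timeout), the continuation task ends up cancelled too.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical message type standing in for the response described above.
public sealed class LocationResult
{
    public string City { get; set; }
}

public static class TaskSketch
{
    // Transforms the response into the type the caller expects.
    // With OnlyOnRanToCompletion, the returned task is cancelled instead of
    // run when the request task was cancelled or faulted.
    public static Task<string> ToDisplayName(Task<LocationResult> request)
    {
        return request.ContinueWith(
            t => t.Result.City.ToUpperInvariant(),
            TaskContinuationOptions.OnlyOnRanToCompletion);
    }
}
```

Whoever consumes the returned task still needs to check for cancellation before reading Result.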

Pretty powerful stuff, eh?

Oh, here’s a demo of how to use it with SignalR: SignalRMassTransitTPLDemo.zip

NWA User Group SignalR Demo Code

Last week, I gave a talk at the Northwest Arkansas .NET user group on SignalR. Based on the response (which I’ve seen at nearly every event where I’ve spoken about SignalR), there is a lot of interest in this library for building rich HTML applications with .NET backend services.

The demo code from the talk, which includes an action item list, along with an external service that posts events to browser clients via MassTransit, is available for download below.

NWADemo2012.zip

MassTransit v2.0 Beta Available Now on NuGet!

After what seems like a long slumber, along with work being done on other projects such as Topshelf and Stact, it is our great pleasure to announce the first beta release of MassTransit v2.0. What originally started out as a minor “1.3” update has turned into a full-out cleanup of the codebase, including a refinement of the configuration API. Since there were some breaking changes to the configuration, we felt a 2.0 moniker was better to ensure users of the framework understood the depth of the changes made.

And what a list of changes it is (TL;DR = We filled it with awesomeness):

  1. Configuration
    MassTransit v2.0 now includes a streamlined configuration model built around an extensible fluent interface (inspired by Stact and Topshelf and sharing a common, consistent design). As a result, getting started with MassTransit is now easier than ever. In version 2.0, all configuration starts with the ServiceBusFactory and Intellisense guides you from there forward. The result is a clean, understandable API and a quicker out-of-the-box experience.

  2. Container-Free Support
    With the release of MassTransit 2.0, using a dependency injection container is now optional. When we started MassTransit, we leveraged the container extensively to assemble the internal workings of the bus. As we added support for other containers, required features that were not supported by a particular container led to some creative solutions (read: hacks) that were less than optimal. By moving away from a “container-first” approach, we have increased the reliability of the software and now provide container-specific extensions to subscribe consumers from the container in one simple step. We also threw in support for Autofac!

  3. Quick-Start
    By simplifying the configuration, and dropping the need for a container, it is now fast and easy to get started using our new QuickStart:
    http://docs.masstransit-project.com/en/latest/configuration/quickstart.html

  4. #NuGet
    NuGet packages have been added for the base MassTransit project, with any external dependencies (log4net and Magnum) resolved using the proper NuGet packages. Any additional references are downstream in additional NuGet packages, such as support for persisting sagas using NHibernate (MassTransit.NHibernate), and the various dependency injection containers supported.

  5. Multiple Subscription Service Options
    In addition to the existing RuntimeServices included with MassTransit, an all-new peer-to-peer subscription service has been added. By leveraging the reliable multi-cast support in MSMQ, services can now exchange subscription information without the need for a centralized subscription service. To ensure everything is set up correctly, a VerifyMsmqConfiguration method has been added that will check the installation of MSMQ and install any missing components. This is the first iteration of multi-cast support, and we need to get some mileage on it. In the meantime, the original run-time services continue to work as expected.

  6. Documentation
    Which brings us to the next big update. DOCS! They’re not perfect, and they’re far from complete, but we have focused on the configuration story to help get you up and running. As we see a need for more documentation in a given area, we will continue to flesh out the docs appropriately. The docs are located at http://docs.masstransit-project.com/ and are being hosted by the fine people at http://readthedocs.org. [Thanks Eric!]

  7. Support for .NET 4.0 and .NET 3.5
    The project files and solution have all been updated to Visual Studio 2010 SP1. By default, all projects are now built in the IDE targeting .NET 4.0. The command-line build (which has been revamped to use Rake and Albacore) builds both .NET 3.5 and .NET 4.0 assemblies, including the run-time services and System View. The NuGet packages also include the proper bindings for the target project run-time version (you must use the full .NET 4.0 profile with MassTransit, the client profile is not supported).

  8. Transport Support
    Internally, the transports and endpoints have been redesigned to improve the support for new transports like RabbitMQ (and improve our ActiveMQ support). For example, transports are now inbound, outbound, or both, allowing us to properly leverage fan-out exchanges on RabbitMQ for publishing and subscribing to messages. There is more to come in this area as we take greater advantage of these advanced transport features. If you’re a RabbitMQ or ActiveMQ user and don’t mind getting your hands dirty, now is a great time to jump in and help improve transport support.

  9. Distributor Consumer And Saga Support
    Work on the MassTransit distributor subsystem continues. Testing on a multi-master system has been completed, which will allow it to serve multiple distributors to improve load balancing efficiency. Support for all sagas (previously only state machine sagas were supported) has been added as well.

  10. Swinging the Feature Axe
    Some previously troublesome and poorly supported features (Batching and Message Grouping) were removed from the 2.0 release to reduce code complexity. Also, in light of the new Parallel Tasks work in the framework, the Parallel namespace has been removed.

In the next few days, I’ll be posting an annotated walkthrough of the new configuration API. In the meantime, fire up Visual Studio 2010, create ConsoleApplication69, switch to the full .NET 4.0 framework, and Add a Library Package Reference to MassTransit using NuGet. Paste the code from the Quick Start into your program.cs and check it out!

MassTransit Update

It’s been a while since I’ve posted about MassTransit, the .NET distributed application framework and service bus that Dru Sellers, myself, and several other contributors have been working on for the past 2.5 years. This is mostly due to a pretty heavy schedule in the first quarter (CodeMash, the MVP Summit, Pablo’s Fiesta, the North Dallas .NET User Group, and having an in-ground swimming pool built), along with a lot of exploratory coding on some new features for the framework.

While traveling to community events, it’s amazing to run into folks that are using MassTransit in their applications — particularly ones that I’ve never heard from before. It’s a cool feeling to know that people are finding value in the effort we’ve put into it. Over the past few months, several organizations have been finalizing the testing of new applications built on top of MassTransit, taking advantage of the state-driven saga support (including the new distributor for load balancing saga instances across servers), in preparation for production launches. Several contributors have offered tremendous help with this new functionality, including development of and testing with the distributor support.

It is great to see users of the framework taking an active role in shaping the feature set to meet a more generalized set of requirements. When Dru and I started creating MassTransit, we had a fairly narrow feature set that needed to be implemented. As the use of messaging in our applications expanded, we started to identify new features that would provide additional benefits. Being able to harvest application-specific code into the framework has provided a high level of reuse which helps everyone.

GitHub

The biggest move that we made in the past few months was leaving GoogleCode behind and migrating the project to GitHub. If you have not yet discovered it, Git is an amazing distributed version control system that offers tremendous flexibility when working on highly distributed projects, such as open source projects. Combined with the amazing social collaboration that occurs on GitHub, we have merged significantly more features from contributors on GitHub in the past few months than we had previously received the entire time we were hosted at GoogleCode. Many have forked the main project (which is hosted on my GitHub account, and is also used to drive the official builds on the CodeBetter TeamCity server) and made changes that I have merged into the main project.

You can still download a compressed archive of the latest source from GitHub by clicking the download source link on the project source page. You can also download the latest build (or the tagged v1.0 build) from the TeamCity artifacts link. The build runs automatically when code is pushed to GitHub, so the latest build is always from the latest bits in the master branch.

Major Milestone

On March 1st, we marked a v1.0 release candidate of MassTransit (and the related projects Topshelf and Magnum). With open source projects there is always a syndrome of 0.x, where many projects never reach 1.0 yet are still used in production systems at multiple organizations. Considering the number of organizations using MassTransit (and even Topshelf by itself for hosting Windows services), we decided it was time to mark the release 1.0 and freeze the feature set for that line of the codebase.

Since the 1.0 release candidate, there has been very little active development on the MassTransit codebase. The reason for this is simple: we wanted to allow the framework a little time to soak into the community. There are a lot of features that we want to put into the framework, and several of these are under heavy development outside of the master codebase, but the main feature set for 1.0 was released to allow organizations to go forward with implementations that were waiting on an official release.

Documentation

We have heard you, and we are going to start improving the documentation. We’ve set up a site specific to each project (MassTransit, Topshelf, Magnum) and are going to be harvesting the content from the wiki to create a reference set of documentation for using MassTransit. Hopefully we can take some of the questions from the discussion list and get them into a QA/FAQ section as well.

As an aside, I started this post based on a purse fight that was held on Twitter this morning in regards to OSS activity in the .NET community. It is certainly important as an open source project owner to keep the lines of communication flowing in regards to the project status, new features, and roadmap. I plan to work with Dru over the next few days to get these details laid out on the web site so that we can get feedback from the community.

In the next few weeks, I hope to start detailing out some of the 2.0 features that are planned for MassTransit. There are several exciting features in the pipeline, including an entirely new set of edge components for interfacing with clients connected via Ajax/JSON, WCF, or regular web services. As I get my actors together, I hope to post some details, as well as complete my series on building a service gateway using the new edge components.

Building a Service Gateway Using MassTransit, Part 3

This post is the third in a series on building a highly available service gateway. The implementation will be built in C# using MassTransit, StructureMap, ASP.NET MVC, and NHibernate.

Did somebody say code?

The past two posts began to explain how to build a service gateway using MassTransit. In this post, I’m going to share some of the initial code that makes up the gateway service. The gateway itself consists of two components. The first implements the communication to the external service with a set of messages that are only used internally by the service. The second is the saga that provides the interface to the service gateway.

Service Contract

The interface exposed to the application consists of two messages, the first for the command and the second for the response. The message contracts are defined using interfaces, allowing the class for the message to be an internal implementation detail.

The message contract representing a request for order details includes the CustomerId and the OrderId.

public interface RetrieveOrderDetails
{
	string OrderId { get; }
	string CustomerId { get; }
}

When the order details are received, the following message is published.

public interface OrderDetailsReceived
{
	string OrderId { get; }
	string CustomerId { get; }
	DateTime Created { get; }
	OrderStatus Status { get; }
}

public enum OrderStatus
{
	Unknown = 0,
	Submitted = 1,
	Accepted = 2,
	InProcess = 3,
	Complete = 4,
}

The CustomerId and OrderId are the same as the values passed in the request. Created is when the order was created, and Status is an enum representing the status of the order.

Notice that no internal values are included — no primary key from the order table and no primary key from the customer table. The request and response are correlated on identifiers that make sense in the application domain. While SQL purists will point out that numeric primary keys are quicker for retrieving rows in a database, they make for a very fragile interface with other components in the system. Reliance on a primary key outside of the context of the system storing the order details is a path to friction or outright failure.

Time To Make The Saga

At this point in the design of our service, the need for a saga to manage the request state is not entirely obvious. While the TDD purists might want to call YAGNI at this point, let me assure you that “it will all be… revealed!” So for now, let us take a look at the first pass of our saga definition.

public class OrderDetailsRequestSaga :
	SagaStateMachine<OrderDetailsRequestSaga>,
	ISaga
{
	static OrderDetailsRequestSaga()
	{
		Define(Saga);
	}

Our saga state machine uses a static initializer to define the states, events, and transitions of a saga. The previous code merely defines our class as a saga and calls our saga initialization method (shown below).

	private static void Saga()
	{
		Correlate(RequestReceived)
			.By((saga, message) => saga.CustomerId == message.CustomerId &&
			                       saga.OrderId == message.OrderId &&
			                       saga.CurrentState == WaitingForResponse);

		Correlate(ResponseReceived)
			.By((saga, message) => saga.CustomerId == message.CustomerId &&
			                       saga.OrderId == message.OrderId &&
			                       saga.CurrentState == WaitingForResponse);

Since our request criteria include our customer id and our order id, we use those to correlate the message to the saga. We also include the state of the saga to ensure that we do not match to a request that has already completed. We will look at some other ways we can enhance the performance of the service later on by using some additional states.

	public static State Initial { get; set; }
	public static State WaitingForResponse { get; set; }
	public static State Completed { get; set; }

We have defined three states: an initial state for when a new saga instance is created, a waiting for response state once our request has been sent to the service, and a completed state once the response has been received and published.

	public static Event<RetrieveOrderDetails> RequestReceived { get; set; }
	public static Event<OrderDetailsResponse> ResponseReceived { get; set; }
	public static Event<OrderDetailsRequestFailed> RequestFailed { get; set; }

We have also defined three events, each declared with the message contract that maps to the event. The subscription logic for the saga will automatically map message handlers for these events that will invoke the actions depending upon the current state of the saga.

		Initially(
			When(RequestReceived)
				.Then((saga, request) =>
					{
						saga.OrderId = request.OrderId;
						saga.CustomerId = request.CustomerId;
					})
				.Publish((saga, request) => new SendOrderDetailsRequest
					{
						RequestId = saga.CorrelationId,
						CustomerId = saga.CustomerId,
						OrderId = saga.OrderId,
					})
				.TransitionTo(WaitingForResponse));

The first event handler, RequestReceived, is invoked when the saga is created in response to the RetrieveOrderDetails message. The handler copies the properties of the request, and then publishes the request message to the proxy that will call the external web service. After the message is published, the state of the saga transitions to the waiting for response state. When using transactional queues, the receipt of the message, creation of the saga in the database, sending of the command message to the proxy, and saving the saga are all part of a single distributed transaction. This ensures that everything completes as a single operation to ensure no requests are lost.

		During(WaitingForResponse,
			When(ResponseReceived)
				.Then((saga, response) =>
					{
						saga.OrderCreated = response.Created;
						saga.OrderStatus = response.Status;
					})
				.Publish((saga, response) => new OrderDetails
					{
						CustomerId = saga.CustomerId,
						OrderId = saga.OrderId,
						Created = saga.OrderCreated.Value,
						Status = saga.OrderStatus,
					})
				.TransitionTo(Completed));
	}

The second event handler, ResponseReceived, is invoked when the OrderDetailsResponse message is received. The results of the request are stored in the saga and a message is published containing the details of the order back to the original requestor. Another approach would be to capture the requestor address (via the ResponseAddress header from the original message) and then resolve that address using the endpoint factory to send the response directly to the requestor. I don’t really encourage this approach without having a truly unique identifier for each request.

	public OrderDetailsRequestSaga(Guid correlationId)
	{
		CorrelationId = correlationId;
	}

	protected OrderDetailsRequestSaga()
	{
	}

	public virtual string CustomerId { get; set; }
	public virtual string OrderId { get; set; }
	public virtual OrderStatus OrderStatus { get; set; }
	public virtual DateTime? OrderCreated { get; set; }

	public virtual Guid CorrelationId { get; set; }
	public virtual IServiceBus Bus { get; set; }
}

The rest of the saga class is shown above for completeness. The properties are part of the saga and get saved when the saga is persisted (using the NHibernate saga persister, or in the case of the sample, the in-memory implementation). The constructor with the Guid is used to initialize the saga when a new one is created; the protected constructor is there so that NHibernate is able to persist the saga.

The Service Proxy

The saga uses the external service proxy to perform the actual work, which is shown in the proxy class below.

public class OrderDetailsWebServiceProxy :
	Consumes<SendOrderDetailsRequest>.All
{
	public void Consume(SendOrderDetailsRequest request)
	{
		// make the call to the service to get the order details here

		var details = new OrderDetailsResponse
			{
				OrderId = request.OrderId,
				CustomerId = request.CustomerId,
				Created = (-1).Days().FromUtcNow(),
				Status = OrderStatus.InProcess,
			};

		CurrentMessage.Respond(details, x => x.ExpiresAt(5.Minutes().FromNow()));
	}
}

The message handler uses the criteria from the SendOrderDetailsRequest message (which was published by the saga) to call the external service and retrieve the order details. The details are then returned to the saga in the form of an OrderDetailsResponse message which is internal to the service (and therefore not part of the interface assembly that is provided to applications that want to use the order details service).

Test That Thang

Now that our saga has been developed, we need to be able to test it. A unit test will be created that creates a testing instance of the service bus (see the sample for the implementation details) and verifies that the saga responds properly to the request. To request order details, a very simple client would subscribe to the response message and then publish the request.

const string orderId = "ABC123";
const string customerId = "12345";

LocalBus.Subscribe<OrderDetailsReceived>(message =>
	{
		response.Set(message);
	},
	x => x.OrderId == orderId && x.CustomerId == customerId);

RetrieveOrderDetails request = new RetrieveOrderDetailsRequest(customerId, orderId);
LocalBus.Publish(request, x => x.SendResponseTo(LocalBus.Endpoint));

The Subscribe method used above specifies that when a message of type OrderDetailsReceived is received, and the contents of the message match the specified predicate (which in this case checks the OrderId and CustomerId contained in the message), the specified handler is called. Our example is from the integration test (built using NUnit) that verifies the service works from end to end.
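To make the predicate behavior concrete, here is a minimal, self-contained sketch of a filtered subscription. It does not use MassTransit at all: TinyBus is a hypothetical in-memory stand-in invented for this illustration, and the OrderDetailsReceived class below only carries the two properties used by the predicate above.

```csharp
using System;
using System.Collections.Generic;

// Only the two properties checked by the predicate are modeled here.
public class OrderDetailsReceived
{
	public string CustomerId { get; set; }
	public string OrderId { get; set; }
}

// Hypothetical in-memory stand-in for a bus; not the MassTransit API.
public class TinyBus
{
	readonly List<Action<object>> _handlers = new List<Action<object>>();

	// The handler runs only for messages of type T that satisfy the condition.
	public void Subscribe<T>(Action<T> handler, Predicate<T> condition)
		where T : class
	{
		_handlers.Add(message =>
			{
				var typed = message as T;
				if (typed != null && condition(typed))
					handler(typed);
			});
	}

	// Offers the message to every subscription.
	public void Publish(object message)
	{
		foreach (var handler in _handlers)
			handler(message);
	}
}
```

With this sketch, a subscription filtered on OrderId "ABC123" and CustomerId "12345" ignores a published OrderDetailsReceived for any other order, which is the same filtering the test relies on.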

The syntax above is functional, and it will work, but it does not represent the most scalable approach. In the next post, I’ll start to explain how to build a much more scalable method of handling thousands of concurrent requests on a single machine using IIS.

In the meantime, the sample code is available in the MassTransit trunk as a standalone solution. You can find it in the trunk\src\Samples\ServiceGatewaySample folder. There are unit tests that verify the calling syntax shown above and a host process that runs both the saga and the proxy service.

Building a Service Gateway Using MassTransit, Part 2

This post is the second in a series on building a highly available service gateway. The implementation will be built in C# using MassTransit, StructureMap, ASP.NET MVC, and NHibernate.

Continued…

In part one, I discussed two exchange patterns that are often exposed as a web service. In this installment, I’m going to cover a more complex exchange pattern that includes making a request to an external system in response to a request on our web service.


One of the comments on the previous post raised the question of using messaging for queries. Udi, Ayende, and I agree that this so-called “Data-SOA” is a bad thing. The queries I am presenting in this article are made against an external service and not an internal database. In many cases, this service might have limited availability and limited throughput, making it important to isolate our service to increase availability.

Complex Request

In the simple request, order status is a small enough data point that caching it at the web service makes sense. The details of an order, however, are much more involved and it is not practical to keep the details cached in case they are requested (which in this case, is much less often than the order status). To support the order detail request, we will apply a pattern that separates the inbound request from the outbound request, ensures that requests are performed only once (minimizing the click, click, click refresh mentality of some users), and even retains requests if the outbound service is unavailable until the request expires or the outbound service becomes available.

The first thing we want to build is the service that will be responsible for calling the external web service. To start, we’ll define two messages representing the happy path of calling the service. The first message contains the criteria of the request (such as the order id and perhaps the customer id) and the second message contains the details of the order that were returned by the web service. In addition to the messages on the happy path, we may also define additional messages used to publish exception information related to the request.
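As a rough sketch, the message contracts described above might look like the classes below. The SendOrderDetailsRequest and OrderDetailsResponse names and their properties follow the ones used in the code elsewhere in this series. The OrderDetailsRequestFailed message and its Reason property are assumptions made purely for illustration and are not part of the sample.

```csharp
using System;

// Only InProcess appears in the sample code; other statuses are left out.
public enum OrderStatus
{
	InProcess,
}

// Happy path, message one: the criteria of the request.
public class SendOrderDetailsRequest
{
	public Guid RequestId { get; set; }
	public string CustomerId { get; set; }
	public string OrderId { get; set; }
}

// Happy path, message two: the details returned by the web service.
public class OrderDetailsResponse
{
	public string CustomerId { get; set; }
	public string OrderId { get; set; }
	public DateTime Created { get; set; }
	public OrderStatus Status { get; set; }
}

// Hypothetical exception message (name and shape are assumptions).
public class OrderDetailsRequestFailed
{
	public Guid RequestId { get; set; }
	public string Reason { get; set; }
}
```

Keeping the contracts as plain property bags means they carry no behavior, so either side of the exchange can change its implementation without touching the messages.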

Once the message contracts are defined to call the web service, we need to build a service that will handle the request messages. To do this, we’ll build a message consumer for the request message, host it inside Topshelf, and subscribe to a service bus bound to the input queue of the service. The message handler will use the request message properties to prepare the request to the external web service and produce the response message once the request returns. In this example, the external web service only supports synchronous requests (in a later article I will try to cover calling remote services that support asynchronous requests).

At this point, we have a service that responds to a command message (the request) and produces a result depending upon the outcome of the request. What we need now is a way to coordinate the requests to the service to ensure that requests are not lost due to an unavailable service, errors are retried once a failed service becomes available, and duplicate requests are ignored.

To coordinate the service requests, a saga will be created to manage the state of each request. A new message will be created to initiate the saga containing the same data that is needed to produce the request message that is sent to our gateway service. And as with the service, messages will be created to return the results of the request to the consumer of the external service. The saga will use our state machine driven saga syntax, making the business logic understandable at a glance. The saga will also define the retry parameters that should be used to recover from service outages by specifying an exception policy. The messages that are used by the requester to interact with the saga will become the interface that is used by our system to make requests to the external service. This includes the web service we are in turn providing to our customers. The internal messages used to communicate between the saga and the gateway service are not intended for use outside of our gateway service component.
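The coordination idea can be sketched without any messaging infrastructure. The class below is not the MassTransit saga syntax. It is a hypothetical, in-memory illustration (RequestCoordinator, RequestState, and the delegate-based gateway call are all invented names) of one behavior the saga provides: a duplicate request for the same order is ignored while an earlier one is still waiting for a response. Retry and expiration are deliberately left out.

```csharp
using System;
using System.Collections.Generic;

public enum RequestState
{
	WaitingForResponse,
	Completed,
}

// Hypothetical stand-in for the saga and its persistence; illustration only.
public class RequestCoordinator
{
	readonly Dictionary<Tuple<string, string>, RequestState> _requests =
		new Dictionary<Tuple<string, string>, RequestState>();

	// Stand-in for sending the command message to the gateway service.
	readonly Action<string, string> _sendToGateway;

	public RequestCoordinator(Action<string, string> sendToGateway)
	{
		_sendToGateway = sendToGateway;
	}

	// Returns true if an outbound request was sent, false if the
	// request duplicates one that is still waiting for a response.
	public bool RequestReceived(string customerId, string orderId)
	{
		var key = Tuple.Create(customerId, orderId);

		RequestState state;
		if (_requests.TryGetValue(key, out state)
			&& state == RequestState.WaitingForResponse)
			return false;

		_requests[key] = RequestState.WaitingForResponse;
		_sendToGateway(customerId, orderId);
		return true;
	}

	// Marks the request as completed so a later request is sent again.
	public void ResponseReceived(string customerId, string orderId)
	{
		_requests[Tuple.Create(customerId, orderId)] = RequestState.Completed;
	}
}
```

In the real system the dictionary would be replaced by persisted saga state and the delegate by a published message, but the state transitions are the part that keeps impatient refreshes from hammering the external service.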


A nice side benefit of this architecture involves the actual call made by the gateway service. Since the interface is defined by the messages that are orchestrated by the saga, the backing implementation can be changed without impacting the consumers of the saga-based front end. This can be a huge benefit when it comes time to change service providers or bring an external service in-house either through acquisition or new product developments.

With the saga in place (and for this example, we will go ahead and host it in the same process that we are hosting the gateway service), we are now ready to build the web service request handler. Rather than go straight into that now, I think I’m going to stop here and save that for part three. After part three is finished, I’ll start posting some of the code for the examples and include it in the samples folder to make it easy to pull down and experiment with on your own machine.