Aug 28

One of the first applications we built with MassTransit provides messaging for a long-running transaction started by an application submitting a request. The request is formatted into an X12 envelope and sent to a web service. An intermediate response (an X12 997) is returned with a correlation identifier for the request. Another web service is then polled for the response, which is either the result or an indication that the request is still pending. When the response is received, the X12 document is translated and stored in the database. Finally, the user is notified that the transaction is complete and the result is displayed.

As work proceeded on this application, I started to recognize the need for something to coordinate the steps of a transaction that spans multiple loosely coupled services. Because these transactions are long-running (the example above can take anywhere from three to sixty seconds), it is unreasonable to hold a single System.Transactions-style transaction open the entire time. I started researching how others have approached the problem and found a couple of articles that helped. After reading Sagas by Hector Garcia-Molina and Kenneth Salem (© 1987 ACM, PDF) and the chapter on sagas in the upcoming book Practical SOA by Arnon Rotem-Gal-Oz (PDF), I started to think about how this could be implemented within MassTransit.

I should note that NServiceBus (another open source service bus) also supports sagas, but I purposely avoided looking at how Udi Dahan implemented them. Once saga support in MassTransit is complete, I plan to review the NServiceBus source to see how the implementations differ. I spoke with Udi at ALT.NET Seattle, and his writing has been both educational and inspirational. The many great discussions on the NServiceBus mailing list have been an excellent resource as well.

So after a few weeks of trying to flesh out the structure (using TDD, of course), I finally arrived at what I think will be a highly usable infrastructure for handling sagas. In the project MassTransit.Saga.Tests, I’ve created a test that simulates a user registering for a web site. The class for the registration is shown below.


public class RegisterUserSaga :
	InitiatedBy<RegisterUser>,
	Orchestrates<UserVerificationEmailSent>,
	Orchestrates<UserValidated>,
	ISaga<RegisterUserSaga>
{
	private string _displayName;
	private string _email;
	private string _password;
	private string _username;

	public RegisterUserSaga(Guid correlationId)
	{
		CorrelationId = correlationId;
	}

	public Guid CorrelationId { get; private set; }
	public IObjectBuilder Builder { get; set; }
	public IServiceBus Bus { get; set; }
	public Action<RegisterUserSaga> Save { get; set; }

	public void Consume(RegisterUser message)
	{
		_displayName = message.DisplayName;
		_username = message.Username;
		_password = message.Password;
		_email = message.Email;

		Save(this);
		Bus.Publish(new SendUserVerificationEmail(CorrelationId, _email));
	}

	public void Consume(UserVerificationEmailSent message)
	{
		// once the verification e-mail has been sent, we allow 24 hours to pass before we
		// remove this transaction from the registration queue
		Bus.Publish(new UserRegistrationPending(CorrelationId));
		Bus.Publish(new UpdateSagaTimeout(CorrelationId, TimeSpan.FromHours(24)));
	}

	public void Consume(UserValidated message)
	{
		// at this point, the user has clicked the link in the validation e-mail
		Bus.Publish(new UserRegistrationComplete(CorrelationId));
		Bus.Publish(new CompleteSaga(CorrelationId));
	}
}

At the top of the class, the messages consumed by the saga are declared. InitiatedBy indicates that the message starts a new instance of the saga. Orchestrates is used for messages that participate in the saga once it has been initiated. Every saga instance is identified by a Guid, and every message consumed by the saga should implement the CorrelatedBy<Guid> interface so it can be routed to the right instance. A saga must also implement the generic ISaga interface, which allows properties such as Bus and Builder to be set, giving the saga instance access to the bus and the object builder.
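The message classes themselves are not shown above. As a sketch of what they might look like, the following declares a local CorrelatedBy<TKey> interface (a stand-in so the example compiles on its own, not MassTransit's actual declaration) and three messages whose members are inferred from what the saga reads. The hypothetical SagaRouting.GetSagaId helper illustrates why the interface matters: a dispatcher can pull the correlation id from any message without knowing its concrete type.

```csharp
using System;

// Stand-in for MassTransit's CorrelatedBy<T>, declared here so the sketch compiles alone.
public interface CorrelatedBy<TKey>
{
    TKey CorrelationId { get; }
}

// Hypothetical message shapes, inferred from the members RegisterUserSaga reads.
public class RegisterUser : CorrelatedBy<Guid>
{
    public RegisterUser(Guid correlationId, string username, string password, string displayName, string email)
    {
        CorrelationId = correlationId;
        Username = username;
        Password = password;
        DisplayName = displayName;
        Email = email;
    }

    public Guid CorrelationId { get; }
    public string Username { get; }
    public string Password { get; }
    public string DisplayName { get; }
    public string Email { get; }
}

public class UserVerificationEmailSent : CorrelatedBy<Guid>
{
    public UserVerificationEmailSent(Guid correlationId) { CorrelationId = correlationId; }
    public Guid CorrelationId { get; }
}

public class UserValidated : CorrelatedBy<Guid>
{
    public UserValidated(Guid correlationId) { CorrelationId = correlationId; }
    public Guid CorrelationId { get; }
}

// Hypothetical helper: find the saga id of any correlated message, or null if it has none.
public static class SagaRouting
{
    public static Guid? GetSagaId(object message) =>
        message is CorrelatedBy<Guid> correlated ? correlated.CorrelationId : (Guid?)null;
}
```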

Once the saga class is added to the bus (via the AddComponent method), any message consumed by the saga is dispatched to the appropriate saga instance. A generic ISagaRepository must also be registered in the container so that sagas can be persisted between messages. The saga dispatcher uses the repository to either load an existing instance or create a new one. Because saga instances are saved, the members of the saga class are persisted between messages, so the saga retains its state for the life of the transaction.
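To make the load-or-create behavior concrete, here is a minimal in-memory sketch. The ISagaRepository<TSaga> contract, InMemorySagaRepository, SagaDispatcher, and CountingSaga are all hypothetical names for illustration; the repository that ships with MassTransit may look quite different. An initiating message loads or creates the instance, while an orchestrated message is handled only if the instance already exists.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical repository contract; the real ISagaRepository may differ.
public interface ISagaRepository<TSaga>
{
    bool TryLoad(Guid correlationId, out TSaga saga);
    void Save(Guid correlationId, TSaga saga);
}

// Keeps instances in a dictionary keyed by correlation id.
public class InMemorySagaRepository<TSaga> : ISagaRepository<TSaga>
{
    private readonly Dictionary<Guid, TSaga> _instances = new Dictionary<Guid, TSaga>();

    public bool TryLoad(Guid correlationId, out TSaga saga) => _instances.TryGetValue(correlationId, out saga);
    public void Save(Guid correlationId, TSaga saga) => _instances[correlationId] = saga;
}

public class SagaDispatcher<TSaga>
{
    private readonly ISagaRepository<TSaga> _repository;
    private readonly Func<Guid, TSaga> _create; // e.g. id => new RegisterUserSaga(id)

    public SagaDispatcher(ISagaRepository<TSaga> repository, Func<Guid, TSaga> create)
    {
        _repository = repository;
        _create = create;
    }

    // InitiatedBy message: load the instance if it exists, otherwise create it.
    public void DispatchInitiating(Guid correlationId, Action<TSaga> consume)
    {
        if (!_repository.TryLoad(correlationId, out TSaga saga))
            saga = _create(correlationId);
        consume(saga);
        _repository.Save(correlationId, saga);
    }

    // Orchestrates message: only an existing instance may handle it.
    public bool DispatchOrchestrated(Guid correlationId, Action<TSaga> consume)
    {
        if (!_repository.TryLoad(correlationId, out TSaga saga))
            return false;
        consume(saga);
        _repository.Save(correlationId, saga);
        return true;
    }
}

// Minimal saga whose state must survive between messages.
public class CountingSaga
{
    public CountingSaga(Guid id) { CorrelationId = id; }
    public Guid CorrelationId { get; }
    public int MessagesSeen { get; set; }
}
```

Because this repository keeps object references in a dictionary, saving is trivial; a durable repository would instead serialize the saga's members between messages.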

There is still some work to be done, including a service to handle timeouts and retries. It will be up to the developer to handle any compensating actions that need to be taken in the case of a failure. Therefore, it is strongly recommended that the saga also consume any Fault<T> messages that are published when a message consumer throws an exception, particularly if the consumer is not part of the saga (such as an application or domain service).
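As a rough illustration of consuming faults, the sketch below has a saga react to a failed SendUserVerificationEmail by publishing a compensating notification. Fault<T>, UserRegistrationFailed, RegisterUserSagaFaults, and the Action<object> publish delegate (standing in for Bus.Publish) are assumptions made so the example is self-contained; MassTransit's real Fault<T> may expose different members.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical fault envelope; MassTransit's real Fault<T> may look different.
public class Fault<T>
{
    public Fault(T failedMessage, Exception exception) { FailedMessage = failedMessage; Exception = exception; }
    public T FailedMessage { get; }
    public Exception Exception { get; }
}

public class SendUserVerificationEmail
{
    public SendUserVerificationEmail(Guid correlationId, string email) { CorrelationId = correlationId; Email = email; }
    public Guid CorrelationId { get; }
    public string Email { get; }
}

// Hypothetical compensating notification published by the saga.
public class UserRegistrationFailed
{
    public UserRegistrationFailed(Guid correlationId, string reason) { CorrelationId = correlationId; Reason = reason; }
    public Guid CorrelationId { get; }
    public string Reason { get; }
}

public class RegisterUserSagaFaults
{
    private readonly Action<object> _publish; // stands in for Bus.Publish

    public RegisterUserSagaFaults(Guid correlationId, Action<object> publish)
    {
        CorrelationId = correlationId;
        _publish = publish;
    }

    public Guid CorrelationId { get; }
    public bool Failed { get; private set; }

    // The e-mail service is not part of the saga, so its failure arrives as a fault message.
    public void Consume(Fault<SendUserVerificationEmail> fault)
    {
        if (fault.FailedMessage.CorrelationId != CorrelationId)
            return; // belongs to another saga instance

        Failed = true;
        _publish(new UserRegistrationFailed(CorrelationId, fault.Exception.Message));
    }
}
```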

The code is currently in the trunk and slated to be part of the 0.3 release.

Aug 22

I was reading through the Xgrid documentation for OS X yesterday after reading an article on Integrating Xgrid Into Cocoa Applications. The article gave me some ideas, and I decided to see what it would take to build a distributed processing system on top of MassTransit. The result is a new MassTransit.Grid namespace that includes support for building distributed task processing into an application. The following sections define the terminology used by the distributed task classes.

Distributed Tasks

A distributed task contains one or more subtasks that need to be processed concurrently across multiple systems. To create a distributed task, create a class that implements IDistributedTask. The input and output types for the subtasks must also be defined by the distributed task class.


public interface IDistributedTask<TTask, TInput, TOutput>
{
    int SubTaskCount { get; }
    TInput GetSubTaskInput(int subTaskId);
    void DeliverSubTaskOutput(int subTaskId, TOutput output);
    void NotifySubTaskException(int subTaskId, Exception ex);
    void WhenCompleted(Action<TTask> action);
}

Subtasks

A subtask is an individual unit of work within a distributed task. Each subtask should be completely standalone and must not depend upon the completion of any other subtask within the distributed task, since no attempt is made to execute the subtasks in order. A subtask has specific input and output types, each defined by a class (POCO style). The input type is used to determine which workers can process a subtask.
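As a sketch of a distributed task with independent subtasks, the hypothetical HashFilesTask below hands out one GenerateFileHash input per path and signals completion once every subtask has reported back. The interface is repeated from above so the example compiles on its own; the Hash property on FileHashGenerated is an assumption.

```csharp
using System;
using System.Collections.Generic;

public interface IDistributedTask<TTask, TInput, TOutput>
{
    int SubTaskCount { get; }
    TInput GetSubTaskInput(int subTaskId);
    void DeliverSubTaskOutput(int subTaskId, TOutput output);
    void NotifySubTaskException(int subTaskId, Exception ex);
    void WhenCompleted(Action<TTask> action);
}

// POCO input/output types; the Hash property is an illustrative assumption.
public class GenerateFileHash { public string Path { get; set; } }
public class FileHashGenerated { public string Hash { get; set; } }

// Hypothetical task: one independent subtask per file path.
public class HashFilesTask : IDistributedTask<HashFilesTask, GenerateFileHash, FileHashGenerated>
{
    private readonly List<string> _paths;
    private readonly Dictionary<int, string> _hashes = new Dictionary<int, string>();
    private readonly object _lock = new object();
    private Action<HashFilesTask> _completed = _ => { };
    private int _finished;

    public HashFilesTask(IEnumerable<string> paths) { _paths = new List<string>(paths); }

    public int SubTaskCount => _paths.Count;
    public IReadOnlyDictionary<int, string> Hashes => _hashes;

    // Each input depends only on its own id, so subtasks can run in any order.
    public GenerateFileHash GetSubTaskInput(int subTaskId) => new GenerateFileHash { Path = _paths[subTaskId] };

    public void DeliverSubTaskOutput(int subTaskId, FileHashGenerated output)
    {
        lock (_lock) { _hashes[subTaskId] = output.Hash; }
        SubTaskFinished();
    }

    // A failed subtask records no hash but still counts toward completion.
    public void NotifySubTaskException(int subTaskId, Exception ex) => SubTaskFinished();

    public void WhenCompleted(Action<HashFilesTask> action) => _completed = action;

    private void SubTaskFinished()
    {
        bool done;
        lock (_lock) { done = ++_finished == _paths.Count; }
        if (done) _completed(this);
    }
}
```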

DistributedTaskController

To insulate the application from the details of coordinating the subtasks, a generic DistributedTaskController is used. The controller is constructed from the class that implements IDistributedTask, along with the input and output types. Once the controller is created, the application calls .Start() to begin processing the distributed task. The controller identifies the workers that are available to process the subtasks and coordinates the work so that no worker is overloaded.


public class DistributedTaskController<TTask, TInput, TOutput>

TTask is the class that implements IDistributedTask, TInput is the subtask input type, and TOutput is the subtask output type.
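The controller's internals are not described here, so the following is only an in-process approximation. A hypothetical LocalTaskController runs a local delegate on the thread pool in place of remote workers, uses a SemaphoreSlim to cap how many subtasks run at once, and routes worker exceptions to NotifySubTaskException. SquareTask is a toy task included so the sketch can be exercised.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public interface IDistributedTask<TTask, TInput, TOutput>
{
    int SubTaskCount { get; }
    TInput GetSubTaskInput(int subTaskId);
    void DeliverSubTaskOutput(int subTaskId, TOutput output);
    void NotifySubTaskException(int subTaskId, Exception ex);
    void WhenCompleted(Action<TTask> action);
}

// Hypothetical local stand-in for DistributedTaskController (no bus, no remote workers).
public class LocalTaskController<TTask, TInput, TOutput>
    where TTask : IDistributedTask<TTask, TInput, TOutput>
{
    private readonly TTask _task;
    private readonly Func<TInput, TOutput> _worker;
    private readonly SemaphoreSlim _throttle; // caps concurrent subtasks

    public LocalTaskController(TTask task, Func<TInput, TOutput> worker, int maxConcurrent)
    {
        _task = task;
        _worker = worker;
        _throttle = new SemaphoreSlim(maxConcurrent);
    }

    // The returned Task completes when the distributed task invokes the completion delegate.
    public Task Start()
    {
        var done = new TaskCompletionSource<bool>();
        _task.WhenCompleted(_ => done.TrySetResult(true));

        for (int id = 0; id < _task.SubTaskCount; id++)
        {
            int subTaskId = id; // copy the loop variable for the closure
            Task.Run(async () =>
            {
                await _throttle.WaitAsync();
                TOutput output;
                try
                {
                    output = _worker(_task.GetSubTaskInput(subTaskId));
                }
                catch (Exception ex)
                {
                    _task.NotifySubTaskException(subTaskId, ex); // report the failure instead of an output
                    return;
                }
                finally
                {
                    _throttle.Release();
                }
                _task.DeliverSubTaskOutput(subTaskId, output);
            });
        }
        return done.Task;
    }
}

// Toy task: square each number and signal completion when every subtask has reported.
public class SquareTask : IDistributedTask<SquareTask, int, int>
{
    private readonly int[] _inputs;
    private int _remaining;
    private Action<SquareTask> _completed = _ => { };

    public SquareTask(params int[] inputs) { _inputs = inputs; _remaining = inputs.Length; Results = new int[inputs.Length]; }

    public int[] Results { get; }
    public int SubTaskCount => _inputs.Length;
    public int GetSubTaskInput(int id) => _inputs[id];
    public void DeliverSubTaskOutput(int id, int output) { Results[id] = output; Finish(); }
    public void NotifySubTaskException(int id, Exception ex) => Finish();
    public void WhenCompleted(Action<SquareTask> action) => _completed = action;

    private void Finish() { if (Interlocked.Decrement(ref _remaining) == 0) _completed(this); }
}
```

This version reads SubTaskCount only once at start, so it would not pick up subtasks added later; a controller that supports dynamic subtasks would need to keep checking the task for new entries.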

Workers

To make it easy to create workers to handle subtasks, a default worker implementation is available. This worker handles the coordination with the DistributedTaskController, along with the delegation of messages to the actual subtask worker. For example, a worker that accepts a GenerateFileHash object and outputs a FileHashGenerated object would be set up as shown:


public class FileHashGenerator :
    ISubTaskWorker<GenerateFileHash, FileHashGenerated>
{
    public void ExecuteTask(GenerateFileHash input, Action<FileHashGenerated> output)
    {
        string path = input.Path;

        // do work here

        output(new FileHashGenerated());
    }
}

On servers that will process the subtasks, the worker can then be registered with the container and the bus using:


_container.AddComponent<FileHashGenerator>();
_bus.AddComponent<SubTaskWorker<FileHashGenerator, GenerateFileHash, FileHashGenerated>>();

This registers the SubTaskWorker as the handler for the messages that carry the input and output data between the controller and the subtask workers over the transport.

Exception Handling

If an exception occurs in a subtask, the worker and controller leverage the built-in fault handling support of MassTransit to notify the distributed task that an exception has occurred. The controller calls the NotifySubTaskException method with the subTaskId and the exception that was thrown by the worker, allowing the distributed task to decide the next course of action based on that failure. Options include simply aborting the distributed task, fixing the input data and adding it to the end of the subtask list, or some other application-defined behavior.
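The following sketch shows two of those options inside NotifySubTaskException. ParseNumbersTask is hypothetical, and it assumes a strict worker that rejects input with surrounding whitespace: a failed subtask gets one retry with trimmed input appended to the end of the list, and anything that cannot be repaired aborts the whole task.

```csharp
using System;
using System.Collections.Generic;

public interface IDistributedTask<TTask, TInput, TOutput>
{
    int SubTaskCount { get; }
    TInput GetSubTaskInput(int subTaskId);
    void DeliverSubTaskOutput(int subTaskId, TOutput output);
    void NotifySubTaskException(int subTaskId, Exception ex);
    void WhenCompleted(Action<TTask> action);
}

// Hypothetical task whose subtasks parse numbers. Single-threaded for brevity;
// add locking if the controller calls these methods concurrently.
public class ParseNumbersTask : IDistributedTask<ParseNumbersTask, string, int>
{
    private readonly List<string> _inputs;
    private readonly HashSet<int> _retries = new HashSet<int>(); // ids of subtasks that are already retries
    private Action<ParseNumbersTask> _completed = _ => { };
    private int _finished;
    private bool _done;

    public ParseNumbersTask(params string[] inputs) { _inputs = new List<string>(inputs); }

    public List<int> Results { get; } = new List<int>();
    public bool Aborted { get; private set; }
    public int SubTaskCount => _inputs.Count;
    public string GetSubTaskInput(int subTaskId) => _inputs[subTaskId];
    public void WhenCompleted(Action<ParseNumbersTask> action) => _completed = action;

    public void DeliverSubTaskOutput(int subTaskId, int output)
    {
        Results.Add(output);
        MarkFinished();
    }

    public void NotifySubTaskException(int subTaskId, Exception ex)
    {
        string input = _inputs[subTaskId];
        string repaired = input.Trim();
        if (!_retries.Contains(subTaskId) && repaired != input)
        {
            // Fix the input and append it as a new subtask; the failed one counts as finished.
            _retries.Add(_inputs.Count);
            _inputs.Add(repaired);
            MarkFinished();
        }
        else
        {
            // Nothing to fix (or the retry failed too): abort the whole distributed task.
            Aborted = true;
            Complete();
        }
    }

    private void MarkFinished()
    {
        _finished++;
        if (_finished == _inputs.Count) Complete();
    }

    private void Complete()
    {
        if (_done) return;
        _done = true;
        _completed(this);
    }
}
```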

Dynamically Adding Subtasks

To reduce the impact of setup time on the overall duration of a distributed task, it is not necessary to have all of the subtasks loaded before starting the distributed task. This also allows additional subtasks to be added based on the output of other subtasks. For example, a task that scans a remote file system may identify additional folders that need to be scanned for content. The distributed task can simply add those folders to the end of the subtask list, and they will be picked up by the controller. Because the subtask list can grow while the task is running, the controller cannot tell on its own when the work is finished, so the distributed task is responsible for calling the delegate set by the controller to indicate that all of the subtasks have completed. The DistributedTaskController then releases any resources that were in use.
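A sketch of the folder-scanning example, assuming a hypothetical FolderScanned output that lists the files and subfolders a worker found. Subfolders are appended as new subtasks, and the task itself fires the completion delegate once the number of finished subtasks catches up with the growing subtask count.

```csharp
using System;
using System.Collections.Generic;

public interface IDistributedTask<TTask, TInput, TOutput>
{
    int SubTaskCount { get; }
    TInput GetSubTaskInput(int subTaskId);
    void DeliverSubTaskOutput(int subTaskId, TOutput output);
    void NotifySubTaskException(int subTaskId, Exception ex);
    void WhenCompleted(Action<TTask> action);
}

// Hypothetical output of a folder-scan subtask.
public class FolderScanned
{
    public List<string> Files { get; } = new List<string>();
    public List<string> SubFolders { get; } = new List<string>();
}

public class ScanFoldersTask : IDistributedTask<ScanFoldersTask, string, FolderScanned>
{
    private readonly List<string> _folders = new List<string>();
    private readonly object _lock = new object();
    private Action<ScanFoldersTask> _completed = _ => { };
    private int _finished;

    public ScanFoldersTask(string root) { _folders.Add(root); }

    public List<string> Files { get; } = new List<string>();
    public int SubTaskCount { get { lock (_lock) return _folders.Count; } }
    public string GetSubTaskInput(int subTaskId) { lock (_lock) return _folders[subTaskId]; }
    public void WhenCompleted(Action<ScanFoldersTask> action) => _completed = action;

    public void DeliverSubTaskOutput(int subTaskId, FolderScanned output)
    {
        bool done;
        lock (_lock)
        {
            Files.AddRange(output.Files);
            _folders.AddRange(output.SubFolders); // new subtasks go to the end of the list
            _finished++;
            done = _finished == _folders.Count;   // only the task knows when nothing is left
        }
        if (done) _completed(this);
    }

    public void NotifySubTaskException(int subTaskId, Exception ex)
    {
        bool done;
        lock (_lock) { done = ++_finished == _folders.Count; } // skip unreadable folders
        if (done) _completed(this);
    }
}
```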

Sample in Unit Tests

A quick sample in the unit tests (MassTransit.Grid.Test) shows an integer factoring service. The distributed task creates a set of very large integers and factors them across the workers that are available. Since this sample was used to drive out the feature set, it should demonstrate how the classes are hooked together.
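The factoring sample itself is not reproduced here, but a worker in the style of FileHashGenerator might look like the following. FactorInteger, IntegerFactored, and IntegerFactorer are hypothetical names, and the ISubTaskWorker shape is taken from the FileHashGenerator example. Trial division is used only because it is short; it is far too slow for very large integers, where a real worker would need a better algorithm.

```csharp
using System;
using System.Collections.Generic;

// Worker interface shape taken from the FileHashGenerator example.
public interface ISubTaskWorker<TInput, TOutput>
{
    void ExecuteTask(TInput input, Action<TOutput> output);
}

// Hypothetical message types for a factoring subtask.
public class FactorInteger { public long Value { get; set; } }

public class IntegerFactored
{
    public long Value { get; set; }
    public List<long> Factors { get; } = new List<long>();
}

// Trial division: short and correct, but only practical for modest values.
public class IntegerFactorer : ISubTaskWorker<FactorInteger, IntegerFactored>
{
    public void ExecuteTask(FactorInteger input, Action<IntegerFactored> output)
    {
        // Throwing here would surface to the distributed task as a subtask exception.
        if (input.Value < 2)
            throw new ArgumentOutOfRangeException(nameof(input), "Value must be at least 2.");

        var result = new IntegerFactored { Value = input.Value };
        long n = input.Value;
        for (long d = 2; d <= n / d; d++) // d <= n / d avoids overflowing d * d
        {
            while (n % d == 0)
            {
                result.Factors.Add(d);
                n /= d;
            }
        }
        if (n > 1) result.Factors.Add(n); // whatever remains is prime
        output(result);
    }
}
```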

Wrapping Up

This is just a brief introduction to the distributed processing capabilities that were added to MassTransit. There are likely additional features to add, which will hopefully be identified as the feature is put to use. It is important to note that this feature is still in development and should go through considerable testing before it is used in a production application. Any feedback is always welcome (including patches), so try it out!

Dec 30

Over the past couple of weeks, I have been working on nailing down the design of a service bus to add real-time capability to one of our applications. This has led me to spend some time looking at nServiceBus as well as a few other examples. I’ve also been reading Enterprise Integration Patterns (Hohpe/Woolf) to understand some of the established patterns in the space.

It’s interesting to see the different approaches people take to things like transactions and message recovery. From my experience with health care transactions, there is no way to have a distributed transaction across the myriad of systems involved in a health care exchange. With no clear standards and hardly any real-time communication with insurance companies, auditing and electronic reconciliation are the only plausible methods of determining the state of each transaction.

The discussion is lively at the nServiceBus Yahoo Group, so if you have an interest in that style of integration, stop in and join the conversation.

Dru Sellers and I have started working on a simplified service bus called MassTransit. It’s an open-source project under the Apache 2.0 License and could use some comments and/or contributions. We’re taking a simplified YAGNI approach to meet some specific needs for work-related projects. We’re also focusing on an easy-to-configure, easy-to-code approach to reduce the difficulty of building systems using message-based communication.

It’s an ongoing project to find the right solution for a real business problem, so we’ll see how it goes!