Managing Long-Lived Transactions with MassTransit.Saga

One of the first applications we built with MassTransit provides messaging for a long-running transaction started by an application submitting a request. The request is formatted into an X12 envelope and sent to a web service. An intermediate response is returned (an X12 997) with a correlation identifier for the request. Another web service is then polled for the response, which can be either the result or an indication that the request is still pending. When the response is received, the X12 document is translated and stored in the database. Finally, the user is notified that the transaction is complete and the result is displayed.

As work proceeded on this application, I started to recognize the need for something to coordinate the different steps in a transaction involving multiple loosely-coupled services. Due to the duration of these transactions (the example above can take anywhere from three to sixty seconds), it is unreasonable to keep a single System.Transactions-style transaction open the entire time. I started researching how others approached the problem and found a couple of articles that helped. After reading Sagas by Hector Garcia-Molina and Kenneth Salem (© 1987 ACM, PDF) and the chapter on sagas in the upcoming book Practical SOA by Arnon Rotem-Gal-Oz (PDF), I started to think about how this could be implemented within MassTransit.

I should note that NServiceBus (another open source service bus) also supports sagas, but I purposely avoided looking at how Udi Dahan implemented them. Once saga support in MassTransit is complete, I plan to review the source for NServiceBus to see how the implementations differ. I spoke with Udi at ALT.NET Seattle, and his writing has been both educational and inspirational. A lot of great discussions on the NServiceBus mailing list have been an excellent resource as well.

So after a few weeks of trying to flesh out the structure (using TDD, of course), I finally arrived at what I think will be a highly usable infrastructure for handling sagas. In the project MassTransit.Saga.Tests, I’ve created a test that simulates a user registering for a web site. The class for the registration is shown below.

public class RegisterUserSaga :
	InitiatedBy< RegisterUser >,
	Orchestrates< UserVerificationEmailSent >,
	Orchestrates< UserValidated >,
	ISaga< RegisterUserSaga >
{
	private string _displayName;
	private string _email;
	private string _password;
	private string _username;

	public RegisterUserSaga(Guid correlationId)
	{
		CorrelationId = correlationId;
	}

	public Guid CorrelationId { get; private set; }
	public IObjectBuilder Builder { get; set; }
	public IServiceBus Bus { get; set; }
	public Action< RegisterUserSaga > Save { get; set; }

	public void Consume(RegisterUser message)
	{
		_displayName = message.DisplayName;
		_username = message.Username;
		_password = message.Password;
		_email = message.Email;

		Save(this);
		Bus.Publish(new SendUserVerificationEmail(CorrelationId, _email));
	}

	public void Consume(UserVerificationEmailSent message)
	{
		// once the verification e-mail has been sent, we allow 24 hours to pass before we
		// remove this transaction from the registration queue
		Bus.Publish(new UserRegistrationPending(CorrelationId));
		Bus.Publish(new UpdateSagaTimeout(CorrelationId, TimeSpan.FromHours(24)));
	}

	public void Consume(UserValidated message)
	{
		// at this point, the user has clicked the link in the validation e-mail
		Bus.Publish(new UserRegistrationComplete(CorrelationId));
		Bus.Publish(new CompleteSaga(CorrelationId));
	}
}

At the top of the class, the messages consumed by the saga are specified. InitiatedBy indicates that the message starts a new instance of the saga. Orchestrates is for messages that are part of the saga once it has been initiated. Every saga instance is identified by a Guid, and every message consumed by the saga should implement the CorrelatedBy< Guid > interface so it can be matched to its instance. A saga must also implement the generic ISaga interface, which exposes the properties that give the saga instance access to the bus and the object builder.
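The post does not show the message classes themselves, so here is a minimal sketch of what the RegisterUser message might look like. The property names come from the saga listing above; the constructor and the stand-in CorrelatedBy interface declaration are assumptions added only so the sketch compiles on its own. In a real project the interface would come from MassTransit rather than being declared locally.

```csharp
using System;

// Stand-in for MassTransit's CorrelatedBy<TKey>, declared locally (assumption)
// so this sketch is self-contained.
public interface CorrelatedBy<TKey>
{
    TKey CorrelationId { get; }
}

// Hypothetical shape of the message that initiates the saga.
// The saga reads DisplayName, Username, Password and Email from it.
public class RegisterUser : CorrelatedBy<Guid>
{
    public RegisterUser(Guid correlationId, string username, string password,
                        string displayName, string email)
    {
        CorrelationId = correlationId;
        Username = username;
        Password = password;
        DisplayName = displayName;
        Email = email;
    }

    // The Guid that ties this message to a single saga instance.
    public Guid CorrelationId { get; private set; }

    public string Username { get; private set; }
    public string Password { get; private set; }
    public string DisplayName { get; private set; }
    public string Email { get; private set; }
}
```

The publisher would create the Guid once and reuse it on every later message for the same registration, which is what lets those messages reach the same saga instance.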

Once the saga class is added to the bus (via the AddComponent method), any messages consumed by the saga will be dispatched to the saga instance. A generic ISagaRepository must also be registered in the container so that sagas can be persisted between messages. The saga dispatcher uses the repository to either load the existing instance of the saga or create a new one. Because instances of the saga class are saved, the class can rely on its members being persisted between messages, which is how state is retained.
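To make the load-or-create step concrete, here is a rough in-memory sketch. It is not the MassTransit ISagaRepository contract, whose members are not shown in this post. The class name InMemorySagaStore and its methods are hypothetical, and a real repository would write to durable storage rather than a dictionary.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical, simplified saga store keyed by correlation id.
// NOT the actual ISagaRepository interface; illustration only.
public class InMemorySagaStore<TSaga> where TSaga : class
{
    private readonly Dictionary<Guid, TSaga> _sagas = new Dictionary<Guid, TSaga>();
    private readonly object _lock = new object();

    // Returns the stored instance for the id, or builds one with the
    // factory, stores it, and returns it.
    public TSaga LoadOrCreate(Guid correlationId, Func<Guid, TSaga> factory)
    {
        lock (_lock)
        {
            TSaga saga;
            if (!_sagas.TryGetValue(correlationId, out saga))
            {
                saga = factory(correlationId);
                _sagas.Add(correlationId, saga);
            }
            return saga;
        }
    }

    // Stores (or overwrites) the instance for the id.
    public void Save(Guid correlationId, TSaga saga)
    {
        lock (_lock)
        {
            _sagas[correlationId] = saga;
        }
    }

    // Drops the instance, for example once the saga has completed.
    // Returns true if an instance was actually removed.
    public bool Remove(Guid correlationId)
    {
        lock (_lock)
        {
            return _sagas.Remove(correlationId);
        }
    }
}
```

With a store like this, a dispatcher could call LoadOrCreate(message.CorrelationId, id => new RegisterUserSaga(id)) and then hand the message to the returned instance, so two messages carrying the same Guid would reach the same object.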

There is still some work to be done, including a service to handle timeouts and retries. It will be up to the developer to handle any compensating actions that need to be taken in the case of a failure. For that reason, I strongly recommend that the saga also consume any Fault< T > messages that are published when a message consumer throws an exception, particularly if the consumer is not part of the saga (such as an application or domain service).
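As an illustration of a compensating action, the sketch below reacts to a fault raised while sending the verification e-mail. Everything in it is an assumption made for the example: the local Fault class is a stand-in whose members may differ from the real Fault< T >, the shape of SendUserVerificationEmail is guessed from the saga listing, and UserRegistrationFailed and RegistrationFaultHandler are hypothetical names. Publishing is reduced to a plain delegate so the block has no dependency on the bus.

```csharp
using System;

// Stand-in for a fault message (assumption); the real Fault<T> may differ.
public class Fault<TMessage>
{
    public Fault(TMessage failedMessage, Exception caughtException)
    {
        FailedMessage = failedMessage;
        CaughtException = caughtException;
    }

    public TMessage FailedMessage { get; private set; }
    public Exception CaughtException { get; private set; }
}

// Assumed shape of the command the saga publishes in the listing above.
public class SendUserVerificationEmail
{
    public SendUserVerificationEmail(Guid correlationId, string email)
    {
        CorrelationId = correlationId;
        Email = email;
    }

    public Guid CorrelationId { get; private set; }
    public string Email { get; private set; }
}

// Hypothetical event announcing that the registration could not proceed.
public class UserRegistrationFailed
{
    public UserRegistrationFailed(Guid correlationId, string reason)
    {
        CorrelationId = correlationId;
        Reason = reason;
    }

    public Guid CorrelationId { get; private set; }
    public string Reason { get; private set; }
}

// Hypothetical handler: turns a fault into a compensating message.
public class RegistrationFaultHandler
{
    private readonly Action<object> _publish;

    public RegistrationFaultHandler(Action<object> publish)
    {
        _publish = publish;
    }

    public void Consume(Fault<SendUserVerificationEmail> fault)
    {
        // The failed message still carries the saga's correlation id,
        // so the compensation can be tied back to the right registration.
        Guid id = fault.FailedMessage.CorrelationId;
        _publish(new UserRegistrationFailed(id, fault.CaughtException.Message));
    }
}
```

In the saga itself, the same idea would most likely be a further Consume method on RegisterUserSaga, publishing through Bus rather than a delegate.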

The code is currently in the trunk and slated to be part of the 0.3 release.