Enhancements to MassTransit (or Weekend of Coding)
This past couple of weeks I’ve been putting some serious time into MassTransit. My primary goal is to improve the internal architecture and remove some of the MSMQ leaks into the infrastructure. Our original goal was to stick purely to MSMQ, however, as we got more into messaging systems we found that there are a lot of other transports with different advantages. For example, using ActiveMQ would make it easy to add integration with Java applications down the line. The problem at this point was all the code designed around MSMQ was making it difficult to support other transport types.
About two weeks ago, I started working on a completely new method of dispatching messages within the service bus. My goal was to support a pure producer/consumer model using a publish/subscribe pattern. With this in mind, I built a new message dispatcher that allowed for a new way of specifying message subscriptions. Previously to this change, a service interested in messages would do the following:
_bus.Subscribe<MyMessage>(MyHandlerMethod);
public void MyHandlerMethod(IMessageContext<MyMessage> context) {}
Because of this structure, there had to be an instance of the class in memory and it somehow needed to be started so that subscriptions could be added. The class also needed to be stopped so that it could remove any subscriptions from the service bus. Another goal was to be able to use an object builder to create objects as needed to handle messages. For example, we wanted to use Castle Windsor to dynamically build objects to handle messages and get all the injection benefits of the container.
To support this new style, I added some new interfaces and made it possible to register either an object or a class with the service bus. As an actual example, compare the original subscription client code to the new version:
Before the changes:
public class SubscriptionClient : IHostedService
{
public void Start()
{
_serviceBus.Subscribe<AddSubscription>(HandleAddSubscription);
_serviceBus.Subscribe<RemoveSubscription>(HandleRemoveSubscription);
}
public void Stop()
{
_serviceBus.Unsubscribe<AddSubscription>(HandleAddSubscription);
_serviceBus.Unsubscribe<RemoveSubscription>(HandleRemoveSubscription);
}
public void HandleAddSubscription(IMessageContext<AddSubscription> ctx)
{
_cache.Add(ctx.Message.Subscription);
}
public void HandleRemoveSubscription(IMessageContext<RemoveSubscription> ctx)
{
_cache.Add(ctx.Message.Subscription);
}
}
And now after the changes:
public class SubscriptionClient : IHostedService,
Consumes<AddSubscription>.All,
Consumes<RemoveSubscription>.All
{
public void Consume(AddSubscription message)
{
_cache.Add(message.Subscription);
}
public void Consume(RemoveSubscription message)
{
_cache.Remove(message.Subscription);
}
public void Start()
{
_serviceBus.Subscribe(this);
}
public void Stop()
{
_serviceBus.Unsubscribe(this);
}
}
The code just makes more sense and is easier to understand after the changes. In addition, you can also just call _bus.AddComponent
Also notice that there are various types of consumers supported, indicated by the interface used in the consuming class. Consumes
The third option presently available is Consumes
So now a request/reply pattern would look something like this:
class Controller : Consumes<Response>.For<Guid>
{
public void Consume(Response message)
{}
public void Action()
{
_actionId = Guid.NewGuid();
_bus.Subscribe(this);
_bus.Publish(new Request(_actionId, someValue, someValue2);
}
}
When the object subscribes to the bus, the correlation identifier is used to filter incoming messages so that only correlated messages are delivered to the object. This is cleaner from a interface contract perspective since you can look at a service and see what messages are produced and ensure that your controller implements all of the expected responses.
While working on these API changes, I also made a number of other changes including:
-
Messages no longer need to implement IMessage, plain old objects can be used
-
Removed all threading from the endpoint (asynchronous message dispatching is now handled by a thread manager in the service bus
-
Added a DispatchMode so that messages could be dispatched synchronously for unit tests
I also wrote a new sample called HeavyLoad to benchmark the performance of the bus when using various transports. A variety of message per second tests are performed to see how well the system can be expected to perform based on the type of messaging being done. Early tests on my system (Windows 2003 server in VMware Fusion) show MSMQ performance to be between 950-1500 messages per second (for a 300 byte message, persistent) and around 500 messages per second doing a correlated request/response with a single thread (but using the asynchronous dispatcher). If I were to rewrite the test to use multiple message send threads I would expect performance to increase somewhat since my load test is a bit naive at the present time.
At the same time, I managed to extend the subscription support to include correlated subscriptions. The only subscription cache that currently supports the extensions is the DistributedSubscriptionCache (which uses memcached to share subscription information across a distributed group of systems). The key goal here was to enable MassTransit to support a distributed request/reply architecture using publish/subscribe with correlated subscriptions to specifically route messages to their intended consumers. I plan to make heavy use of this in an upcoming project so I wanted to see it work.
In addition to all the changes, I also updated a few of the samples and made various tweaks to the infrastructure to make it cleaner. There are several more tweaks on the whiteboard that I’m hoping to investigate in the next week. Once those are done, full ActiveMQ support is up next including running the tests under Mono on OSX.
So a lot of changes since the 0.1 tag was put down a couple of weeks ago. I expect there will be some continued testing and tweaking this week as Dru seeks to understand all the changes that were made. While I’ve been doing this stuff, Dru has gotten a kick ass start on a new dashboard to monitor an application built using MassTransit. The goal there is to provide a single pane of glass view into the health of a system, including subscriptions, endpoint throughput, message counts, etc. We’ve got some cool ideas how to make the information available and hope to make this alone one of the cool features to help support distributed messaging applications.
If you haven’t checked out MassTransit, you can get the latest source from the GoogleCode repository. There is a message board for questions, or feel free to contact Dru or myself with any questions.