Nov 15

Last week, I had the pleasure of attending Øredev in Malmö, Sweden. While at the conference, I presented two sessions — including a new talk on Actor Model Programming in C#. This was the first official presentation I’ve given on the subject, having done an ad-hoc version of the session at Pablo’s Fiesta this year (which went fairly well, likely due to the awesome Chicken and Waffles at 24 Diner the night before). Early feedback from the Øredev session was positive, which is encouraging since I will be giving an updated version of the talk at CodeMash 2.0.1.2 in January.

First, I wanted to share a few links to the content discussed in the session, including the GitHub project, the NuGet package, and the TeamCity build. I will update the post with links to the presentation video and the slide deck once they are available.

Second, I plan to write a series of blog posts explaining why actor model programming is a great model for building concurrent applications, despite the difficulties the actor model has had in becoming more mainstream (some of those difficulties are explained in this article by Paul Mackay).

In the meantime, I’m going to take a hard look at how different languages have implemented the actor model (many of which have influenced the current syntax used in Stact). I’m also taking a step back and identifying other ways the model can be implemented to minimize many of the difficulties and bring some modern programming style to the model. Concurrency is certainly difficult, but I’m convinced that many aspects can be made more approachable by applying some existing idioms to the problem.

If you do take a look at Stact, please offer any feedback you have via Twitter (I’m @PhatBoyG) or GitHub (issues or otherwise). If the traffic grows, we’ll set up a Google group to keep things manageable.

Until next time…

 

May 03

After what seems like a long slumber (during which work continued on other projects such as Topshelf and Stact), it is our great pleasure to announce the first beta release of MassTransit v2.0. What originally started out as a minor “1.3” update has turned into a full-out cleanup of the codebase, including a refinement of the configuration API. Because some of the configuration changes are breaking, we felt a 2.0 moniker would better ensure that users of the framework understand the depth of the changes.

And what a list of changes it is (TL;DR = We filled it with awesomeness):

  1. Configuration
    MassTransit v2.0 now includes a streamlined configuration model built around an extensible fluent interface (inspired by Stact and Topshelf and sharing a common, consistent design). As a result, getting started with MassTransit is now easier than ever. In version 2.0, all configuration starts with the ServiceBusFactory, and IntelliSense guides you from there. The result is a clean, understandable API and a quicker out-of-the-box experience.

  2. Container-Free Support
    With the release of MassTransit 2.0, using a dependency injection container is now optional. When we started MassTransit, we leveraged the container extensively to assemble the internal workings of the bus. As we added support for other containers, required features that were not supported by a particular container led to some creative solutions (read: hacks) that were less than optimal. By moving away from a “container-first” approach, we have increased the reliability of the software and now provide container-specific extensions to subscribe consumers from the container in one simple step. We also threw in support for Autofac!

  3. Quick-Start
    By simplifying the configuration and dropping the need for a container, we have made it fast and easy to get started using our new QuickStart:
    http://docs.masstransit-project.com/en/latest/configuration/quickstart.html

  4. #NuGet
    NuGet packages have been added for the base MassTransit project, with any external dependencies (log4net and Magnum) resolved using the proper NuGet packages. Any additional references are downstream in additional NuGet packages, such as support for persisting sagas using NHibernate (MassTransit.NHibernate), and the various dependency injection containers supported.

  5. Multiple Subscription Service Options
    In addition to the existing RuntimeServices included with MassTransit, an all-new peer-to-peer subscription service has been added. By leveraging the reliable multicast support in MSMQ, services can now exchange subscription information without the need for a centralized subscription service. To ensure everything is set up correctly, a VerifyMsmqConfiguration method has been added that will check the installation of MSMQ and install any missing components. This is the first iteration of multicast support, and we need to get some mileage on it. In the meantime, the original run-time services continue to work as expected.

  6. Documentation
    Which brings us to the next big update. DOCS! They’re not perfect, and they’re far from complete, but we have focused on the configuration story to help get you up and running. As we see a need for more documentation in a given area, we will continue to flesh out the docs appropriately. The docs are located at http://docs.masstransit-project.com/ and are being hosted by the fine people at http://readthedocs.org. [Thanks Eric!]

  7. Support for .NET 4.0 and .NET 3.5
    The project files and solution have all been updated to Visual Studio 2010 SP1. By default, all projects are now built in the IDE targeting .NET 4.0. The command-line build (which has been revamped to use Rake and Albacore) builds both .NET 3.5 and .NET 4.0 assemblies, including the run-time services and System View. The NuGet packages also include the proper bindings for the target project run-time version (you must use the full .NET 4.0 profile with MassTransit; the client profile is not supported).

  8. Transport Support
    Internally, the transports and endpoints have been redesigned to improve the support for new transports like RabbitMQ (and improve our ActiveMQ support). For example, transports are now inbound, outbound, or both, allowing us to properly leverage fan-out exchanges on RabbitMQ for publishing and subscribing to messages. There is more to come in this area as we take greater advantage of these advanced transport features. If you’re a RabbitMQ or ActiveMQ user and don’t mind getting your hands dirty, now is a great time to jump in and help improve transport support.

  9. Distributor Consumer And Saga Support
    The MassTransit distributor subsystem continues to be improved. Testing on a multi-master system has been completed, which will allow it to serve multiple distributors and improve load-balancing efficiency. Support for all sagas (previously only state machine sagas were supported) has been added as well.

  10. Swinging the Feature Axe
    Some previously troublesome and poorly supported features (Batching and Message Grouping) were removed from the 2.0 release to reduce code complexity. Also, in light of the new parallel tasks support in the .NET Framework, the Parallel namespace has been removed.
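The configuration style in the first item follows a common C# pattern. A static factory takes a lambda, and the lambda receives a configurator interface whose methods IntelliSense can list. The sketch below shows only that pattern. `BusFactory`, `IBusConfigurator`, `BusSettings`, `ReceiveFrom`, and `Subscribe` are hypothetical names chosen for illustration, not the MassTransit 2.0 API. The QuickStart linked above shows the real calls.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical configurator surface (not the MassTransit API): each call records a setting
public interface IBusConfigurator
{
    void ReceiveFrom(string uri);
    void Subscribe<T>(Action<T> handler) where T : class;
}

// The result the factory hands back once configuration is complete
public class BusSettings
{
    public string InputAddress { get; set; }
    public Dictionary<Type, List<Delegate>> Handlers { get; } = new Dictionary<Type, List<Delegate>>();
}

public static class BusFactory
{
    // All configuration starts here; the lambda receives the configurator
    public static BusSettings New(Action<IBusConfigurator> configure)
    {
        var configurator = new BusConfigurator();
        configure(configurator);
        return configurator.Build(); // validation lives in one place
    }

    private class BusConfigurator : IBusConfigurator
    {
        private readonly BusSettings _settings = new BusSettings();

        public void ReceiveFrom(string uri) => _settings.InputAddress = uri;

        public void Subscribe<T>(Action<T> handler) where T : class
        {
            if (!_settings.Handlers.TryGetValue(typeof(T), out List<Delegate> handlers))
            {
                handlers = new List<Delegate>();
                _settings.Handlers[typeof(T)] = handlers;
            }
            handlers.Add(handler);
        }

        public BusSettings Build()
        {
            if (string.IsNullOrEmpty(_settings.InputAddress))
                throw new InvalidOperationException("ReceiveFrom must be called before the bus is created.");
            return _settings;
        }
    }
}
```

Because all validation happens in one `Build` step, a missing setting fails when the bus is created rather than when the first message arrives.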

In the next few days, I’ll be posting an annotated walkthrough of the new configuration API. In the meantime, fire up Visual Studio 2010, create ConsoleApplication69, switch to the full .NET 4.0 framework, and Add a Library Package Reference to MassTransit using NuGet. Paste the code from the Quick Start into your program.cs and check it out!

Jul 01

Today I was honored for the second time with the Microsoft MVP award. It’s great to be recognized for my efforts in the .NET community over the past year. The next year is already shaping up to be another great one, with upcoming speaking engagements at Dallas TechFest, Devlink (Nashville, TN), St. Louis Day(s, plural) of .NET, and the Heartland Developers Conference in Omaha, NE.

If you are near any of these great events, I hope you are able to attend, learn a few things, and most importantly meet others who are part of the software development community. I would also encourage you to attend a few sessions outside of your regular development platform to see how other technologies solve the same problems in their own way. Given what they cost, these events are an absolute bargain, and many have early registration discounts that are only good for a limited time, so be sure to register early to get the best price.

I look forward to meeting some of you over the next few months, so if you are at one of my talks or see me in the hall, be sure to introduce yourself and give a shout out.

(word)

Jan 22

Last week I presented at Codemash on ASP.NET MVC. I received a request for the demo code, which you can download here.

Codemash was an awesome event, one that I hope to attend again in the future!

Oct 14

One feature that is often overlooked in software development is the output of information that operations can observe once the application is in production. Fortunately, many open source projects use log4net to provide a configurable level of runtime information that can help explain why a system is behaving a certain way (and let’s face it, if you’re looking, it’s more than likely behaving badly). Logging, however, is only one view into an application, and it might not deliver the appropriate information in a useful way.

Anyone who has used a computer with any interest is familiar with system monitoring tools. Task Manager (or, if you’re really cool, Process Explorer) is the first place Windows users look when their system starts to crawl, Mac users turn to Activity Monitor, and I’m sure Linux users have some really obscure command-line tool as well. These coarse-grained tools are usually enough for users. An operations team, however, needs a higher degree of visibility into the application, particularly if it is expected to determine how to tune the application for better performance.

For operations on Windows, Performance Monitor provides detailed information for running applications in real-time. On a web server, it is easy to find out how many threads your ASP.NET application is using, as well as how many requests are queued. That information can be correlated with processor utilization to help determine if the bottleneck is the CPU, the network, or possibly even the database server. When it comes to troubleshooting issues on a live system, more information is always helpful to determine the source of the problem.

To support this level of visibility in MassTransit, performance counter support has been added. Performance counters in .NET are part of the System.Diagnostics namespace. There are various counter types that can be defined, including counts, rates, and averages. When an application wants to output performance counters, it has to create a category and specify the counters that are included in the category. For instance:

ConsumerThreadCount = new RuntimePerformanceCounter("Consumer Threads",
	"The current number of threads processing messages.",
	PerformanceCounterType.NumberOfItems32);

ReceiveRate = new RuntimePerformanceCounter("Received/s",
	"The number of messages received per second",
	PerformanceCounterType.RateOfCountsPerSecond32);

These are two of the counters defined by the MassTransit category. The first is a count that is updated when the number of threads in use changes. The second is a rate which gets incremented once for every message received. The actual calculation and display of the rate is handled by the performance monitoring tools – the application does not need to calculate it.
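The sketch below illustrates that division of labor. It performs the arithmetic a monitoring tool applies to two raw samples of a rate counter. The formula comes from the .NET `PerformanceCounterType` documentation for `RateOfCountsPerSecond32`:

(N1 - N0) / ((D1 - D0) / F)

Here N is the raw count, D is a timestamp in ticks, and F is ticks per second. `RawSample` and `CounterMath` are hypothetical helpers, not part of System.Diagnostics. The application itself only ever calls `Increment`.

```csharp
using System;

// A raw counter reading as a monitoring tool might capture it (hypothetical type)
public readonly struct RawSample
{
    public RawSample(long value, long timestampTicks)
    {
        Value = value;
        TimestampTicks = timestampTicks;
    }

    public long Value { get; }
    public long TimestampTicks { get; }
}

public static class CounterMath
{
    // RateOfCountsPerSecond: (N1 - N0) / ((D1 - D0) / F)
    public static double RatePerSecond(RawSample earlier, RawSample later, long ticksPerSecond)
    {
        long elapsedTicks = later.TimestampTicks - earlier.TimestampTicks;
        if (elapsedTicks <= 0)
            return 0.0; // guard against a zero or negative interval

        double elapsedSeconds = (double)elapsedTicks / ticksPerSecond;
        return (later.Value - earlier.Value) / elapsedSeconds;
    }
}
```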

ConsumerDuration = new RuntimePerformanceCounter("Average Consumer Duration",
	"The average time a consumer spends processing a message.",
	PerformanceCounterType.AverageCount64);

ConsumerDurationBase = new RuntimePerformanceCounter("Average Consumer Duration Base",
	"The average time a consumer spends processing a message.",
	PerformanceCounterType.AverageBase);

These two counters report the average consumer duration of a message. An average requires a pair of counters: the base, which is incremented once for each occurrence, and the counter itself, which is incremented by the measured value. So for each message, the base is incremented once and the duration counter is incremented by the number of milliseconds spent executing the consumer.
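The bookkeeping behind an average counter can be simulated without touching System.Diagnostics. In this sketch, `AverageCounterPair` is a hypothetical stand-in for the `ConsumerDuration`/`ConsumerDurationBase` pair. Each message adds its duration to the total and one to the base. Between two samples, the monitoring tool divides the change in the total by the change in the base. This matches the `AverageCount64` formula from the `PerformanceCounterType` documentation:

(N1 - N0) / (B1 - B0)

```csharp
using System;

// Simulates the pair of counters behind "Average Consumer Duration" (hypothetical type)
public class AverageCounterPair
{
    public long Total { get; private set; } // plays the role of ConsumerDuration (AverageCount64)
    public long Base { get; private set; }  // plays the role of ConsumerDurationBase (AverageBase)

    public void Record(TimeSpan duration)
    {
        Total += (long)duration.TotalMilliseconds; // like IncrementBy(ms)
        Base += 1;                                 // like Increment()
    }

    public (long Total, long Base) Snapshot() => (Total, Base);

    // What the monitoring tool computes: (N1 - N0) / (B1 - B0)
    public static double Average((long Total, long Base) earlier, (long Total, long Base) later)
    {
        long count = later.Base - earlier.Base;
        return count == 0 ? 0.0 : (double)(later.Total - earlier.Total) / count;
    }
}
```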

In adding performance counter support, I wanted to do it in a way that didn’t leak the details of updating performance information throughout the framework. It was at this point that I turned to the Magnum Pipeline. Using the pipeline to publish the metrics allowed me to isolate the actual performance counter interface to a single method in a single class for the service bus. So instead of passing counter interfaces around to all the components that make up the bus, a single event aggregator is passed instead. When the bus starts, the performance counter code subscribes to the events as shown:

_eventAggregatorScope.Subscribe<MessageReceived>(message =>
	{
		_counters.ReceiveCount.Increment();
		_counters.ReceiveRate.Increment();
		_counters.ReceiveDuration.IncrementBy((long) message.ReceiveDuration.TotalMilliseconds);
		_counters.ReceiveDurationBase.Increment();
		_counters.ConsumerDuration.IncrementBy((long) message.ConsumeDuration.TotalMilliseconds);
		_counters.ConsumerDurationBase.Increment();
	});

Now, when the bus receives a message, it sends the event to the event aggregator (an instance of the Magnum Pipeline).

var message = new MessageReceived
	{
		MessageType = messageType,
		ReceiveDuration = receiveTime,
		ConsumeDuration = consumeTime,
	};

_eventAggregator.Send(message);

Since the Magnum Pipeline is publish/subscribe, additional consumers could also opt in to the MessageReceived event and perform other actions as well. I also plan to add counters per message type, allowing a finer-grained view of message counts and consumer durations.
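The sketch below illustrates that opt-in idea with a deliberately tiny, single-threaded aggregator. `SimpleEventAggregator` and `PerMessageTypeCounts` are hypothetical and are not the Magnum Pipeline API. Unlike the pipeline, this sketch matches exact message types only.

The bus-facing code only calls `Send`. A second subscriber keeps per-message-type counts without the sender knowing it exists.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical message mirroring the MessageReceived event above
public class MessageReceived
{
    public Type MessageType { get; set; }
    public TimeSpan ReceiveDuration { get; set; }
    public TimeSpan ConsumeDuration { get; set; }
}

// Single-threaded, exact-type aggregator: an illustration, not the Magnum Pipeline
public class SimpleEventAggregator
{
    private readonly Dictionary<Type, List<Action<object>>> _subscribers =
        new Dictionary<Type, List<Action<object>>>();

    public void Subscribe<T>(Action<T> handler) where T : class
    {
        if (!_subscribers.TryGetValue(typeof(T), out List<Action<object>> handlers))
        {
            handlers = new List<Action<object>>();
            _subscribers[typeof(T)] = handlers;
        }
        handlers.Add(message => handler((T)message));
    }

    public void Send<T>(T message) where T : class
    {
        if (_subscribers.TryGetValue(typeof(T), out List<Action<object>> handlers))
            foreach (Action<object> handler in handlers)
                handler(message);
    }
}

// An independent consumer that opts in: counts received messages per message type
public class PerMessageTypeCounts
{
    private readonly Dictionary<Type, long> _counts = new Dictionary<Type, long>();

    public PerMessageTypeCounts(SimpleEventAggregator aggregator)
    {
        aggregator.Subscribe<MessageReceived>(m =>
        {
            _counts.TryGetValue(m.MessageType, out long current);
            _counts[m.MessageType] = current + 1;
        });
    }

    public long CountFor(Type messageType) =>
        _counts.TryGetValue(messageType, out long count) ? count : 0;
}
```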

While the main story behind this post is the new counters available in MassTransit, my hope is that this brief introduction to performance counters was useful as well. You can learn more about performance counters from various articles that have been posted (such as a good one on CodeProject). You can check out the Magnum Pipeline in the Magnum project which is hosted at GoogleCode.

Sep 16

Last year, when we were reviewing the backlog of items we wanted to build for MassTransit, one item that kept rising to the top of the list was a solid story for evolving message producers over the lifecycle of an enterprise system. A key goal was being able to publish events that both current and down-level subscribers could consume. That way, updating a publisher would not force every system to be upgraded at once. Fortunately, it hasn’t been a real concern in our application, since we deploy the entire system as a whole with each delivery.

Nonetheless, a way to update a service that publishes messages without requiring every subscribing service to be updated at the same time was needed.

Eliminating Impediments

Before we could implement interface subscriptions, a few impediments had to be addressed, and none of them were easy to fix.

First, we were still doing binary message serialization. While we had the ability to use the .NET XML Serializer, it tends to be slow and difficult to fit into the model we had built with MT. Back in May, XML became the default serialization format using an entirely new serializer built from scratch.

Second, we wanted to ensure that a publisher could publish a single message and have it delivered to all of the interested subscribers, regardless of whether they had subscribed to the message class or one of the interfaces implemented by the class. In MassTransit, subscriptions are added by type, and message types are defined using plain old CLR objects (POCOs). In the 0.6 release, we replaced the message dispatcher with a new type-based pipeline for both inbound and outbound messages. Starting with an object and working down the type structure of the message, messages are pushed through the pipeline to interested message sinks. In the outbound pipeline, this makes it easy to push through a class that implements interfaces, since the message object can be assigned to each of those interfaces. Another hurdle eliminated.

Implementing Interfaces

Once the hurdles were eliminated, it was actually very easy to add interface subscriptions. Since most of the internal bits had been reworked leveraging the power of expressions and generics, it was simply a matter of tweaking a few parts of the serializer and we were ready to rock and roll. Ensuring that message objects retain their type through the various pathways inside the system was also important, and resulted in fixing a couple of low hanging bugs related to message retry and fault publishing.

The one bit of code that needed to be built was a way to provide a backing class for an interface to store the property values. At first, I looked at using something like LinFu or DynamicProxy2 to create a proxy for the interface and intercept the property accessors, but this had a problem. I did not want property setters on the interface. At that point, I started looking at using the Emit classes, AutoMapper, the FastProperty expression-based accessors, and how Udi had dealt with it inside NServiceBus. What I ended up with was a very fast, cached object builder implementation that is integrated within the message deserializer. In the words of Cartman, “It’s pretty cool.”
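The builder described above is Emit-based and lives inside the deserializer. The sketch below shows the same idea, a read-only interface backed by stored property values, using a different technique: `System.Reflection.DispatchProxy`, which is available in modern .NET. `ICustomerRated`, `PropertyBagProxy`, and `MessageBuilder` are hypothetical names. The interface exposes only getters, and the values come from a dictionary like one a deserializer might produce.

```csharp
using System;
using System.Collections.Generic;
using System.Reflection;

// A message contract with getters only: no setters on the interface
public interface ICustomerRated
{
    string CustomerId { get; }
    int Rating { get; }
}

// Answers property getter calls from a dictionary of deserialized values
public class PropertyBagProxy : DispatchProxy
{
    private IDictionary<string, object> _values = new Dictionary<string, object>();

    internal void SetValues(IDictionary<string, object> values) => _values = values;

    protected override object Invoke(MethodInfo targetMethod, object[] args)
    {
        bool isGetter = targetMethod.Name.StartsWith("get_", StringComparison.Ordinal)
                        && (args == null || args.Length == 0);
        if (!isGetter)
            throw new NotSupportedException($"{targetMethod.Name} is not a property getter.");

        string propertyName = targetMethod.Name.Substring(4);
        if (_values.TryGetValue(propertyName, out object value))
            return value;

        // Missing values fall back to the default of the property type
        Type returnType = targetMethod.ReturnType;
        return returnType.IsValueType ? Activator.CreateInstance(returnType) : null;
    }
}

public static class MessageBuilder
{
    public static T Build<T>(IDictionary<string, object> values) where T : class
    {
        // The runtime generates a type deriving from PropertyBagProxy that implements T
        T message = DispatchProxy.Create<T, PropertyBagProxy>();
        ((PropertyBagProxy)(object)message).SetValues(values);
        return message;
    }
}
```

An Emit-based builder can compile a concrete class per interface. Here, every getter call goes through `Invoke` and a dictionary lookup, trading speed for brevity.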

There isn’t really a difference in the code between using classes and interfaces from either the producing or consuming end. While a producer will likely continue to publish a class, it just has to implement the message interface on that class, allowing the consumer to subscribe to the interface, breaking the dependency on the actual class published by the producer. The pipeline will then properly serialize out a message for that interface and send it directly to the consumer.
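The in-process sketch below delivers a published class to a subscriber that only knows the interface. It is not MassTransit's pipeline, which also serializes the message and routes it between processes. `ICustomerMoved`, `CustomerMovedEvent`, and `TypeBasedDispatcher` are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// The contract the consumer depends on
public interface ICustomerMoved
{
    string CustomerId { get; }
    string NewCity { get; }
}

// The concrete class the producer publishes; it implements the contract
public class CustomerMovedEvent : ICustomerMoved
{
    public string CustomerId { get; set; }
    public string NewCity { get; set; }
    public DateTime MovedAt { get; set; } // extra data that interface consumers never see
}

public class TypeBasedDispatcher
{
    private readonly List<(Type MessageType, Action<object> Handler)> _subscriptions =
        new List<(Type MessageType, Action<object> Handler)>();

    public void Subscribe<T>(Action<T> handler) where T : class =>
        _subscriptions.Add((typeof(T), message => handler((T)message)));

    // Deliver to every subscription whose type the message can be assigned to:
    // its own class, any base class, or any interface it implements
    public int Publish(object message)
    {
        int delivered = 0;
        foreach (var (messageType, handler) in _subscriptions)
        {
            if (messageType.IsInstanceOfType(message))
            {
                handler(message);
                delivered++;
            }
        }
        return delivered;
    }
}
```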

I’m pretty excited about this, and hope to update some of the pre-built services to use interfaces instead of classes in the near future. In the meantime, pull down the latest trunk and check it out.

Sep 12

This post details some of the internal changes to how MassTransit, an open-source lightweight service bus, communicates with transports such as MSMQ, ActiveMQ, and TIBCO. These changes are not likely to impact anyone using MassTransit, since they are all well below the abstraction layer provided by the bus. Still, I felt it was important to share the changes, along with the reasons they were made, with those who are using MassTransit.

When MassTransit was first started, MSMQ was the only transport we intended to support. In due time, however, it was determined that support for transports such as ActiveMQ and TIBCO was important. The ability to run on Linux and OS X under Mono (which does not support the System.Messaging namespace) as well as interoperability with Java systems using JMS (a specification for messaging, implemented by messaging systems like ActiveMQ and TIBCO) were the primary drivers of this decision. At the same time, insulating developers from the particulars of each transport was equally important.

To communicate with an endpoint, MassTransit uses the IEndpoint interface. Previously, the service bus received messages from an endpoint using this method:

IEnumerable<IMessageSelector> SelectiveReceive(TimeSpan timeout);

This involved making a call that returned an enumeration of message selectors, allowing the caller to step through the messages until it found an interesting one (in the case of the bus, a message with a subscribed consumer). The concerns of receiving a message were seemingly spread at random across three or four different classes (and yes, I wrote this crap). The reason for the complexity was solid, though: I needed the ability to selectively receive a message from a queue and skip over the ones in which I had no interest.

Combining the yield return/break syntax of enumerators with careful scope management made the code hard to get right, and the programming semantics behind it were difficult to understand. I wanted something better. After all the time I have spent since this was written working with nested closures, lambda functions, and continuations, I realized there was a way to reduce the complexity while also improving extensibility.

The new signature for the receive method on an endpoint looks like this:

void Receive(Func<object, Action<object>> receiver, TimeSpan timeout);

With this new interface, the caller need only pass a method that accepts an object and returns a method that also accepts an object. The first method provides the caller an opportunity to inspect the message object to determine if the message will be consumed by the bus. If the bus is not interested, it can simply return null. If it is interested, it returns a method (either anonymous or a regular class method) that will consume the message. The endpoint will then call the returned method with the message once it has been received successfully. If the endpoint determines that the message is no longer available (if it were picked up by another process reading from the same queue for example), the returned method is not called.
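The hedged, in-memory sketch below implements that signature to show how the selector/consumer pair divides the work. `InMemoryEndpoint` is hypothetical. A real transport would block for up to the timeout and would have to cope with other processes reading the same queue. The sketch approximates the second concern by re-checking that the message is still queued before invoking the consumer.

```csharp
using System;
using System.Collections.Generic;

// In-memory stand-in for a queue-backed endpoint (hypothetical, not a MassTransit class)
public class InMemoryEndpoint
{
    private readonly LinkedList<object> _queue = new LinkedList<object>();
    private readonly object _sync = new object();

    public void Enqueue(object message)
    {
        lock (_sync) { _queue.AddLast(message); }
    }

    public int Count
    {
        get { lock (_sync) { return _queue.Count; } }
    }

    // receiver: inspect a message and return a consumer, or null to skip it.
    // The timeout is ignored here because an in-memory queue never blocks.
    public void Receive(Func<object, Action<object>> receiver, TimeSpan timeout)
    {
        List<object> snapshot;
        lock (_sync) { snapshot = new List<object>(_queue); }

        foreach (object message in snapshot)
        {
            Action<object> consumer = receiver(message);
            if (consumer == null)
                continue; // not interested: the message stays in the queue

            bool claimed;
            lock (_sync) { claimed = _queue.Remove(message); }
            if (!claimed)
                continue; // someone else took it first: do not call the consumer

            consumer(message);
            return; // one message per call
        }
    }
}
```

Returning `null` from the selector is what makes the receive selective: the skipped message stays in the queue for a later call or another reader.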

The calling method looks something like this:

_endpoint.Receive(m => message => { doSomethingWith(message); }, timeout);

This interface is far less complex to implement, and also made it easy to make a clean separation of what is an endpoint and what is a transport. Which leads me to…

Endpoint and Transport Split?

Sadly that reads like a Hollywood headline, but it is true. Endpoints now deal only with address resolution of sending and receiving messages and translating between the transport format and a message object (including de/serialization). New transport classes are now responsible for the actual communication with the various queue implementations supported by MassTransit.

For example, previously there was one class, MsmqEndpoint, that contained all the aspects of talking to MSMQ regardless of the type of queue (local non-transactional, local transactional, remote). Now, beneath the endpoint itself, there are three MSMQ transports, one for each of these scenarios. Each transport deals only with its own particulars; for example, the non-transactional transport has no transactional concerns in it at all.

Introducing ITransport

The new ITransport interface is narrow, dealing only with the simplest form of communication — streams. The send and receive methods from the endpoint are matched, but instead of dealing with objects, streams are used. Every transport should provide stream support at a minimum. The receive method of the transport looks like:

void Receive(Func<Stream, Action<Stream>> receiver, TimeSpan timeout);

While all transports implement streams, there is a benefit to communicating at a level above streams for certain types of endpoints. For example, when using MSMQ there are advantages to communicating directly with the Message object such as having access to the transport level message ID, the message label, and other interesting properties. To support this, the MsmqEndpoint only accepts an IMsmqTransport interface, which inherits from ITransport and adds:

void Receive(Func<Message, Action<Message>> receiver, TimeSpan timeout);

Other transports may benefit from a custom interface as well, but it is only implemented for MSMQ at this point. ActiveMQ, Loopback, and Multicast UDP all use the base stream interface.
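One way to see the split is to layer an object-level endpoint on a stream-level transport. In the sketch below, `IStreamTransport`, `LoopbackStreamTransport`, and `TextEndpoint` are hypothetical names modeled on the signatures above. A UTF-8 string stands in for the real serializer. The transport moves bytes, and only the endpoint knows how to turn them into a message object.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Narrow transport contract: streams only (hypothetical, modeled on ITransport)
public interface IStreamTransport
{
    void Send(Action<Stream> writer);
    void Receive(Func<Stream, Action<Stream>> receiver, TimeSpan timeout);
}

// Loopback transport holding serialized bodies in memory; ignores the timeout
public class LoopbackStreamTransport : IStreamTransport
{
    private readonly Queue<byte[]> _bodies = new Queue<byte[]>();

    public void Send(Action<Stream> writer)
    {
        using (var buffer = new MemoryStream())
        {
            writer(buffer);
            _bodies.Enqueue(buffer.ToArray());
        }
    }

    public void Receive(Func<Stream, Action<Stream>> receiver, TimeSpan timeout)
    {
        if (_bodies.Count == 0) return;
        byte[] body = _bodies.Peek();
        Action<Stream> consume = receiver(new MemoryStream(body, writable: false));
        if (consume == null) return; // not wanted: leave it queued
        _bodies.Dequeue();
        consume(new MemoryStream(body, writable: false));
    }
}

// Endpoint: translates between objects and streams, delegates I/O to the transport
public class TextEndpoint
{
    private readonly IStreamTransport _transport;

    public TextEndpoint(IStreamTransport transport) => _transport = transport;

    public void Send(string message) =>
        _transport.Send(stream =>
        {
            byte[] bytes = Encoding.UTF8.GetBytes(message);
            stream.Write(bytes, 0, bytes.Length);
        });

    public void Receive(Func<object, Action<object>> receiver, TimeSpan timeout) =>
        _transport.Receive(stream =>
        {
            object message = Deserialize(stream);
            Action<object> consumer = receiver(message);
            if (consumer == null)
                return null;               // not interested: the transport keeps the message
            return _ => consumer(message); // consume the already-deserialized object
        }, timeout);

    private static object Deserialize(Stream stream)
    {
        using (var reader = new StreamReader(stream, Encoding.UTF8))
            return reader.ReadToEnd();
    }
}
```

Swapping `LoopbackStreamTransport` for another `IStreamTransport` requires no change to `TextEndpoint`, which is the separation the split aims for.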

Looking Forward

This rewrite was not purely for entertainment value (well, it was fun). Sending a message to a remote queue has latency orders of magnitude higher than writing to a local queue. Local queues also have the advantage of being local. That matters given the first fallacy of distributed computing: the network is reliable (NOT!). To compensate, a more reliable way of sending messages to a remote queue is needed. If the messages an application sends or publishes are durable regardless of network failures, developers can rely on fire-and-forget messaging, which is key to building event-driven applications.

To handle this, MassTransit now uses a store and forward transport for remote MSMQ queues. The store and forward transport will automatically create a local queue to cache the outbound messages destined for the remote queue. When a message is sent to the remote queue, the transport writes it to the local queue and returns to the caller. An asynchronous method then delivers the message in the background. The same transports that are used by the endpoint are reused by the store and forward transport, maintaining that high level of code reuse.
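The sketch below shows the shape of that idea in memory. `StoreAndForwardSender` is hypothetical, and remote delivery is modeled as a delegate that returns false when the remote queue is unreachable. There is one important difference from the MSMQ version. A `ConcurrentQueue` is not durable, so unlike a local MSMQ queue, this sketch loses pending messages on a process restart.

```csharp
using System;
using System.Collections.Concurrent;

// Store-and-forward sketch (hypothetical): Send stores locally and returns at once,
// and ForwardPending drains the local store toward the remote queue.
public class StoreAndForwardSender
{
    private readonly ConcurrentQueue<string> _localStore = new ConcurrentQueue<string>();
    private readonly Func<string, bool> _trySendRemote;

    public StoreAndForwardSender(Func<string, bool> trySendRemote) => _trySendRemote = trySendRemote;

    public int Pending => _localStore.Count;

    // Returns as soon as the message is stored locally
    public void Send(string message) => _localStore.Enqueue(message);

    // Assumes a single forwarding thread calls this (for example, a background loop)
    public int ForwardPending()
    {
        int forwarded = 0;
        while (_localStore.TryPeek(out string message))
        {
            if (!_trySendRemote(message))
                break;                     // remote unreachable: keep the message for the next attempt
            _localStore.TryDequeue(out _); // delivered: drop it from the local store
            forwarded++;
        }
        return forwarded;
    }
}
```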

Note that on Windows Server 2003, I have observed MSMQ accepting messages destined for an unreachable remote queue and retrying delivery itself, but only for transactional queues.

Wrapping Up

While the hope is that changes like this go unnoticed, there is always a chance of unintended consequences (read: bugs). Hopefully any of these will be weeded out quickly. In the meantime, I hope to start work on some availability features to support load balancing of command services.

Jul 27

In the past few weeks, both Udi Dahan and Jeremy D. Miller have posted on events. Udi posted about domain events, while Jeremy posted about his use of the event aggregator pattern in StoryTeller. In each case, events are represented as messages, and each message is a class in C#. Each post also describes a small publish/subscribe system that allows objects (be it a domain object, domain service, or controller) to subscribe to messages. Other objects can then use that same system to publish events to the subscribed objects.

Now, while you could use MassTransit out of the box to handle this type of event aggregation, it is a bit heavy. The in-memory message transport serializes the message, which makes it impossible to pass a continuation or an object reference as part of an event. There is also a very service-oriented threading model where each consumer runs on a different thread, making synchronization an important concern for unit testing. While it would work, it is not always the shiniest hammer in the toolbox for UI-based applications.

To address this, one of the things I’ve been adding to Magnum over the past few weeks is a new version of the pipeline that handles message distribution in MassTransit. In this implementation, I wanted a way to implement the event aggregator pattern with the same flexibility that I get with MassTransit but designed for an in-process mode of execution. At the same time, I wanted to make sure that I could scale this solution via adapters to extend events to MassTransit for publishing out-of-process.

Note: I use the words event and message interchangeably in this post.

First, I wanted it to be able to handle any object without any constraints on the type. To this end, I came up with a very narrow API that only deals with the publishing of a message.

public interface Pipe
{
	void Send<T>(T message) where T : class;
}

The Send method is fairly obvious: it sends a message to any consumers that are subscribed to it. With this implementation, consumers subscribed to any type to which the message can be assigned will also get the message. Consider the following class structure:

public class CustomerChanged
{
	public Customer Customer { get; set; }
}

public class CustomerRatingDowngraded :
	CustomerChanged
{
}

A consumer that subscribed to the CustomerChanged type would receive the message if a CustomerRatingDowngraded message was published. It also works for interfaces, as long as the message object being published supports the interface.

An obvious omission from this API is any way of subscribing consumers to the pipeline. To subscribe, an extension method on the Pipe interface creates a new subscription scope. A subscription scope, represented by the ISubscriptionScope interface, makes changes to the pipeline, resulting in the creation of a new pipeline. A series of visitors is used to create a new version of the pipeline with the consumers added, along with another visitor to remove the consumers when they unsubscribe. ISubscriptionScope implements IDisposable, so to unsubscribe, your application simply disposes of the scope.

It is interesting to note that much like the Expression class in .NET, pipelines are immutable. Since pipelines cannot be changed, the need to lock parts of the pipeline during message distribution is removed. By removing the need for locking to ensure safe operation in a concurrent environment, performance improves and blocking is eliminated. At the same time, consumers can subscribe and unsubscribe from the pipeline as needed without disrupting the system.
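The same trade-off can be sketched with a copy-on-write array. This is not the Magnum implementation, which rebuilds a tree of pipe segments using visitors. `CopyOnWritePipe` is a hypothetical, flattened stand-in showing the two properties described above:

- `Send` works from an immutable snapshot without taking a lock.
- The `IDisposable` returned by `Subscribe` swaps in a new array without that consumer.

```csharp
using System;
using System.Threading;

public class CopyOnWritePipe<T> where T : class
{
    private Action<T>[] _consumers = Array.Empty<Action<T>>();

    public void Send(T message)
    {
        // Read the current array once; later subscribe/unsubscribe calls cannot change it
        Action<T>[] snapshot = Volatile.Read(ref _consumers);
        foreach (Action<T> consumer in snapshot)
            consumer(message);
    }

    public IDisposable Subscribe(Action<T> consumer)
    {
        Update(current =>
        {
            var next = new Action<T>[current.Length + 1];
            Array.Copy(current, next, current.Length);
            next[current.Length] = consumer;
            return next;
        });

        return new Unsubscriber(() => Update(current =>
        {
            int index = Array.IndexOf(current, consumer);
            if (index < 0)
                return current;
            var next = new Action<T>[current.Length - 1];
            Array.Copy(current, 0, next, 0, index);
            Array.Copy(current, index + 1, next, index, current.Length - index - 1);
            return next;
        }));
    }

    // Compare-and-swap loop: retry if another thread replaced the array meanwhile
    private void Update(Func<Action<T>[], Action<T>[]> change)
    {
        while (true)
        {
            Action<T>[] original = Volatile.Read(ref _consumers);
            Action<T>[] updated = change(original);
            if (ReferenceEquals(Interlocked.CompareExchange(ref _consumers, updated, original), original))
                return;
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action _unsubscribe;

        public Unsubscriber(Action unsubscribe) => _unsubscribe = unsubscribe;

        // Exchange makes Dispose idempotent: only the first call unsubscribes
        public void Dispose() => Interlocked.Exchange(ref _unsubscribe, null)?.Invoke();
    }
}
```

Because `Send` iterates a snapshot, a consumer that unsubscribes during a send may still receive that one in-flight message. That is the usual price of lock-free reads.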

public void Start()
{
	// this creates an empty pipeline that accepts any object
	_eventAggregator = PipeSegment.Input(PipeSegment.End());

	_scope = _eventAggregator.NewSubscriptionScope();
	_scope.Subscribe<CustomerChanged>(message => Trace.WriteLine("Customer changed: " + message.Customer));
}

In this example, _eventAggregator and _scope would likely be member variables that are released when the containing object is stopped or disposed. Multiple subscriptions can be added to a single scope, each one modifying the pipeline as it is added.

When I discuss event-based programming, I often mention the need for visualization tools to ensure the system is performing as expected. In the example above, I could use the TracePipeVisitor to verify that the consumer was indeed subscribed to the pipeline (by calling new TracePipeVisitor().Trace(_eventAggregator) and viewing the results in the output window).

Input< Object >:
RecipientList< Object >:
     Filter< CustomerChanged >: Allow Magnum.Specs.Pipeline.Messages.CustomerChanged
     RecipientList< CustomerChanged >:
          MessageConsumer< CustomerChanged >:

As consumers are added, the pipeline is built up from a series of PipeSegment classes. The Input segment is the entry point to the pipeline and, because of that role, is the only segment that actually changes. The RecipientList is a one-to-many switch that delivers incoming messages to each consumer. The Filter segment passes only a specific type, preventing unwanted messages from reaching the consumer. The MessageConsumer actually invokes the method that was subscribed to the message.
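To make the roles concrete, the sketch below hand-assembles the chain shown in the trace output. The segment classes are hypothetical simplifications, not Magnum's `PipeSegment` types. The message classes are cut down to a string `CustomerName` in place of the `Customer` type. The filter lets a `CustomerRatingDowngraded` through to a `CustomerChanged` consumer, as described earlier.

```csharp
using System;
using System.Collections.Generic;

// Simplified versions of the post's message classes (hypothetical)
public class CustomerChanged { public string CustomerName { get; set; } }
public class CustomerRatingDowngraded : CustomerChanged { }

public interface ISegment
{
    void Send(object message);
}

// Invokes the subscribed method
public sealed class MessageConsumerSegment<T> : ISegment where T : class
{
    private readonly Action<T> _consumer;
    public MessageConsumerSegment(Action<T> consumer) => _consumer = consumer;
    public void Send(object message) => _consumer((T)message);
}

// Passes only messages assignable to T, so subclasses and implementers pass too
public sealed class FilterSegment<T> : ISegment where T : class
{
    private readonly ISegment _next;
    public FilterSegment(ISegment next) => _next = next;
    public void Send(object message)
    {
        if (message is T) _next.Send(message);
    }
}

// One-to-many switch: hands each message to every recipient
public sealed class RecipientListSegment : ISegment
{
    private readonly IReadOnlyList<ISegment> _recipients;
    public RecipientListSegment(IReadOnlyList<ISegment> recipients) => _recipients = recipients;
    public void Send(object message)
    {
        foreach (ISegment recipient in _recipients) recipient.Send(message);
    }
}

// Entry point: the only mutable reference, repointed when subscriptions change
public sealed class InputSegment : ISegment
{
    private volatile ISegment _output;
    public InputSegment(ISegment output) => _output = output;
    public void ReplaceOutput(ISegment output) => _output = output;
    public void Send(object message) => _output.Send(message);
}
```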

In the above example, the message consumer was accepted using the MessageConsumer<T> delegate type, which is analogous to Action<T>, with T being the message type. Another way to subscribe is to implement the IConsume<T> interface, as shown below.

public class ListViewController :
	IConsume<CustomerChanged>
{
	private readonly ListView _customerListView;

	public ListViewController(ListView customerListView)
	{
		_customerListView = customerListView;
	}

	public void Consume(CustomerChanged message)
	{
		// DoSomeUpdate stands in for whatever refresh the view needs
		_customerListView.DoSomeUpdate(message.Customer);
	}
}

A class can implement the IConsume<T> interface to indicate that it is interested in messages of type T. In this case, the CustomerChanged message is of particular interest as it is used to update the user interface in response to a customer change event. The instance of the controller can be subscribed to the pipeline by calling the Subscribe method, passing the object reference itself.

public void BootstrapUserInterfaceControllers()
{
	_customerListViewController = new ListViewController(customerListView);

	_scope = _eventAggregator.NewSubscriptionScope();
	_scope.Subscribe(_customerListViewController);
}

This is the first in a series of posts about the pipeline in Magnum. As I add the remaining functionality, including asynchronous message consumers, aggregate consumers, and automatic binding to the Magnum StateMachine (similar to how sagas are done using MassTransit), I’ll post about how they are used. I encourage you to take a look at the code and particularly the unit tests to see the different ways the pipeline can be used.