When MassTransit 3 was announced, it was also stated that many of the APIs had changed. This post covers some of the most commonly used data types, describing how they are used and how they compare to previous versions. The conceptual changes in MassTransit 3 are also covered.
The Bus and Receive Endpoints
In MassTransit 3, a bus is a logical construct which connects to one or more hosts via a single message transport, either RabbitMQ or Azure Service Bus. A bus may include multiple receive endpoints, each connected to a separate queue.
Previous versions of MassTransit only supported a single queue per service bus (which was configured using the ReceiveFrom method).
To support the configuration of receive endpoints, as well as advanced configuration specific to each transport, bus creation has changed. The shortest code to create a bus is shown below.
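A minimal sketch of bus creation, assuming the RabbitMQ transport and the Bus.Factory.CreateUsingRabbitMq syntax as it appears in the 3.x releases (the alpha bits may differ). The broker address and the guest credentials are placeholders, and BusSetup is a hypothetical wrapper class.

```csharp
using System;
using MassTransit;

public static class BusSetup
{
    // Creates a bus connected to a single RabbitMQ host, with no receive endpoints.
    public static IBusControl CreateBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            // Placeholder broker address and credentials (assumptions).
            cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });
        });
    }
}
```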
Previous versions of MassTransit created a service bus using ServiceBusFactory.New.
The bus created above has no receive endpoints. However, if a request is sent to an endpoint using the bus’s address as the return address, a temporary queue is created to receive the response. The bus’s queue name is assigned by the broker in the case of RabbitMQ; for Azure Service Bus, the name can be specified or dynamically generated.
HINT: Don’t write a log message to display the bus’s address! Merely accessing the address property results in the creation of the temporary queue! How’s that for an awesome side effect! This should probably change to Task<Uri> GetBusAddress() to signal the impact of using it!
IBus (so long IServiceBus)
In MassTransit 3, IBus refers to a bus instance. A bus includes methods to obtain send endpoints, as well as publish messages.
Previously, IServiceBus was used, but there were significant changes to the method signatures with dangerous implications from improper use (many methods now return a Task and execute asynchronously). Therefore, changing the interface to IBus was decided to be the best approach to avoid confusing bugs when upgrading existing services.
Starting the Bus
Once a bus is created, an IBusControl interface is returned. The IBusControl interface includes IBus and adds the Start method.
When the application is ready to start the bus, the Start method should be called. This method returns a BusHandle which should be retained until the bus needs to be stopped.
A consumer is a class that handles (or in this case, consumes) one or more message types. When a message is read from a queue, an instance of the consumer is created using the configured consumer factory.
The lifecycle of the consumer is managed entirely by the consumer factory. This gives control over the construction and disposal of the consumer to the consumer factory. There are integrations for most of the dependency injection containers included with MassTransit (StructureMap, Autofac, Unity, Ninject, Castle Windsor).
Consumer Message Handlers
The consumer declares the handled message types via the IConsumer interface. An example consumer of two message types, A and B, is shown below.
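A sketch of such a consumer, assuming the generic IConsumer<T> interface and its Consume(ConsumeContext<T>) method as found in the 3.x releases. The Value property on the message types and the ABConsumer class name are hypothetical, chosen only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message types; the Value property is an assumption.
public class A { public string Value { get; set; } }
public class B { public string Value { get; set; } }

// One IConsumer<T> declaration per handled message type.
public class ABConsumer : IConsumer<A>, IConsumer<B>
{
    public async Task Consume(ConsumeContext<A> context)
    {
        // context.Message is the deserialized A instance.
        await Console.Out.WriteLineAsync("A: " + context.Message.Value);
    }

    public async Task Consume(ConsumeContext<B> context)
    {
        await Console.Out.WriteLineAsync("B: " + context.Message.Value);
    }
}
```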
Previously, interfaces such as Consumes<T>.Context were used to specify consumers. This was changed to simplify the consumer class definition; the original syntax was clever but not very discoverable.
The Consume method is asynchronous and returns a Task that is awaited before acknowledging the message. If the consumer runs to completion (task status of RanToCompletion), the message is acknowledged and removed from the queue. If the consumer faults (such as throwing an exception, resulting in a task status of Faulted or Canceled), the message is nack’d and remains on the queue.
Within a bus, zero or more receiving endpoints can be declared. Each receiving endpoint should have a different queue name, and can also specify the host on which the receive endpoint is to be connected. To configure a receive endpoint for the consumer above, see the example below.
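A sketch of a receive endpoint configuration, assuming the RabbitMQ transport and the ReceiveEndpoint(host, queueName, ...) syntax from the 3.x releases (the alpha bits may differ). The queue name input_queue, the broker address, the credentials, and the AConsumer class are all placeholders for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message and consumer, included so the sketch stands alone.
public class A { public string Value { get; set; } }

public class AConsumer : IConsumer<A>
{
    public Task Consume(ConsumeContext<A> context)
    {
        return Console.Out.WriteLineAsync(context.Message.Value);
    }
}

public static class EndpointSetup
{
    public static IBusControl CreateBus()
    {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            // Placeholder broker address and credentials (assumptions).
            var host = cfg.Host(new Uri("rabbitmq://localhost/"), h =>
            {
                h.Username("guest");
                h.Password("guest");
            });

            // Each receive endpoint names its own queue and the host it connects through.
            cfg.ReceiveEndpoint(host, "input_queue", e =>
            {
                // Uses the default constructor factory: a new consumer per message.
                e.Consumer<AConsumer>();
            });
        });
    }
}
```

With a container integration, the e.Consumer call would instead be given a container-backed consumer factory, so that construction and disposal are handled by the container.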
Sending a message to the endpoint
As a quick example, to send a message to the receive endpoint, the bus is used to get the send endpoint, and then the message is sent.
The message type, A, is a simple object with properties:
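A sketch of the message type and the send, assuming GetSendEndpoint and Send as found on IBus and ISendEndpoint in the 3.x releases. The Value property, the Sender helper, and the endpoint address (host plus a hypothetical input_queue queue name) are assumptions for illustration.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Messages are plain classes; the Value property is an assumption.
public class A
{
    public string Value { get; set; }
}

public static class Sender
{
    public static async Task SendA(IBus bus)
    {
        // Hypothetical address: the host URI followed by the queue name.
        var address = new Uri("rabbitmq://localhost/input_queue");

        // Obtaining the endpoint and sending are both asynchronous.
        ISendEndpoint endpoint = await bus.GetSendEndpoint(address);
        await endpoint.Send(new A { Value = "Hello, World." });
    }
}
```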
Consuming the Message
Inside the consumer, the Consume method handles the message. All messages are delivered within a ConsumeContext<T> that is specific to the message and consumer. For example, if the consumer handled the message and then published an event, it would use the context to publish the event.
In the consumer above, the value of the message is written (using the async IO methods) to a file (in this case, just the console – I’m not actually sure if the console is open for async i/o, but roll with it). Then, an event is published using the context. The published event is also just a class.
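A sketch of a consumer along those lines, assuming the Publish method on ConsumeContext<T> from the 3.x releases. The AReceived event type, its property, and the PublishingConsumer name are hypothetical. The publish is deliberately left un-awaited to match the behavior this post describes; awaiting it is equally valid.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical message and event types.
public class A { public string Value { get; set; } }
public class AReceived { public string Value { get; set; } }

public class PublishingConsumer : IConsumer<A>
{
    public async Task Consume(ConsumeContext<A> context)
    {
        // Write the message value using the async IO methods.
        await Console.Out.WriteLineAsync(context.Message.Value);

        // Publish through the context rather than the bus. Not awaited here
        // (the compiler will warn about the un-awaited call).
        context.Publish(new AReceived { Value = context.Message.Value });
    }
}
```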
Now, the publish could have been awaited, in which case the await completes once the broker acknowledges that the publish has been written to the queue. However, the context is delegating the Publish call, and therefore is able to capture the Task returned and keep track of it inside the consumer context. The message will not be acknowledged until all pending tasks have completed. So the consumer could publish a dozen events without awaiting each one (which would be silly, honestly, since they’re all async) and the framework will handle awaiting until all of the messages have been published and then acknowledge the message being consumed.
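The capture-and-await idea can be modeled without MassTransit at all. The sketch below is not the framework's implementation; TrackingContext, CompleteAsync, and Demo are hypothetical names, and the broker round trip is simulated with a short delay. It only shows the pattern: every Task handed back by Publish is recorded, and the acknowledgement waits on all of them.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a consume context; not a MassTransit type.
public class TrackingContext
{
    readonly List<Task> _pending = new List<Task>();
    public List<string> Log { get; } = new List<string>();

    // Starts the publish and records its Task so the caller need not await it.
    public Task Publish(string evt)
    {
        Task task = PublishAsync(evt);
        _pending.Add(task);
        return task;
    }

    async Task PublishAsync(string evt)
    {
        await Task.Delay(10); // simulated broker round trip
        lock (Log) Log.Add("published " + evt);
    }

    // Called by the (simulated) framework once the consumer has returned.
    public async Task CompleteAsync()
    {
        await Task.WhenAll(_pending);
        lock (Log) Log.Add("ack");
    }
}

public static class Demo
{
    public static async Task<List<string>> RunAsync()
    {
        var context = new TrackingContext();

        // Consumer body: publishes twice without awaiting either call.
        context.Publish("one");
        context.Publish("two");

        // The acknowledgement is logged only after both publishes finish.
        await context.CompleteAsync();
        return context.Log;
    }
}
```

The two publish entries may be logged in either order, but the ack entry is always last, which is the guarantee described above.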
Sending endpoints work the same way.
In the above example, the Task returned from sendEndpoint.Send is captured by the consume context (by inserting an interceptor in front of the ISendEndpoint interface) and awaited by the consumer before acknowledging the message.
More to Come
This is just a short introduction to some of the API changes, to make it easy to migrate applications to the new version.
It’s important to remember that if JSON or XML serialization is being used, there is complete interoperability between services using MassTransit 2.x and MassTransit 3. So there is no need to update every service simultaneously – services can be updated as needed.
There will be more content, as well as updated documentation, as MassTransit becomes ready for general use. Until then, enjoy the alpha bits and share your feedback on the changes!