Details on how to publish and consume messages using MassTransit to communicate between bounded contexts.
Wrapt projects are broken up into distinct boundaries called bounded contexts. With this kind of separation, you may need to communicate between bounded contexts. Generally, this means one bounded context sends out a notification that an event occurred, and one or more other bounded contexts react to that notification by performing some action. Some examples might be:
1. An OrderRequested notification being sent and getting consumed by a BillOrder consumer as well as a ConfirmOrder consumer. You could even take this a step further and create a saga or routing slip to orchestrate the process.
2. Updating a Patient record in the Ordering bounded context and sending out a PatientUpdated notification so that the Billing bounded context can update the associated patient information in its database as well.

💡 If you're not familiar with eventing and MassTransit, I've started a blog series on the topic that I recommend you check out.
Let's look at an example like #2 above and say that I have a project built with the domain template below. Of note for this example:
- I have a producer in my Ordering bc as well as a matching consumer in my Billing bc
- I am using the AddBus property to make sure that I am adding my event bus to each bc

DomainName: VsaLab
Messages:
- Name: IPatientUpdated
  Properties:
  - Name: PatientId
    Type: guid
BoundedContexts:
- SolutionName: Ordering
  Port: 1200
  DbContext:
    ContextName: OrderingDbContext
    DatabaseName: OrderingDbContext
    Provider: SqlServer
  Entities:
  - Name: Patient
    Properties:
    - Name: ExternalId
      Type: string
      CanFilter: true
      CanSort: true
    - Name: InternalId
      Type: string
      CanFilter: true
      CanSort: true
    - Name: FirstName
      Type: string
      CanFilter: true
      CanSort: true
    - Name: LastName
      Type: string
      CanFilter: true
      CanSort: true
    - Name: Dob
      Type: datetimeoffset?
      CanFilter: true
      CanSort: true
  Environments:
  - EnvironmentName: Development
    BrokerSettings:
      Host: localhost
      VirtualHost: /
      Username: guest
      Password: guest
  Bus:
    AddBus: true
  Producers:
  - EndpointRegistrationMethodName: PatientUpdatedEndpoint
    ExchangeName: patient-updated
    MessageName: IPatientUpdated
    ExchangeType: fanout
    ProducerName: PatientUpdated
  SwaggerConfig:
    Title: MySwaggerDoc
    Description: This is my swagger doc
    SwaggerEndpointName: "v1"
    AddSwaggerComments: true
    ApiContact:
      Name: Paul
      Email: paul@test.com
      Url: https://www.thisispaulswebsite.com
- SolutionName: Billing
  Port: 1205
  DbContext:
    ContextName: BillingDbContext
    DatabaseName: BillingDbContext
    Provider: Postgres
  Entities:
  - Name: Patient
    Properties:
    - Name: ExternalId
      Type: string
      CanFilter: true
      CanSort: true
    - Name: InternalId
      Type: string
      CanFilter: true
      CanSort: true
    - Name: FirstName
      Type: string
      CanFilter: true
      CanSort: true
    - Name: LastName
      Type: string
      CanFilter: true
      CanSort: true
    - Name: Dob
      Type: datetimeoffset?
      CanFilter: true
      CanSort: true
  Environments:
  - EnvironmentName: Development
    BrokerSettings:
      Host: localhost
      VirtualHost: /
      Username: guest
      Password: guest
  Bus:
    AddBus: true
  Consumers:
  - EndpointRegistrationMethodName: SyncPatientEndpoint
    ConsumerName: SyncPatient
    ExchangeName: patient-updated
    MessageName: IPatientUpdated
    QueueName: sync-billing-patient
    ExchangeType: fanout
    UsesDb: true
  SwaggerConfig:
    Title: MySwaggerDoc
    Description: This is my swagger doc
    SwaggerEndpointName: "v1"
    AddSwaggerComments: true
    ApiContact:
      Name: Paul
      Email: paul@test.com
      Url: https://www.thisispaulswebsite.com
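
For orientation, the Messages entry in the template describes a message contract along the lines of the interface below. This is only a sketch: the Messages namespace is an assumption based on the using Messages; directive in the feature code later on, and the file Craftsman actually generates may differ in its details.

```csharp
namespace Messages
{
    using System;

    // Contract shared by the Ordering producer and the Billing consumer.
    // Mirrors the IPatientUpdated message in the template: a single guid property.
    public interface IPatientUpdated
    {
        Guid PatientId { get; }
    }
}
```

Because both bounded contexts reference the same contract, the producer and the consumer agree on the shape of the message without sharing any other code.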
So the idea here is that, when we update a patient in Ordering, we want to send out a notification that we did that for other bounded contexts to know about. Craftsman scaffolded out a producer for us in VsaLab.Ordering.src.Ordering.WebApi.Features.Producers, but we want to emit the event when we update a patient, so let's move the event publishing to our UpdatePatient feature. To do this, I'm going to add an IPublishEndpoint, move the publishing code to my UpdatePatient feature, and delete the producer that we don't need in this case.
The message publishing looks like this. All I'm doing here is adding the patient id from the command to an anonymous object to use as our message and then publishing that message.
var message = new
{
    request.PatientId
};
await _publishEndpoint.Publish<IPatientUpdated>(message);
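
The shorthand in the anonymous object works because C# infers the property name from the member being read, so new { request.PatientId } produces an object with a PatientId property. MassTransit can then fill the IPatientUpdated contract from an object whose property names match. The snippet below is a small sketch of the C# side only; MessageShapeDemo and its Request record are hypothetical stand-ins for the handler and command, not part of the scaffolded project.

```csharp
using System;

public static class MessageShapeDemo
{
    // Hypothetical stand-in for UpdatePatientCommand, reduced to the one property used here.
    public sealed record Request(Guid PatientId);

    public static object BuildMessage(Request request)
    {
        // Equivalent to: new { PatientId = request.PatientId }
        // The property name is inferred from the member access.
        return new { request.PatientId };
    }
}
```

Reading the PatientId property from the returned object via reflection gives back the same Guid that was on the request, which is the value the consumer will see on the message.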
When we're done, our UpdatePatient feature will look something like this:
namespace Ordering.WebApi.Features.Patients
{
    using Ordering.Core.Entities;
    using Ordering.Core.Dtos.Patient;
    using Ordering.Core.Exceptions;
    using Ordering.Infrastructure.Contexts;
    using Ordering.WebApi.Features.Patients.Validators;
    using AutoMapper;
    using AutoMapper.QueryableExtensions;
    using MediatR;
    using Microsoft.EntityFrameworkCore;
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Collections.Generic;
    using MassTransit;
    using Messages;

    public static class UpdatePatient
    {
        public class UpdatePatientCommand : IRequest<bool>
        {
            public Guid PatientId { get; set; }
            public PatientForUpdateDto PatientToUpdate { get; set; }

            public UpdatePatientCommand(Guid patient, PatientForUpdateDto patientToUpdate)
            {
                PatientId = patient;
                PatientToUpdate = patientToUpdate;
            }
        }

        public class Handler : IRequestHandler<UpdatePatientCommand, bool>
        {
            private readonly IPublishEndpoint _publishEndpoint;
            private readonly OrderingDbContext _db;
            private readonly IMapper _mapper;

            public Handler(OrderingDbContext db, IMapper mapper, IPublishEndpoint publishEndpoint)
            {
                _publishEndpoint = publishEndpoint;
                _mapper = mapper;
                _db = db;
            }

            public async Task<bool> Handle(UpdatePatientCommand request, CancellationToken cancellationToken)
            {
                var recordToUpdate = await _db.Patients
                    .FirstOrDefaultAsync(p => p.PatientId == request.PatientId);

                if (recordToUpdate == null)
                {
                    // log error
                    throw new KeyNotFoundException();
                }

                _mapper.Map(request.PatientToUpdate, recordToUpdate);
                var saveSuccessful = await _db.SaveChangesAsync() > 0;

                if (!saveSuccessful)
                {
                    // add log
                    throw new Exception("Unable to save the requested changes. Please check the logs for more information.");
                }

                var message = new
                {
                    request.PatientId
                };
                await _publishEndpoint.Publish<IPatientUpdated>(message);

                return true;
            }
        }
    }
}
Now, our consumer is already set up in our Billing bc, so if we set both projects as startup projects and put a breakpoint in our consumer, we should see it get hit! This completes the connection between the bounded contexts.
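
For a sense of where that breakpoint would go, a MassTransit consumer for this message has roughly the shape below. This is a minimal sketch rather than the file Craftsman scaffolds: the Billing.WebApi.Features.Consumers namespace is an assumption, and since the template sets UsesDb: true the generated class will likely also take the Billing database context in its constructor, which is left out here. It needs the MassTransit package and the shared IPatientUpdated contract to compile.

```csharp
namespace Billing.WebApi.Features.Consumers
{
    using System;
    using System.Threading.Tasks;
    using MassTransit;
    using Messages;

    // Sketch of the Billing-side consumer for the patient-updated exchange.
    public class SyncPatient : IConsumer<IPatientUpdated>
    {
        public Task Consume(ConsumeContext<IPatientUpdated> context)
        {
            // The message only carries the id of the patient that changed in Ordering.
            Guid patientId = context.Message.PatientId;

            // Put a breakpoint here to confirm the message arrives.
            // The actual sync work (look up the patient, update Billing's copy) would go here.
            Console.WriteLine($"Patient {patientId} was updated in Ordering.");

            return Task.CompletedTask;
        }
    }
}
```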
From here, you can add whatever work needs to be done in the consumer. In this case, we are only sending the PatientId, so you'd probably need to make a call back to Ordering to get the full set of patient information. Another option is to add all of the patient properties to the message and save yourself that call back to Ordering.
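
If you go with the second option, the contract could be widened along these lines. This is a sketch under the assumption that the message simply mirrors the Patient properties from the template; the matching Messages entry in the template would need the same properties added, and the publishing code would have to set them (for example from the updated record).

```csharp
namespace Messages
{
    using System;

    // Wider version of the contract that carries the full patient,
    // so the Billing consumer does not need to call back to Ordering.
    public interface IPatientUpdated
    {
        Guid PatientId { get; }
        string ExternalId { get; }
        string InternalId { get; }
        string FirstName { get; }
        string LastName { get; }
        DateTimeOffset? Dob { get; }
    }
}
```

The tradeoff is a larger message and a tighter coupling between the two contexts: any change to the patient fields that Billing cares about now means changing the shared contract as well.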