EF Core中发布领域事件的合适时机
领域事件大部分发生在领域模型的业务逻辑方法上或者领域服务上,我们可以在一个领域事件发生的时候立即调用IMediator的Publish方法来发布领域事件。我们一般在聚合根的实体类对象的ChangeName、构造方法等方法中发布领域事件,因为无论是应用服务还是领域服务,最终都要调用聚合根中的方法来操作聚合,我们这样做可以确保领域事件不会被漏掉。但是在实体类的业务方法中立即进行领域时间的发布可能会有以下问题。
可能存在重复发送领域事件的情况。比如,在“修改用户信息”这个应用服务操作中我们分别调用实体类的ChangeName、ChangeAge、ChangeEmail方法修改用户的姓名、年龄和邮箱。因此每个Change???方法中都会发布“实体类被修改”的领域事件,所以领域事件的处理者就会被多次调用,这是没有必要的,其实只要发布一次“实体类被修改”的领域事件即可。
领域事件发布太早。为了确保新增加的实体类能够发布“新增实体类”的领域事件,我们需要在实体类的构造方法中发布领域事件,但是很有可能因为数据验证没有通过等原因,我们最终没有把这个新增的实体类保存到数据库中,这样在构造方法中过早地发布领域事件就可能导致“误报”的问题。
参考eShopOnContainers项目中的做法,把领域事件的发布延迟到上下文保存修改时。也就是实体类中只注册要发布的领域事件,然后再上下文的SaveChanges方法被调用时,我们再发布领域事件。领域事件是由聚合根进行管理的,因此我们定义了供聚合根进行事件注册的接口IDomainEvents。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| {
public interface IDomainEvents { IEnumerable<INotification> GetDomainEvents(); void AddDomainEvent(INotification eventItem); void AddDomainEventIfAbsent(INotification eventItem); void ClearDomainEvents(); } }
|
为了简化实体类的代码编写,我们编写实现了IDomainEvents接口的抽象实体类BaseEntity。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| { public abstract class BaseEntity : IDomainEvents { private List<INotification> DomainEvents = new(); public void AddDomainEvent(INotification eventItem) { DomainEvents.Add(eventItem); }
public void AddDomainEventIfAbsent(INotification eventItem) { if(!DomainEvents.Contains(eventItem)) { DomainEvents.Add(eventItem); } }
public void ClearDomainEvents() { DomainEvents.Clear(); }
public IEnumerable<INotification> GetDomainEvents() { return DomainEvents; } } }
|
我们需要在上下文中保存数据的时候发布注册的领域事件。在DDD中,每个聚合都对应一个上下文,因此项目中的上下文类非常多。为了简化上下文码的编写,我们编写BaseContext类,将在SaveChanges中发布领域事件的代码分装到这个类中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
public class BaseDbContext:DbContext { private IMediator mediator; public BaseDbContext(DbContextOptions options,IMediator mediator) : base(options) { this.mediator = mediator; } public override int SaveChanges(bool acceptAllChangesOnSuccess) { throw new NotImplementedException("Dot't call SaveCheanges"); } public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess,CancellationToken cancellationToken = default) { var domainEntities = this.ChangeTracker.Entries<IDomainEvents>() .Where(x => x.Entity.GetDomainEvents().Any()); var domainEvents = domainEntities.SelectMany(x => x.Entity.GetDomainEvents()).ToList(); domainEntities.ToList().ForEach(entity=>entity.Entity.ClearDomainEvents()); foreach(var domainEvent in domainEvents) { await mediator.Publish(domainEvent); } return await base.SaveChangesAsync(acceptAllChangesOnSuccess,cancellationToken); } }
|
因为我们需要发布注册的领域事件,所以我们通过构造方法注入IMediator服务;我们重写父类的SaveChangesAsync方法,在调用父类的SaveChangesAsync方法保存修改之前,我们把所有实体类中注册的领域事件发布出去;第15行代码中获得ChangeTracker是上下文中用来对实体类的变化进行追踪的对象,Entries<IDomainEvents>获得的是所有实现了IDomainEvents接口的追踪实体类;我们在项目中强制要求不能使用同步的SaveChanges方法,因此第10行代码中对SaveChanges的调用抛出异常。
至此,我们完成了EFCore中简化领域事件发布的几个接口和抽象类的开发。接下来,我们编写用来测试的实体类和上下文,首先我们编写代表用户的实体类User
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
public class User:BaseEntity { public Guid Id { get; init; } public string UserName { get; init; } public string Email { get; private set; } public string? Nickname { get; private set; } public int? Age { get; private set; } public bool IsDeleted { get; private set; } public User() { } public User(string userName,string email) { this.Id = Guid.NewGuid(); this.UserName = userName; this.Email = email; this.IsDeleted = false; AddDomainEvent(new UserAddedEvent(this)); } public void ChangNickName(string? value) { this.Nickname = value; AddDomainEventIfAbsent(new UserUpdatedEvent(Id)); } public void ChangeAge(int value) { this.Age=value; AddDomainEventIfAbsent(new UserUpdatedEvent(Id)); } public void SoftDelete() { this.IsDeleted=true; AddDomainEvent(new UserSoftDeletedEvent(Id)); } }
|
我们在第16行代码的有参构造方法中,发布了UserAddedEvent领域事件,这样当我们创建新的实体类并且保存修改的时候,这个领域事件就会被发布。但是如果EF Core从数据库中加载已有数据的时候,也执行第16行代码的有参构造方法,就会导致在加载数据的时候也发步UserAddedEvent领域事件,这就发生逻辑错误了,因此我们在第9行代码中提供一个无参构造方法供EF Core从数据库中加载数据是使用。
因为我们可能连续调用ChangeNickName、ChangeAge等方法,所有我们在第21、26行代码中通过AddDomainEventlfAbsent注册领域事件,从而避免消息的重复发布。
1 2 3 4 5 6
| { public record UserAddedEvent(User Item):INotification; public record UserUpdatedEvent(Guid Id):INotification; public record UserSoftDeletedEvent(Guid Id):INotification;
}
|
接下来,我们编写事件处理类来对这些领域事件进行处理。首先我们编写响应UserAddedEvent领域事件,然后向用户发送注册邮件的NewUserSendEmailHandler类。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public class NewUserSendEmailHandler : INotificationHandler<UserAddedEvent> { private readonly ILogger<NewUserSendEmailHandler> _logger;
public NewUserSendEmailHandler(ILogger<NewUserSendEmailHandler> logger) { _logger = logger; }
public Task Handle(UserAddedEvent notification, CancellationToken cancellationToken) { var user = notification.Item; _logger.LogInformation($"向{user.Email}发送欢迎邮件"); return Task.CompletedTask; } }
|
使用日志输出代替真正的日志发送。在实际项目中,由于邮件发送比较耗时,建议把邮件发送的代码放到后台线程执行,而不是异步等待邮件发送的结果。
下面我们在实现一个“当用户的个人信息被修改后,发邮件通知用户的事件处理者”的功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public class ModifyUserLogHandler : INotificationHandler<UserUpdatedEvent> { private readonly UserDbContext _dbContext; private readonly ILogger<ModifyUserLogHandler> _logger;
public ModifyUserLogHandler(BaseDbContext dbContext, ILogger<ModifyUserLogHandler> logger) { _dbContext = dbContext; _logger = logger; }
public async Task Handle(UserUpdatedEvent notification, CancellationToken cancellationToken) { var user=await _dbContext.Users.FindAsync(notification.Id); _logger.LogInformation($"通知用户{user.Email}的信息被修改"); } }
|
因为UserUpdatedEvent中只包含被修改用户的标识符,所以我们在第12行代码中通过FindAsync获取被修改用户的详细信息。因为FindAsync会首先从上下文的缓存中获取对象,而修改操作之前被修改的对象已经存在与缓存中了,所以用FindAsync不仅能够获取还没有提交到数据库的对象,而且由于FindAsync操作不会再到数据库中查询,因此程序的性能更高。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| { [Route("api/[controller]/[action]")] [ApiController] public class UsersController : ControllerBase { private UserDbContext context;
public UsersController(UserDbContext context) { this.context = context; }
[HttpPost] public async Task<IActionResult> Add(AddUserRequest req) { var user = new User(req.UserName, req.Email); context.Users.Add(user); await context.SaveChangesAsync(); return Ok(); }
[HttpPut] [Route("{id}")] public async Task<IActionResult> Update(Guid id,UpdateUserRequest req) { User? user = context.Users.Find(id); if (user==null) { return NotFound($"id={id}的User不存在"); } user.ChangeAge(req.Age); user.ChangeEmail(req.Email); user.ChangeNickName(req.NickName); await context.SaveChangesAsync(); return Ok(); }
[HttpDelete] [Route("id")] public async Task<IActionResult> Delete(Guid id) { User? user = context.Users.Find(id); if (user == null) { return NotFound($"id={id}的User不存在"); } user.SoftDelete(); await context.SaveChangesAsync(); return Ok(); } } }
|
RabbitMQ的基本使用
和领域事件不同,集成事件用于在微服务间进行事件的传递,因此这是服务器间的通信,所以必须借助于第三方服务器作为事件总线。我们一般使用消息中间件来作为事件总线。明前有Redis、RabbitMQ、Kafka、ActiveMQ等。
我们先了解RabbitMQ中的几个基本概念。
信道(channel):信道是消息的生产者、消息者和服务器之间进行通信的虚拟连接。
为什么叫“虚拟连接”呢?因此TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP链接的基础上构建了虚拟通信。我们尽量重复使用TCP连接,而通信是可以用完就关闭的。
队列(queue):队列是用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取消息。
交换机(exchange):交换机用于把消息路由到一个或者多个队列中。
Hello World
现在让我们生成两个项目,一个用于发布者,一个用于使用者

发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| using RabbitMQ.Client; using System.Net.Quic; using System.Text;
var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "root"; factory.Password = "root"; factory.VirtualHost = "vhost";
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
const string message = "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "hello", body: body); Console.WriteLine($" [x] Sent {message}");
Console.WriteLine(" Press [enter] to exit");
Console.ReadLine();
|
接收
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Runtime.CompilerServices; using System.Text;
var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "root"; factory.Password = "root"; factory.VirtualHost = "vhost";
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);
Console.WriteLine("[*] Waiting for messages");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [x] Received {message}"); return Task.CompletedTask; };
await channel.BasicConsumeAsync("hello", autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
|
工作队列

工作队列(又名:任务队列)背后的主要思想是避免 立即执行资源密集型任务,并且必须等待 它完成。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列。正在运行的 worker 进程 将弹出任务并最终执行 工作。当您运行许多 worker 时,任务将在它们之间共享。
创建两个项目
在之前发送项目添加GetMessage方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64
| using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text;
var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "root"; factory.Password = "root"; factory.VirtualHost = "vhosst";
using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync();
await channel.QueueDeclareAsync( queue: "task_queue", durable: true, exclusive: false, autoDelete: false, arguments: null );
await channel.BasicQosAsync( prefetchSize: 0, prefetchCount: 1, global: false );
Console.WriteLine("[*] Waiting for message");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += async (model, ea) => { byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($"[x] 接收{message}");
int dots = message.Split('.').Length - 1; await Task.Delay(dots * 1000);
Console.WriteLine("[x] Done");
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false); };
await channel.BasicConsumeAsync( queue: "task_queue", autoAck: false, consumer: consumer );
Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
|
发布/订阅

在订阅模型中,多了一个Exchange角色,而且过程略有变化:
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。如何操作,取决于Exchange的类型。Exchange有常见的以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routingKey的队列
Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
临时队列
您可能还记得,我们之前使用的队列具有 具体名称(remember 和 ?)。能够命名 队列对我们来说至关重要 —— 我们需要将 worker 指向 相同的队列。在以下情况下,为队列命名非常重要 希望在生成者和使用者之间共享队列。hellotask_queue
但对于我们的 Logger 来说,情况并非如此。我们想听听所有 日志消息,而不仅仅是其中的一个子集。我们是 也只对当前流淌的消息感兴趣,而不是在旧的消息中 的。要解决这个问题,我们需要两件事。
首先,每当我们连接到 Rabbit 时,我们都需要一个新的空队列。 为此,我们可以创建一个具有随机名称的队列,或者 更好的是 - 让服务器为我们选择一个随机队列名称。
其次,一旦我们断开了消费者的连接,队列应该是 自动删除。
在 .NET 客户端中,当我们不提供任何参数时,我们会创建一个具有生成名称的非持久、独占、自动删除队列:QueueDeclareAsync()
1 2
| QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName;
|
现在我们需要 告诉 Exchange 将消息发送到我们的队列。这种关系 在 exchange 和 queue 之间称为 Binding。
1
| await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| using RabbitMQ.Client; using System.Text;
var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "root"; factory.Password = "root"; factory.VirtualHost = "vhost";
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body); Console.WriteLine($"[x] 发送{message}");
Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
static string GetMessage(string[] args) { return ((args.Length > 0) ? string.Join(" ", args) : "info:Hello World"); }
|
在建立连接后,我们声明了 交换。此步骤是必需的,因为发布到不存在的 禁止交换。
如果还没有队列绑定到 Exchange,则消息将丢失。 但这对我们来说没关系;如果还没有消费者在监听,我们可以安全地丢弃该消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text;
var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "root"; factory.Password = "root"; factory.VirtualHost = "vhost";
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName;
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
Console.WriteLine("[*] Waiting for logs.");
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) => { byte[] body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($" [X] {message}"); return Task.CompletedTask; };
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
|
路由

队列与交换机的绑定,不能是任意绑定,而是要指定一个RoutingKey
消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| using RabbitMQ.Client; using System.Text;
var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "root"; factory.Password = "root"; factory.VirtualHost = "vhost";
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: "direct_logs", type: ExchangeType.Direct);
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World";
var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: "direct_logs", routingKey: severity, body: body); Console.WriteLine($" [x] 发送'{severity}:{message}'");
Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
| using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.Text;
if (args.Length < 1) { Console.Error.WriteLine("Usage:{0} [info] [warning] [error]", Environment.GetCommandLineArgs()[0]); Console.WriteLine("Press [enter] to exit."); Console.ReadLine(); Environment.ExitCode = 1; return; }
var factory = new ConnectionFactory(); factory.HostName = "localhost"; factory.Port = 5672; factory.UserName = "root"; factory.Password = "root"; factory.VirtualHost = "vhost";
using var connection = await factory.CreateConnectionAsync(); using var channel = await connection.CreateChannelAsync();
await channel.ExchangeDeclareAsync(exchange: "direct_logs", type: ExchangeType.Direct);
var queueDeclareResult = await channel.QueueDeclareAsync(); string queueName = queueDeclareResult.QueueName;
foreach (string? severity in args) { await channel.QueueBindAsync(queue: queueName, exchange: "direct_logs", routingKey: severity); }
Console.WriteLine(" [*] Waitiy for messages.");
var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine($" [x] Received '{routingKey}:{message}'"); return Task.CompletedTask; };
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);
Console.WriteLine(" Press [enter] to exit."); Console.ReadLine();
|
如果您只想保存 ‘warning’ 和 ‘error’ (而不是 ‘info’) 日志 messages 添加到文件中,只需打开控制台并键入:
1
| dotnet run info warning error
|
.NET中简化DDD集成事件的框架
使用Zack.EventBus,并简化了以后迁移到其他MQ服务器的工作量。
创建两个ASP.NET Core Web API项目,它们分别是发布集成事件的项目和消费集成事件的项目,然后我们为这两个项目都安装NuGe包:Zack.EventBus.
我们在配置系统下创建一个名字为EventBus的节点,节点下包含HostName、ExchangName两个属性,它们分别代表RabbitMQ服务器的地址和交换机的名字。
1 2 3 4 5 6 7 8 9 10 11 12 13
| { "Logging": { "LogLevel": { "Default": "Information", "Microsoft.AspNetCore": "Warning" } }, "AllowedHosts": "*", "EventBus": { "HostName": "localhost", "ExchangName": "EventBusDemo1" } }
|
- 在两个项目中的Prgram.cs文件中的builder.Build()上面增加对IntegrationEventRabbitMQOptions进行配置的代码以及对AddEventBus的调用,然后还要在builder.Build()下面调用UseEventBus()。
发送者
1 2 3 4 5 6 7 8 9 10
| var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();
|
接收者
1 2 3 4 5
| var eventBusSec = builder.Configuration.GetSection("EventBus"); builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec); builder.Services.AddEventBus("EventBusDemo1_Q2",Assembly.GetExecutingAssembly()); var app = builder.Build(); app.UseEventBus();
|
这两个项目代码几乎一样,只有调用的AddEventBus方法的第一个参数queueName的值不一样,因为这个参数用来设定程序绑定的队列的名字,一般每个微服务项目的queueName参数值都不同,一边每个程序都能收到消息。对于同一个微服务的多个部署的集群实例,我们一般设置它们的queueName参数值相同,这样对于同一个消息,同一个微服务项目只有一个集群实例收到该消息,这符合大部分项目的需求。如果需要同一个微服务项目的每个集群实例都收到消息,则需要把每个集群实例的queueName参数值设置为不同的值,比如在原有的queueName后面再附加机器名、IP地址等集群内的唯一标识信息。
AddEventBus方法的第二个参数为含有监听集成事件的处理者代码的程序集。
- 我们在需要发布领域事件的类中注入IEventBus服务,然后调用IEventBus的Publish方法发布消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public class DemoController : ControllerBase { private IEventBus eventBus;
public DemoController(IEventBus eventBus) { this.eventBus = eventBus; }
[HttpPost] public string Publish() { eventBus.Publish("UserAdded", new { UserName = "YOUXIANYU", Age = 19 }); return "ok"; } }
|
- 我们创造一个实现了IIntegrationEventHandler接口的类,这个类用来处理收到的事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| [EventName("UserAdded")] public class UserAddesEventHandler : IIntegrationEventHandler { private readonly ILogger<UserAddesEventHandler> logger;
public UserAddesEventHandler(ILogger<UserAddesEventHandler> logger) { this.logger = logger; }
public Task Handle(string eventName, string eventData) { logger.LogInformation("新建了用户:" + eventData); return Task.CompletedTask; } }
|