EF Core中发布领域事件的合适时机

领域事件大部分发生在领域模型的业务逻辑方法上或者领域服务上,我们可以在一个领域事件发生的时候立即调用IMediator的Publish方法来发布领域事件。我们一般在聚合根的实体类对象的ChangeName、构造方法等方法中发布领域事件,因为无论是应用服务还是领域服务,最终都要调用聚合根中的方法来操作聚合,我们这样做可以确保领域事件不会被漏掉。但是在实体类的业务方法中立即进行领域时间的发布可能会有以下问题。

  1. 可能存在重复发送领域事件的情况。比如,在“修改用户信息”这个应用服务操作中我们分别调用实体类的ChangeName、ChangeAge、ChangeEmail方法修改用户的姓名、年龄和邮箱。因此每个Change???方法中都会发布“实体类被修改”的领域事件,所以领域事件的处理者就会被多次调用,这是没有必要的,其实只要发布一次“实体类被修改”的领域事件即可。

  2. 领域事件发布太早。为了确保新增加的实体类能够发布“新增实体类”的领域事件,我们需要在实体类的构造方法中发布领域事件,但是很有可能因为数据验证没有通过等原因,我们最终没有把这个新增的实体类保存到数据库中,这样在构造方法中过早地发布领域事件就可能导致“误报”的问题。

参考eShopOnContainers项目中的做法,把领域事件的发布延迟到上下文保存修改时。也就是实体类中只注册要发布的领域事件,然后再上下文的SaveChanges方法被调用时,我们再发布领域事件。领域事件是由聚合根进行管理的,因此我们定义了供聚合根进行事件注册的接口IDomainEvents。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
{

/// <summary>
/// 定义领域事件集合的接口。
/// 该接口用于聚合根或实体对象中,管理与其相关的领域事件。
/// 领域事件通常用于在领域模型内部发生重要业务行为时,
/// 将这些行为以事件的形式发布给外部进行处理(如发送通知、集成等)。
///
/// 主要职责包括:
/// 1. 获取当前已注册的所有领域事件。
/// 2. 添加新的领域事件。
/// 3. 如果不存在则添加新的领域事件(防止重复添加)。
/// 4. 清空所有已注册的领域事件。
/// </summary>
public interface IDomainEvents
{
IEnumerable<INotification> GetDomainEvents();
void AddDomainEvent(INotification eventItem);
void AddDomainEventIfAbsent(INotification eventItem);
void ClearDomainEvents();
}
}

为了简化实体类的代码编写,我们编写实现了IDomainEvents接口的抽象实体类BaseEntity。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
{
/// <summary>
/// 领域实体基类,支持领域事件的收集与管理。
///
/// 主要功能说明:
/// 1. 通过 AddDomainEvent 方法添加领域事件(INotification 实例)。
/// 2. 通过 AddDomainEventIfAbsent 方法避免重复添加相同事件。
/// 3. 通过 ClearDomainEvents 方法清空所有已收集的领域事件。
/// 4. 通过 GetDomainEvents 方法获取当前收集的所有领域事件。
///
/// 使用场景:
/// - 领域模型在业务操作中可调用 AddDomainEvent/AddDomainEventIfAbsent 添加事件。
/// - 应用层可在持久化操作后统一发布并清理领域事件。
/// </summary>
public abstract class BaseEntity : IDomainEvents
{
private List<INotification> DomainEvents = new();
public void AddDomainEvent(INotification eventItem)
{
DomainEvents.Add(eventItem);
}

public void AddDomainEventIfAbsent(INotification eventItem)
{
if(!DomainEvents.Contains(eventItem))
{
DomainEvents.Add(eventItem);
}
}

public void ClearDomainEvents()
{
DomainEvents.Clear();
}

public IEnumerable<INotification> GetDomainEvents()
{
return DomainEvents;
}
}
}

我们需要在上下文中保存数据的时候发布注册的领域事件。在DDD中,每个聚合都对应一个上下文,因此项目中的上下文类非常多。为了简化上下文码的编写,我们编写BaseContext类,将在SaveChanges中发布领域事件的代码分装到这个类中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

/// <summary>
/// 基础DbContext,集成领域事件发布机制。
/// 在调用SaveChangesAsync时,会自动收集实现IDomainEvents接口的实体上的领域事件,
/// 并通过IMediator依次发布这些事件,最后再提交数据库变更。
/// 注意:禁止直接调用同步的SaveChanges方法。
/// </summary>
public class BaseDbContext:DbContext
{
private IMediator mediator;
public BaseDbContext(DbContextOptions options,IMediator mediator) : base(options)
{
this.mediator = mediator;
}
public override int SaveChanges(bool acceptAllChangesOnSuccess)
{
throw new NotImplementedException("Dot't call SaveCheanges");
}
public override async Task<int> SaveChangesAsync(bool acceptAllChangesOnSuccess,CancellationToken cancellationToken = default)
{
var domainEntities = this.ChangeTracker.Entries<IDomainEvents>()
.Where(x => x.Entity.GetDomainEvents().Any());
var domainEvents = domainEntities.SelectMany(x => x.Entity.GetDomainEvents()).ToList();
domainEntities.ToList().ForEach(entity=>entity.Entity.ClearDomainEvents());
foreach(var domainEvent in domainEvents)
{
await mediator.Publish(domainEvent);
}
return await base.SaveChangesAsync(acceptAllChangesOnSuccess,cancellationToken);
}
}

因为我们需要发布注册的领域事件,所以我们通过构造方法注入IMediator服务;我们重写父类的SaveChangesAsync方法,在调用父类的SaveChangesAsync方法保存修改之前,我们把所有实体类中注册的领域事件发布出去;第15行代码中获得ChangeTracker是上下文中用来对实体类的变化进行追踪的对象,Entries<IDomainEvents>获得的是所有实现了IDomainEvents接口的追踪实体类;我们在项目中强制要求不能使用同步的SaveChanges方法,因此第10行代码中对SaveChanges的调用抛出异常。

至此,我们完成了EFCore中简化领域事件发布的几个接口和抽象类的开发。接下来,我们编写用来测试的实体类和上下文,首先我们编写代表用户的实体类User

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/// <summary>
/// 用户领域实体,包含用户的基本信息及领域事件的发布。
///
/// 主要功能说明:
/// 1. 构造函数创建用户时发布 UserAddedEvent 领域事件。
/// 2. 修改昵称和年龄时,若未发布过 UserUpdatedEvent,则发布该事件。
/// 3. 软删除用户时发布 UserSoftDeletedEvent 领域事件。
/// 4. 继承自 BaseEntity,支持领域事件的收集与管理。
/// </summary>

public class User:BaseEntity
{
public Guid Id { get; init; }
public string UserName { get; init; }
public string Email { get; private set; }
public string? Nickname { get; private set; }
public int? Age { get; private set; }
public bool IsDeleted { get; private set; }
public User() { }
public User(string userName,string email)
{
this.Id = Guid.NewGuid();
this.UserName = userName;
this.Email = email;
this.IsDeleted = false;
AddDomainEvent(new UserAddedEvent(this));
}
public void ChangNickName(string? value)
{
this.Nickname = value;
AddDomainEventIfAbsent(new UserUpdatedEvent(Id));
}
public void ChangeAge(int value)
{
this.Age=value;
AddDomainEventIfAbsent(new UserUpdatedEvent(Id));
}
public void SoftDelete()
{
this.IsDeleted=true;
AddDomainEvent(new UserSoftDeletedEvent(Id));
}
}

我们在第16行代码的有参构造方法中,发布了UserAddedEvent领域事件,这样当我们创建新的实体类并且保存修改的时候,这个领域事件就会被发布。但是如果EF Core从数据库中加载已有数据的时候,也执行第16行代码的有参构造方法,就会导致在加载数据的时候也发步UserAddedEvent领域事件,这就发生逻辑错误了,因此我们在第9行代码中提供一个无参构造方法供EF Core从数据库中加载数据是使用。

因为我们可能连续调用ChangeNickName、ChangeAge等方法,所有我们在第21、26行代码中通过AddDomainEventlfAbsent注册领域事件,从而避免消息的重复发布。

1
2
3
4
5
6
{
public record UserAddedEvent(User Item):INotification;
public record UserUpdatedEvent(Guid Id):INotification;
public record UserSoftDeletedEvent(Guid Id):INotification;

}

接下来,我们编写事件处理类来对这些领域事件进行处理。首先我们编写响应UserAddedEvent领域事件,然后向用户发送注册邮件的NewUserSendEmailHandler类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class NewUserSendEmailHandler : INotificationHandler<UserAddedEvent>
{
private readonly ILogger<NewUserSendEmailHandler> _logger;

public NewUserSendEmailHandler(ILogger<NewUserSendEmailHandler> logger)
{
_logger = logger;
}

public Task Handle(UserAddedEvent notification, CancellationToken cancellationToken)
{
var user = notification.Item;
_logger.LogInformation($"向{user.Email}发送欢迎邮件");
return Task.CompletedTask;
}
}

使用日志输出代替真正的日志发送。在实际项目中,由于邮件发送比较耗时,建议把邮件发送的代码放到后台线程执行,而不是异步等待邮件发送的结果。

下面我们在实现一个“当用户的个人信息被修改后,发邮件通知用户的事件处理者”的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class ModifyUserLogHandler : INotificationHandler<UserUpdatedEvent>
{
private readonly UserDbContext _dbContext;
private readonly ILogger<ModifyUserLogHandler> _logger;

public ModifyUserLogHandler(BaseDbContext dbContext, ILogger<ModifyUserLogHandler> logger)
{
_dbContext = dbContext;
_logger = logger;
}

public async Task Handle(UserUpdatedEvent notification, CancellationToken cancellationToken)
{
var user=await _dbContext.Users.FindAsync(notification.Id);
_logger.LogInformation($"通知用户{user.Email}的信息被修改");
}
}

因为UserUpdatedEvent中只包含被修改用户的标识符,所以我们在第12行代码中通过FindAsync获取被修改用户的详细信息。因为FindAsync会首先从上下文的缓存中获取对象,而修改操作之前被修改的对象已经存在与缓存中了,所以用FindAsync不仅能够获取还没有提交到数据库的对象,而且由于FindAsync操作不会再到数据库中查询,因此程序的性能更高。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
{
[Route("api/[controller]/[action]")]
[ApiController]
public class UsersController : ControllerBase
{
private UserDbContext context;

public UsersController(UserDbContext context)
{
this.context = context;
}

[HttpPost]
public async Task<IActionResult> Add(AddUserRequest req)
{
var user = new User(req.UserName, req.Email);
context.Users.Add(user);
await context.SaveChangesAsync();
return Ok();
}

[HttpPut]
[Route("{id}")]
public async Task<IActionResult> Update(Guid id,UpdateUserRequest req)
{
User? user = context.Users.Find(id);
if (user==null)
{
return NotFound($"id={id}的User不存在");
}
user.ChangeAge(req.Age);
user.ChangeEmail(req.Email);
user.ChangeNickName(req.NickName);
await context.SaveChangesAsync();
return Ok();
}

[HttpDelete]
[Route("id")]
public async Task<IActionResult> Delete(Guid id)
{
User? user = context.Users.Find(id);
if (user == null)
{
return NotFound($"id={id}的User不存在");
}
user.SoftDelete();
await context.SaveChangesAsync();
return Ok();
}
}
}

RabbitMQ的基本使用

和领域事件不同,集成事件用于在微服务间进行事件的传递,因此这是服务器间的通信,所以必须借助于第三方服务器作为事件总线。我们一般使用消息中间件来作为事件总线。明前有Redis、RabbitMQ、Kafka、ActiveMQ等。

我们先了解RabbitMQ中的几个基本概念。

  1. 信道(channel):信道是消息的生产者、消息者和服务器之间进行通信的虚拟连接。
    为什么叫“虚拟连接”呢?因此TCP连接的建立是非常消耗资源的,所以RabbitMQ在TCP链接的基础上构建了虚拟通信。我们尽量重复使用TCP连接,而通信是可以用完就关闭的。

  2. 队列(queue):队列是用来进行消息收发的地方,生产者把消息放到队列中,消费者从队列中获取消息。

  3. 交换机(exchange):交换机用于把消息路由到一个或者多个队列中。

Hello World

现在让我们生成两个项目,一个用于发布者,一个用于使用者

发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
using RabbitMQ.Client;
using System.Net.Quic;
using System.Text;

// 创建连接工厂对象,用于配置RabbitMQ连接参数
var factory = new ConnectionFactory();
factory.HostName = "localhost"; // RabbitMQ服务器地址
factory.Port = 5672; // 端口号
factory.UserName = "root"; // 用户名
factory.Password = "root"; // 密码
factory.VirtualHost = "vhost"; // 虚拟主机

// 创建与RabbitMQ的连接(异步)
using var connection = await factory.CreateConnectionAsync();
// 创建信道(异步),用于与RabbitMQ进行通信
using var channel = await connection.CreateChannelAsync();

// 声明一个名为"hello"的队列,如果不存在则自动创建
// durable: 队列是否持久化,false表示不持久化
// exclusive: 是否排他,false表示允许多个连接访问
// autoDelete: 是否自动删除,false表示不自动删除
// arguments: 其他参数,这里为null
await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

// 要发送的消息内容
const string message = "Hello World!";
// 将消息内容编码为字节数组
var body = Encoding.UTF8.GetBytes(message);

// 发送消息到指定队列(exchange为空字符串表示默认交换机)
await channel.BasicPublishAsync(exchange: string.Empty, routingKey: "hello", body: body);
Console.WriteLine($" [x] Sent {message}");

Console.WriteLine(" Press [enter] to exit");
// 等待用户按下回车键后退出程序
Console.ReadLine();

接收

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Runtime.CompilerServices;
using System.Text;

// 创建连接工厂对象,用于配置RabbitMQ连接参数
var factory = new ConnectionFactory();
factory.HostName = "localhost"; // RabbitMQ服务器地址
factory.Port = 5672; // 端口号
factory.UserName = "root"; // 用户名
factory.Password = "root"; // 密码
factory.VirtualHost = "vhost"; // 虚拟主机

// 创建与RabbitMQ服务器的连接
using var connection = await factory.CreateConnectionAsync();
// 在连接的基础上创建一个通道(channel),用于收发消息
using var channel = await connection.CreateChannelAsync();

// 声明一个名为"hello"的队列,如果队列不存在则会自动创建
// durable: false 表示队列在RabbitMQ重启后不会保留
// exclusive: false 表示队列可以被多个连接共享
// autoDelete: false 表示队列在没有消费者时不会自动删除
await channel.QueueDeclareAsync(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null);

Console.WriteLine("[*] Waiting for messages"); // 提示正在等待消息

// 创建一个异步事件消费者,用于接收队列中的消息
var consumer = new AsyncEventingBasicConsumer(channel);

// 当收到消息时触发的事件
consumer.ReceivedAsync += (model, ea) =>
{
// 获取消息的内容(字节数组)
var body = ea.Body.ToArray();
// 将字节数组转换为字符串
var message = Encoding.UTF8.GetString(body);
// 打印收到的消息
Console.WriteLine($" [x] Received {message}");
return Task.CompletedTask;
};

// 开始消费"hello"队列中的消息,autoAck: true 表示自动确认消息
await channel.BasicConsumeAsync("hello", autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit."); // 提示用户按回车键退出
Console.ReadLine(); // 等待用户输入,防止程序退出

工作队列

工作队列(又名:任务队列)背后的主要思想是避免 立即执行资源密集型任务,并且必须等待 它完成。相反,我们将任务安排在稍后完成。我们将任务封装为消息并将其发送到队列。正在运行的 worker 进程 将弹出任务并最终执行 工作。当您运行许多 worker 时,任务将在它们之间共享。

创建两个项目

在之前发送项目添加GetMessage方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

// 创建连接工厂,配置RabbitMQ服务器信息
var factory = new ConnectionFactory();
factory.HostName = "localhost"; // RabbitMQ服务器地址
factory.Port = 5672; // 端口号
factory.UserName = "root"; // 用户名
factory.Password = "root"; // 密码
factory.VirtualHost = "vhosst"; // 虚拟主机

// 创建连接和通道(Channel)
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

// 声明一个持久化队列,名称为task_queue
await channel.QueueDeclareAsync(
queue: "task_queue", // 队列名称
durable: true, // 持久化
exclusive: false, // 非独占
autoDelete: false, // 不自动删除
arguments: null // 其他参数
);

// 设置每次只分发一个消息给消费者,保证公平分发
await channel.BasicQosAsync(
prefetchSize: 0, // 不限制消息大小
prefetchCount: 1, // 每次只分发一个消息
global: false // 只对当前通道生效
);

Console.WriteLine("[*] Waiting for message");

// 创建异步消费者
var consumer = new AsyncEventingBasicConsumer(channel);

// 注册接收消息的事件处理方法
consumer.ReceivedAsync += async (model, ea) =>
{
// 获取消息内容
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($"[x] 接收{message}");

// 模拟耗时操作,根据消息中.的数量决定等待时间
int dots = message.Split('.').Length - 1;
await Task.Delay(dots * 1000);

Console.WriteLine("[x] Done");

// 手动确认消息已处理
await channel.BasicAckAsync(deliveryTag: ea.DeliveryTag, multiple: false);
};

// 开始消费队列中的消息,关闭自动确认
await channel.BasicConsumeAsync(
queue: "task_queue", // 队列名称
autoAck: false, // 关闭自动确认
consumer: consumer // 消费者对象
);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

发布/订阅

在订阅模型中,多了一个Exchange角色,而且过程略有变化:
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。如何操作,取决于Exchange的类型。Exchange有常见的以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routingKey的队列
Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

临时队列

您可能还记得,我们之前使用的队列具有 具体名称(remember 和 ?)。能够命名 队列对我们来说至关重要 —— 我们需要将 worker 指向 相同的队列。在以下情况下,为队列命名非常重要 希望在生成者和使用者之间共享队列。hellotask_queue

但对于我们的 Logger 来说,情况并非如此。我们想听听所有 日志消息,而不仅仅是其中的一个子集。我们是 也只对当前流淌的消息感兴趣,而不是在旧的消息中 的。要解决这个问题,我们需要两件事。

首先,每当我们连接到 Rabbit 时,我们都需要一个新的空队列。 为此,我们可以创建一个具有随机名称的队列,或者 更好的是 - 让服务器为我们选择一个随机队列名称。

其次,一旦我们断开了消费者的连接,队列应该是 自动删除。

在 .NET 客户端中,当我们不提供任何参数时,我们会创建一个具有生成名称的非持久、独占、自动删除队列:QueueDeclareAsync()

1
2
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

现在我们需要 告诉 Exchange 将消息发送到我们的队列。这种关系 在 exchange 和 queue 之间称为 Binding。

1
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂并设置RabbitMQ服务器的相关参数
var factory = new ConnectionFactory();
factory.HostName = "localhost"; // RabbitMQ服务器地址
factory.Port = 5672; // 端口号
factory.UserName = "root"; // 用户名
factory.Password = "root"; // 密码
factory.VirtualHost = "vhost"; // 虚拟主机

// 异步创建连接
using var connection = await factory.CreateConnectionAsync();
// 异步创建信道(channel)
using var channel = await connection.CreateChannelAsync();

// 声明一个名为"logs"的交换机,类型为Fanout(广播模式)
await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);

// 获取要发送的消息内容
var message = GetMessage(args);
// 将消息内容编码为字节数组
var body = Encoding.UTF8.GetBytes(message);
// 通过交换机发布消息,routingKey为空表示广播
await channel.BasicPublishAsync(exchange: "logs", routingKey: string.Empty, body: body);
Console.WriteLine($"[x] 发送{message}");

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

// 获取命令行参数作为消息内容,如果没有参数则发送默认消息
static string GetMessage(string[] args)
{
return ((args.Length > 0) ? string.Join(" ", args) : "info:Hello World");
}

在建立连接后,我们声明了 交换。此步骤是必需的,因为发布到不存在的 禁止交换。

如果还没有队列绑定到 Exchange,则消息将丢失。 但这对我们来说没关系;如果还没有消费者在监听,我们可以安全地丢弃该消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

// 创建连接工厂,设置RabbitMQ服务器的相关参数
var factory = new ConnectionFactory();
factory.HostName = "localhost"; // RabbitMQ服务器地址
factory.Port = 5672; // 端口号
factory.UserName = "root"; // 用户名
factory.Password = "root"; // 密码
factory.VirtualHost = "vhost"; // 虚拟主机

// 创建与RabbitMQ的连接
using var connection = await factory.CreateConnectionAsync();
// 创建通道,用于与RabbitMQ进行通信
using var channel = await connection.CreateChannelAsync();

// 声明一个名为"logs"的交换机,类型为fanout(广播模式)
await channel.ExchangeDeclareAsync(exchange: "logs", type: ExchangeType.Fanout);

// 创建一个临时队列,队列名称由RabbitMQ自动生成
QueueDeclareOk queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

// 将队列绑定到"logs"交换机,不指定路由键(fanout类型会忽略路由键)
await channel.QueueBindAsync(queue: queueName, exchange: "logs", routingKey: string.Empty);

Console.WriteLine("[*] Waiting for logs.");

// 创建一个异步事件消费者,用于接收消息
var consumer = new AsyncEventingBasicConsumer(channel);
// 注册接收消息的事件处理方法
consumer.ReceivedAsync += (model, ea) =>
{
// 获取消息体并转换为字符串
byte[] body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [X] {message}");
return Task.CompletedTask;
};

// 开始消费队列中的消息,autoAck=true表示自动确认消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

路由

队列与交换机的绑定,不能是任意绑定,而是要指定一个RoutingKey
消息的发送方在向Exchange发送消息时,也必须指定消息的RoutingKey
Exchange不再把消息交给每一个绑定的队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,才会接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
using RabbitMQ.Client;
using System.Text;

// 创建连接工厂并设置RabbitMQ服务器的相关参数
var factory = new ConnectionFactory();
factory.HostName = "localhost"; // RabbitMQ服务器地址
factory.Port = 5672; // 端口号
factory.UserName = "root"; // 用户名
factory.Password = "root"; // 密码
factory.VirtualHost = "vhost"; // 虚拟主机

// 异步创建连接
using var connection = await factory.CreateConnectionAsync();
// 异步创建通道
using var channel = await connection.CreateChannelAsync();

// 声明一个名为"direct_logs"的直连交换机
await channel.ExchangeDeclareAsync(exchange: "direct_logs", type: ExchangeType.Direct);

// 获取日志级别(如info、warning、error),默认为info
var severity = (args.Length > 0) ? args[0] : "info";
// 获取要发送的消息内容,默认为"Hello World"
var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello World";
// 将消息内容编码为字节数组
var body = Encoding.UTF8.GetBytes(message);
// 发送消息到指定的交换机和路由键(即日志级别)
await channel.BasicPublishAsync(exchange: "direct_logs", routingKey: severity, body: body);
Console.WriteLine($" [x] 发送'{severity}:{message}'");

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

// 检查命令行参数,至少需要一个参数(info、warning 或 error)
if (args.Length < 1)
{
Console.Error.WriteLine("Usage:{0} [info] [warning] [error]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine("Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}

// 创建连接工厂,设置 RabbitMQ 服务器的连接信息
var factory = new ConnectionFactory();
factory.HostName = "localhost";
factory.Port = 5672;
factory.UserName = "root";
factory.Password = "root";
factory.VirtualHost = "vhost";

// 建立与 RabbitMQ 的连接和通道(channel)
using var connection = await factory.CreateConnectionAsync();
using var channel = await connection.CreateChannelAsync();

// 声明一个 direct 类型的交换机,名称为 direct_logs
await channel.ExchangeDeclareAsync(exchange: "direct_logs", type: ExchangeType.Direct);

// 声明一个临时队列,队列名称由 RabbitMQ 自动生成
var queueDeclareResult = await channel.QueueDeclareAsync();
string queueName = queueDeclareResult.QueueName;

// 将队列绑定到交换机上,使用命令行参数作为路由键
foreach (string? severity in args)
{
await channel.QueueBindAsync(queue: queueName, exchange: "direct_logs", routingKey: severity);
}

Console.WriteLine(" [*] Waitiy for messages.");

// 创建异步消费者,接收消息时触发事件
var consumer = new AsyncEventingBasicConsumer(channel);
consumer.ReceivedAsync += (model, ea) =>
{
// 获取消息体并转换为字符串
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
// 打印收到的消息和路由键
Console.WriteLine($" [x] Received '{routingKey}:{message}'");
return Task.CompletedTask;
};

// 开始消费队列中的消息,自动确认消息
await channel.BasicConsumeAsync(queueName, autoAck: true, consumer: consumer);

Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

如果您只想保存 ‘warning’ 和 ‘error’ (而不是 ‘info’) 日志 messages 添加到文件中,只需打开控制台并键入:

1
dotnet run info warning error

.NET中简化DDD集成事件的框架

使用Zack.EventBus,并简化了以后迁移到其他MQ服务器的工作量。

  1. 创建两个ASP.NET Core Web API项目,它们分别是发布集成事件的项目和消费集成事件的项目,然后我们为这两个项目都安装NuGe包:Zack.EventBus.

  2. 我们在配置系统下创建一个名字为EventBus的节点,节点下包含HostName、ExchangName两个属性,它们分别代表RabbitMQ服务器的地址和交换机的名字。

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"Logging": {
"LogLevel": {
"Default": "Information",
"Microsoft.AspNetCore": "Warning"
}
},
"AllowedHosts": "*",
"EventBus": {
"HostName": "localhost",
"ExchangName": "EventBusDemo1"
}
}
  1. 在两个项目中的Prgram.cs文件中的builder.Build()上面增加对IntegrationEventRabbitMQOptions进行配置的代码以及对AddEventBus的调用,然后还要在builder.Build()下面调用UseEventBus()。

发送者

1
2
3
4
5
6
7
8
9
10
// 从配置文件中获取名为"EventBus"的配置节
var eventBusSec = builder.Configuration.GetSection("EventBus");
// 将"EventBus"配置节绑定到IntegrationEventRabbitMQOptions对象,用于后续依赖注入
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
// 注册事件总线服务,指定队列名称为"EventBusDemo1_Q1",并扫描当前程序集中的事件处理器
builder.Services.AddEventBus("EventBusDemo1_Q1", Assembly.GetExecutingAssembly());

var app = builder.Build();
// 启用事件总线中间件,自动订阅和处理集成事件
app.UseEventBus();

接收者

1
2
3
4
5
var eventBusSec = builder.Configuration.GetSection("EventBus");
builder.Services.Configure<IntegrationEventRabbitMQOptions>(eventBusSec);
builder.Services.AddEventBus("EventBusDemo1_Q2",Assembly.GetExecutingAssembly());
var app = builder.Build();
app.UseEventBus();

这两个项目代码几乎一样,只有调用的AddEventBus方法的第一个参数queueName的值不一样,因为这个参数用来设定程序绑定的队列的名字,一般每个微服务项目的queueName参数值都不同,一边每个程序都能收到消息。对于同一个微服务的多个部署的集群实例,我们一般设置它们的queueName参数值相同,这样对于同一个消息,同一个微服务项目只有一个集群实例收到该消息,这符合大部分项目的需求。如果需要同一个微服务项目的每个集群实例都收到消息,则需要把每个集群实例的queueName参数值设置为不同的值,比如在原有的queueName后面再附加机器名、IP地址等集群内的唯一标识信息。

AddEventBus方法的第二个参数为含有监听集成事件的处理者代码的程序集。

  1. 我们在需要发布领域事件的类中注入IEventBus服务,然后调用IEventBus的Publish方法发布消息。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class DemoController : ControllerBase
{
// 事件总线接口,用于发布和订阅事件
private IEventBus eventBus;

// 通过依赖注入获取事件总线实例
public DemoController(IEventBus eventBus)
{
this.eventBus = eventBus;
}

// 通过 HTTP POST 请求触发该方法,发布一个名为 "UserAdded" 的事件
[HttpPost]
public string Publish()
{
// 发布事件,事件名为 "UserAdded",携带用户信息(用户名和年龄)
eventBus.Publish("UserAdded", new { UserName = "YOUXIANYU", Age = 19 });
// 返回操作结果
return "ok";
}
}
  1. 我们创造一个实现了IIntegrationEventHandler接口的类,这个类用来处理收到的事件。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 事件名称为 "UserAdded",用于标识该事件处理器处理的是用户新增事件
[EventName("UserAdded")]
public class UserAddesEventHandler : IIntegrationEventHandler
{
// 日志记录器,用于输出日志信息
private readonly ILogger<UserAddesEventHandler> logger;

// 构造函数,通过依赖注入获取日志记录器
public UserAddesEventHandler(ILogger<UserAddesEventHandler> logger)
{
this.logger = logger;
}

// 处理事件的方法
// eventName: 事件名称
// eventData: 事件数据(这里一般是新用户的信息)
public Task Handle(string eventName, string eventData)
{
// 记录一条日志,表示新建了一个用户,并输出用户相关数据
logger.LogInformation("新建了用户:" + eventData);
return Task.CompletedTask;
}
}