MassTransit

一、MassTransit

  1. 基本概念
    定位:基于 .NET 平台 的开源消息总线框架,用于构建分布式、异步通信的应用程序,支持多种消息传输协议(如 RabbitMQ、Azure Service Bus、Kafka 等)。
    设计目标:简化分布式系统中消息传递的开发,提供高层次的抽象(如消费者、管道、 Saga 模式等),降低开发者对消息中间件底层细节的依赖。
  2. 核心特点
    多传输协议支持:可无缝集成 RabbitMQ、Azure Service Bus、Kafka、ActiveMQ 等多种消息代理。
    编程模型友好:
    基于 C# 语言,支持依赖注入(DI)和 Lambda 表达式,代码简洁易读。
    内置消费者管道(Consumer Pipeline),支持消息过滤、重试、日志记录等中间件。
    分布式事务支持:通过 Saga 模式 实现长流程事务的异步协调(如订单状态更新、支付回调处理)。
    高可用性与监控:
    支持消费者集群和负载均衡。
    集成 Prometheus、Grafana 等监控工具,提供运行时指标和健康检查。
  3. 应用场景
    .NET 微服务架构:作为服务间通信的核心组件(尤其适合 C# 开发团队)。
    复杂业务流程:需要 Saga 模式处理的长事务场景(如电商订单的支付、发货、退款流程)。
    多消息代理混合架构:系统需要兼容多种消息中间件时(如迁移至云服务时过渡)。
  4. 优缺点
    优点:
    深度集成 .NET 生态,开发效率高,适合 C# 开发者。
    高层次抽象隐藏了消息中间件的复杂性,聚焦业务逻辑。
    缺点:
    仅限 .NET 平台,跨语言支持有限。
    对非 .NET 技术栈(如 Java、Python)不友好。

MassTransit 是广泛使用的 .NET 服务总线,可简化基于消息的应用程序开发,而 MassTransit.RabbitMQ 则提供与 RabbitMQ 的必要集成。

安装NuGet包

1
2
Install-Package MassTransit
Install-Package MassTransit.RabbitMQ
  1. TransferData 消息模型配置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
{
// TransferData 记录类型,用于表示一次转账操作的数据
// Type: 转账类型(如“存款”或“取款”)
// Amount: 转账金额
public record TransferData
{
public string Type { get; init; }
public int Amount { get; init; }
}

// Client 记录类型,表示一个客户的信息
// Name: 客户姓名
// Pin: 客户的PIN码(个人识别码)
public class Client
{
public string Name { get; set; }
public int Pin { get; set; }
}

// Account 记录类型,表示一个账户的信息
// Name: 账户持有人姓名
// Deposit: 账户余额
public class Account
{
public string Name { get; set; }
public int Deposit { get; set; }
}
// CurrentBalance 类表示当前账户余额信息
// Amount: 当前余额金额
// Currency: 货币类型(如美元、欧元等)
// Balance: 余额的整数表示形式
public class CurrentBalance
{
public int Amount { get; set; }
public string Currency { get; set; }
public long Balance { get; set; }
}
}
  1. 在Program.cs中配置MassTransit。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 配置 MassTransit 使用 RabbitMQ 作为消息总线
builder.Services.AddMassTransit(x =>
{
// 使用 RabbitMQ 进行传输
x.UsingRabbitMq((context, cfg) =>
{
// 配置 RabbitMQ 主机地址和认证信息
cfg.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("root"); // 用户名
h.Password("root"); // 密码
});
});
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
{
/// <summary>
/// QueueSenderController 控制器用于演示通过 MassTransit 向 RabbitMQ 发送命令、发布事件和请求响应。
/// </summary>
[Route("api/[controller]/[action]")]
[ApiController]
public class QueueSenderController : ControllerBase
{
// MassTransit 的总线对象,用于发送和发布消息
private readonly IBus bus;
// 用于请求-响应模式的客户端
private readonly IRequestClient<TransferData> requestClient;

/// <summary>
/// 构造函数,注入总线和请求客户端
/// </summary>
/// <param name="bus">MassTransit 总线</param>
/// <param name="requestClient">请求-响应客户端</param>
public QueueSenderController(IBus bus, IRequestClient<TransferData> requestClient)
{
this.bus = bus;
this.requestClient = requestClient;
}

/// <summary>
/// 发送命令消息到指定的 RabbitMQ 队列
/// </summary>
/// <returns>操作结果</returns>
[HttpPost("send-command")]
public async Task<IActionResult> SendCommand()
{
// 创建一个账户对象,模拟要发送的数据
var account = new Account()
{
Name = "张三",
Deposit = 500
};
// 指定 RabbitMQ 队列的地址
var url = new Uri("rabbitmq://localhost/send-command");

// 获取发送端点并发送消息
var endpoint = await bus.GetSendEndpoint(url);
await endpoint.Send(account);

return Ok("命令已成功发送");
}

/// <summary>
/// 发布事件消息,所有订阅者都能收到
/// </summary>
/// <returns>操作结果</returns>
[HttpPost("publish-event")]
public async Task<IActionResult> PublishEvent()
{
// 发布一个客户端事件,所有监听该事件的服务都能收到
await bus.Publish(new Client()
{
Name = "李四",
Pin = 123456
});

return Ok("事件发布成功");
}

/// <summary>
/// 发送请求并等待响应,演示请求-响应模式
/// </summary>
/// <returns>响应内容</returns>
[HttpPost("request-response")]
public async Task<IActionResult> RequestResponse()
{
// 构造转账数据
var transferData = new TransferData()
{
Type = "Withdrawal",
Amount = 25
};
// 发送请求并等待响应
var request = requestClient.Create(transferData);
var response = await request.GetResponse<CurrentBalance>();
return Ok($"响应数据: {response.Message.Currency}, 金额: {response.Message.Amount}");
}
}
}

配置消费者

创建接口,定义消费类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
{
/// <summary>
/// IConsumer 接口定义了消费不同类型消息的方法。
/// 每个方法用于处理一种特定的数据类型。
/// </summary>
public interface IConsumer
{
/// <summary>
/// 消费 Client 类型的消息。
/// </summary>
/// <param name="client">要处理的 Client 对象。</param>
void Consume(Client client);

/// <summary>
/// 消费 Account 类型的消息。
/// </summary>
/// <param name="account">要处理的 Account 对象。</param>
void Consume(Account account);

/// <summary>
/// 消费 TransferData 类型的消息。
/// </summary>
/// <param name="transferData">要处理的 TransferData 对象,包含转账类型和金额。</param>
void Consume(TransferData transferData);
/// <summary>
/// 消费 CurrentBalance 类型的消息。
/// </summary>
void Consume(CurrentBalance currentBalance);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
{
// PublisherServicer 类实现了 MassTransit 的 IConsumer<Client> 接口,用于消费 Client 类型的消息
public class PublisherServicer : IConsumer<Client>
{
// 当有 Client 类型的消息到达时,Consume 方法会被自动调用
public async Task Consume(ConsumeContext<Client> context)
{
// 从消息上下文中获取消息体(Client 对象)
var info = context.Message;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
{
/// <summary>
/// RequestResponseServicer 类实现了 IConsumer 接口,用于处理 TransferData 类型的消息。
/// 当收到转账请求时,计算当前余额并响应给请求方。
/// </summary>
public class RequestResponseServicer : IConsumer<TransferData>
{
/// <summary>
/// 消费并处理 TransferData 消息。
/// 先获取消息中的转账金额,然后用 1000 减去该金额,得到当前余额,最后将余额作为响应返回。
/// </summary>
/// <param name="context">包含消息内容和上下文信息的对象</param>
public async Task Consume(ConsumeContext<TransferData> context)
{
// 获取消息内容
var data = context.Message;

// 计算当前余额:假设初始余额为 1000,减去转账金额
var nowBalance = new CurrentBalance()
{
Balance = 1000 - data.Amount
};

// 将当前余额作为响应返回给请求方
await context.RespondAsync(nowBalance);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
/// <summary>
/// 消息消费者服务,实现了 IConsumer&lt;Account&gt; 接口,用于处理 Account 类型的消息。
/// </summary>
public class SenderServicer : IConsumer<Account>
{
/// <summary>
/// 消费消息的方法,当接收到 Account 类型的消息时被调用。
/// </summary>
/// <param name="context">包含消息和上下文信息的 ConsumeContext 对象。</param>
public async Task Consume(ConsumeContext<Account> context)
{
// 获取消息体中的 Account 实例
var account = context.Message;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 配置 MassTransit 服务
builder.Services.AddMassTransit(x =>
{
// 注册消息消费者:PublisherServicer、SenderServicer、RequestResponseServicer
x.AddConsumer<PublisherServicer>();
x.AddConsumer<SenderServicer>();
x.AddConsumer<RequestResponseServicer>();

// 使用 RabbitMQ 作为消息中间件
x.UsingRabbitMq((contxit, config) =>
{
// 配置 RabbitMQ 主机地址和登录信息
config.Host(new Uri("rabbitmq://localhost"), h =>
{
h.Username("root"); // 用户名
h.Password("root"); // 密码
});

// 配置 send-command 队列,绑定 SenderServicer 消费者
config.ReceiveEndpoint("send-command", e =>
{
e.ConfigureConsumer<SenderServicer>(contxit);
});

// 配置 publish-event 队列,绑定 PublisherServicer 消费者
config.ReceiveEndpoint("publish-event", e =>
{
e.ConfigureConsumer<PublisherServicer>(contxit);
});

// 配置 request-response 队列,绑定 RequestResponseServicer 消费者
config.ReceiveEndpoint("request-response", e =>
{
e.ConfigureConsumer<RequestResponseServicer>(contxit);
});
});
});