MassTransit
一、MassTransit
- 基本概念
定位:基于 .NET 平台 的开源消息总线框架,用于构建分布式、异步通信的应用程序,支持多种消息传输协议(如 RabbitMQ、Azure Service Bus、Kafka 等)。
设计目标:简化分布式系统中消息传递的开发,提供高层次的抽象(如消费者、管道、 Saga 模式等),降低开发者对消息中间件底层细节的依赖。
- 核心特点
多传输协议支持:可无缝集成 RabbitMQ、Azure Service Bus、Kafka、ActiveMQ 等多种消息代理。
编程模型友好:
基于 C# 语言,支持依赖注入(DI)和 Lambda 表达式,代码简洁易读。
内置消费者管道(Consumer Pipeline),支持消息过滤、重试、日志记录等中间件。
分布式事务支持:通过 Saga 模式 实现长流程事务的异步协调(如订单状态更新、支付回调处理)。
高可用性与监控:
支持消费者集群和负载均衡。
集成 Prometheus、Grafana 等监控工具,提供运行时指标和健康检查。
- 应用场景
.NET 微服务架构:作为服务间通信的核心组件(尤其适合 C# 开发团队)。
复杂业务流程:需要 Saga 模式处理的长事务场景(如电商订单的支付、发货、退款流程)。
多消息代理混合架构:系统需要兼容多种消息中间件时(如迁移至云服务时过渡)。
- 优缺点
优点:
深度集成 .NET 生态,开发效率高,适合 C# 开发者。
高层次抽象隐藏了消息中间件的复杂性,聚焦业务逻辑。
缺点:
仅限 .NET 平台,跨语言支持有限。
对非 .NET 技术栈(如 Java、Python)不友好。
MassTransit 是广泛使用的 .NET 服务总线,可简化基于消息的应用程序开发,而 MassTransit.RabbitMQ 则提供与 RabbitMQ 的必要集成。
安装NuGet包
1 2
| Install-Package MassTransit Install-Package MassTransit.RabbitMQ
|
- TransferData 消息模型配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| { public record TransferData { public string Type { get; init; } public int Amount { get; init; } }
public class Client { public string Name { get; set; } public int Pin { get; set; } }
public class Account { public string Name { get; set; } public int Deposit { get; set; } } public class CurrentBalance { public int Amount { get; set; } public string Currency { get; set; } public long Balance { get; set; } } }
|
- 在Program.cs中配置MassTransit。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| builder.Services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { cfg.Host(new Uri("rabbitmq://localhost"), h => { h.Username("root"); h.Password("root"); }); }); });
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84
| { [Route("api/[controller]/[action]")] [ApiController] public class QueueSenderController : ControllerBase { private readonly IBus bus; private readonly IRequestClient<TransferData> requestClient;
public QueueSenderController(IBus bus, IRequestClient<TransferData> requestClient) { this.bus = bus; this.requestClient = requestClient; }
[HttpPost("send-command")] public async Task<IActionResult> SendCommand() { var account = new Account() { Name = "张三", Deposit = 500 }; var url = new Uri("rabbitmq://localhost/send-command");
var endpoint = await bus.GetSendEndpoint(url); await endpoint.Send(account);
return Ok("命令已成功发送"); }
[HttpPost("publish-event")] public async Task<IActionResult> PublishEvent() { await bus.Publish(new Client() { Name = "李四", Pin = 123456 });
return Ok("事件发布成功"); }
[HttpPost("request-response")] public async Task<IActionResult> RequestResponse() { var transferData = new TransferData() { Type = "Withdrawal", Amount = 25 }; var request = requestClient.Create(transferData); var response = await request.GetResponse<CurrentBalance>(); return Ok($"响应数据: {response.Message.Currency}, 金额: {response.Message.Amount}"); } } }
|
配置消费者
创建接口,定义消费类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| { public interface IConsumer { void Consume(Client client);
void Consume(Account account);
void Consume(TransferData transferData); void Consume(CurrentBalance currentBalance); } }
|
1 2 3 4 5 6 7 8 9 10 11 12
| { public class PublisherServicer : IConsumer<Client> { public async Task Consume(ConsumeContext<Client> context) { var info = context.Message; } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| { public class RequestResponseServicer : IConsumer<TransferData> { public async Task Consume(ConsumeContext<TransferData> context) { var data = context.Message;
var nowBalance = new CurrentBalance() { Balance = 1000 - data.Amount };
await context.RespondAsync(nowBalance); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| { public class SenderServicer : IConsumer<Account> { public async Task Consume(ConsumeContext<Account> context) { var account = context.Message; } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| builder.Services.AddMassTransit(x => { x.AddConsumer<PublisherServicer>(); x.AddConsumer<SenderServicer>(); x.AddConsumer<RequestResponseServicer>();
x.UsingRabbitMq((contxit, config) => { config.Host(new Uri("rabbitmq://localhost"), h => { h.Username("root"); h.Password("root"); });
config.ReceiveEndpoint("send-command", e => { e.ConfigureConsumer<SenderServicer>(contxit); });
config.ReceiveEndpoint("publish-event", e => { e.ConfigureConsumer<PublisherServicer>(contxit); });
config.ReceiveEndpoint("request-response", e => { e.ConfigureConsumer<RequestResponseServicer>(contxit); }); }); });
|