最近因为业务架构需要,编写一个支付订单结果通知的服务,但发布方和订阅方不在同一个项目内,虽然两者的框架都是EasyNetQ,但版本差距很大,发布方是3.x,接受方是6.x,在订阅消息时遇到了一些问题,这里记录下解决方案。
首先就是市面上的所有主流消息队列框架封装的都过于完善,在发布消息和订阅消息时,会对消息的Body以外的属性,比如Header做一些额外的操作,以帮助框架完成更高级的功能,但这带来的问题也是显而易见的,因为各个框架对发布消息和订阅消息的“协议”有所差别,所以经常会出现订阅方无法识别消息而消费失败的情况。
比如Cap这个框架,对Header有一些操作,如果消息没有Header,则Cap无法消费这个消息,而EasyNetQ则是为了方便订阅方反序列化对象,直接将对象的完全限定名写到了消息属性内,在订阅方消费时则按照这个完全限定名查找程序集内对应的类型,但这也带来了一个问题就是,如果跨项目跨语言跨框架的情况下,会导致无法正确消费消息。
框架的官方都会提供替换其内部组件的方式来实现自定义的方式,比如重写Type的序列化规则或者重写ConsumerSelector规则等等,但该方法往往是全局的,我自己项目的发布方有历史代码,要兼容产品的老版本,不敢轻易去动,所以只能在新写的订阅方来处理这个问题,
以EasyNetQ为例,查阅了非常多的资料,最终锁定了如下的方式进行消息订阅:
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("订阅消息开始");
try
{
var advancedBus = _bus.Advanced;
var queue = advancedBus.QueueDeclare(RabbitRouteConfiguration.NOTIFY_ORDER_GOLD_PAY_RESULT_QUEUE);
_bus.Advanced.Consume(queue, (msgBody, mp,mri) =>
{
string body = Encoding.Default.GetString(msgBody);
var model = JsonConvert.DeserializeObject<NotifyMessageModel>(body);
var goldPayNotifyHandler = _services.GetRequiredService<GoldPayNotifyHandler>();
goldPayNotifyHandler.Handle(model);
});
}
catch(Exception ex)
{
_logger.LogError($"发生系统异常:{ex.Message}|{ex.StackTrace}");
}
_logger.LogInformation("订阅消息结束");
return Task.CompletedTask;
}
这种方式就是用EasyNetQ的AdvancedBus对象,直接定义Consume,强行指定Queue进行订阅,收到消息的Body为byte[]类型,直接转成string后反序列化即可。