• Post author:
  • Post category:后端
  • Post comments:0评论
  • Reading time:1 mins read

最近因为业务架构需要,编写一个支付订单结果通知的服务,但发布方和订阅方不在同一个项目内,虽然两者的框架都是EasyNetQ,但版本差距很大,发布方是3.x,接受方是6.x,在订阅消息时遇到了一些问题,这里记录下解决方案。

首先就是市面上的所有主流消息队列框架封装的都过于完善,在发布消息和订阅消息时,会对消息的Body以外的属性,比如Header做一些额外的操作,以帮助框架完成更高级的功能,但这带来的问题也是显而易见的,因为各个框架对发布消息和订阅消息的“协议”有所差别,所以经常会出现订阅方无法识别消息而消费失败的情况。

比如Cap这个框架,对Header有一些操作,如果消息没有Header,则Cap无法消费这个消息,而EasyNetQ则是为了方便订阅方反序列化对象,直接将对象的完全限定名写到了消息属性内,在订阅方消费时则按照这个完全限定名查找程序集内对应的类型,但这也带来了一个问题就是,如果跨项目跨语言跨框架的情况下,会导致无法正确消费消息。

框架的官方都会提供替换其内部组件的方式来实现自定义的方式,比如重写Type的序列化规则或者重写ConsumerSelector规则等等,但该方法往往是全局的,我自己项目的发布方有历史代码,要兼容产品的老版本,不敢轻易去动,所以只能在新写的订阅方来处理这个问题,

以EasyNetQ为例,查阅了非常多的资料,最终锁定了如下的方式进行消息订阅:

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("订阅消息开始");
            try
            {
                var advancedBus = _bus.Advanced;
                var queue = advancedBus.QueueDeclare(RabbitRouteConfiguration.NOTIFY_ORDER_GOLD_PAY_RESULT_QUEUE);
                _bus.Advanced.Consume(queue, (msgBody, mp,mri) =>
                {
                    string body = Encoding.Default.GetString(msgBody);
                    var model = JsonConvert.DeserializeObject<NotifyMessageModel>(body);
                    var goldPayNotifyHandler = _services.GetRequiredService<GoldPayNotifyHandler>();
                    goldPayNotifyHandler.Handle(model);
                });

            }
            catch(Exception ex)
            {
                _logger.LogError($"发生系统异常:{ex.Message}|{ex.StackTrace}");
            }
            _logger.LogInformation("订阅消息结束");
            return Task.CompletedTask;
        }

这种方式就是用EasyNetQ的AdvancedBus对象,直接定义Consume,强行指定Queue进行订阅,收到消息的Body为byte[]类型,直接转成string后反序列化即可。

葫芦

葫芦,诞生于1992年8月11日,游戏宅,胶佬,爱好摸鱼,一个干过超市收银,工地里搬过砖,当过广告印刷狗,做过电焊铁艺的现役.Net程序员。

发表回复