当前位置:首页 > 文章 > 正文内容

哎呀,我老大写Bug啦——记一次MessageQueue的优化

廖万里2年前 (2022-10-27)文章18301

 MessageQueue,顾名思义消息队列,在系统开发中也是用的比较多的一个中间件吧。我们这里主要用它来做日志管理和订单管理的,记得老老大(恩,是的,就是老老大,因为他已经跳槽了)还在的时候,当时也是为了赶项目进度,他也参与开发了,那时候我才刚刚入职,他负责写后端这块,我来了就把他手上的任务接过来了,(接着接着……就辞职了)。哎呀,我老大写Bug啦——记一次MessageQueue的优化

之后我们的开发仍然有条不紊的开发着,直到今年的一月份吧,才上线开始运行,然后就出现了常规状态,上线之后就开始爆炸,

                                                                                     哎呀,我老大写Bug啦——记一次MessageQueue的优化

这个页面打不开呀,那个内容没东西呀,第三方登录问题呀,支付问题呀,临时再改需求呀……(该来的都来了),加班、debug、测试、再debug……,然后经过几天的修复,终于完成了跟自己电脑一样稳定的运行,组员们都美滋滋的,今晚加个鸡腿才行。

                                                                                    哎呀,我老大写Bug啦——记一次MessageQueue的优化

都说祸不单行,古人是不会骗我们的,Bug怎么会修得完呢?天真,要是Bug能修得完还要我们来干啥,好景不长,果然,过了一周之后,组员突然群里叫喳喳,

哎呀,我老大写Bug啦——记一次MessageQueue的优化哎呀,我老大写Bug啦——记一次MessageQueue的优化

what is it ? 

 哎呀,我老大写Bug啦——记一次MessageQueue的优化

 

来了,今天的主角登场了,我也要开始加班了。

RabbitMQ

  这个是今天要说的东西,基础概念什么的不是今天要说的重点,重点是:

哎呀,我老大写Bug啦——记一次MessageQueue的优化

 

RabbitMQ内存使得整个服务器濒临瘫痪,远程登录服务器都差点挤不进去的状态,别看截图目前才1.3G,吃个午饭回来,就2.3G了,可怕不可怕?咋回事?

老板喊你回来加班啦

  先不管了,线上优先解决,手动先Reset回收资源以释放空间,这个只是临时的办法,然后检查一下rabbitMQ的配置有没有问题,路径在

 C:\Users\Administrator\AppData\Roaming\RabbitMQ 

哎呀,我老大写Bug啦——记一次MessageQueue的优化

完全是默认的配置,完全ojbk啊,那到底咋回事?继续检查,想想不如从项目开始吧,然后查看项目中的代码,都是从来自【MessageLib】的组件调用

哎呀,我老大写Bug啦——记一次MessageQueue的优化

哎呀,我老大写Bug啦——记一次MessageQueue的优化

好了,叫我老老大要这个组件的代码,他把git的地址就发给我,我把项目down下来,

哎呀,我老大写Bug啦——记一次MessageQueue的优化

这个封装的组件内容不多,主要的文件一目了然,其实就是用到这个两个组件来进行的二次封装来调用

哎呀,我老大写Bug啦——记一次MessageQueue的优化

主要的代码是在【MessageQueue.cs】文件里,展示一下当时的代码情况:

哎呀,我老大写Bug啦——记一次MessageQueue的优化 View Code

然后我就发现了这一段代码!

哎呀,我老大写Bug啦——记一次MessageQueue的优化
        /// <summary>
        /// 程序自运行并开始监听        /// </summary>
        public static void Run()
        {
            System.Timers.Timer timer = new System.Timers.Timer();
            timer.Interval = 1000;
            timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到达时间的时候执行事件;    
            timer.AutoReset = true;//设置是执行一次(false)还是一直执行(true);    
            timer.Enabled = true;//是否执行System.Timers.Timer.Elapsed事件;    
        }
哎呀,我老大写Bug啦——记一次MessageQueue的优化
哎呀,我老大写Bug啦——记一次MessageQueue的优化
        /// <summary>
        /// 启动线程异步调用        /// </summary>
        /// <param name="channelType"></param>
        private static void Send(string channelType)
        {
            Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
            thread.IsBackground = true;
            thread.Start(channelType);
        }
哎呀,我老大写Bug啦——记一次MessageQueue的优化

哎呀,我老大写Bug啦——记一次MessageQueue的优化

  老老大写Bug了,当Run()起来之后,队列中【NoticQueue】有内容,就开始推送消息,发送消息Send(),每来一次推送new一个线程并设置为后台线程,然后发送消息。好了,明白了,这里的线程很混乱,因为线程操作不当,new了N多个频道,并且没有主动回收,这也难怪内存暴涨呢。并且要是Run()调用多次,后果更加不堪设想。

加班改起来

  开始动手吧,业务主要推送有普通消息、错误消息和通知消息,那么将队列与线程组装一起,新增一个类QueueTask.cs:

哎呀,我老大写Bug啦——记一次MessageQueue的优化
using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;using MessageLib.Core;using MessageLib.Core.ClassBean;using EasyNetQ;using EasyNetQ.Topology;using System.Linq.Expressions;namespace MessageLib.Core
{    public class QueueTask
    {        private Queue<Item> QueueData = new Queue<Item>(5000);        //队列数目发布数量
        private int max_count_to_pulish = 1000;        public  bool isRunning = false;        private string itemType = ItemType.info;        private string MessageRouter = ItemType.info;        public QueueTask(string itemType,string MessageRouter)
        {            this.itemType = itemType;            this.MessageRouter = MessageRouter;
        }        /// <summary>
        /// 可供外部使用的消息入列操作        /// </summary>
        public void Push(Item item, IBus IBus)
        {
            QueueData.Enqueue(item);            if (!isRunning)
                Run(IBus);
        }        public void Run(IBus IBus)
        {            if (!isRunning)
            {
                Timer timerNotic = new Timer(PulishMsg, IBus, 1000, 1000);
                isRunning = true;
            }
        }        private void PulishMsg(object state)
        {
            IBus IBus = state as IBus;            if (QueueData.Count > 0)
            {
                PublisMsg(itemType, IBus);
            }
        }        private void PublisMsg(object channel, IBus BusInstance)
        {            try
            {                string channelName = channel as string;                if (QueueData.Count > 0)
                {                    var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));                    var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);                    var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);                    while (QueueData.Count > 0)
                    {
                        Item item = QueueData.Dequeue();                        if (item != null)
                        {                            var properties = new MessageProperties();                            var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                            Message.Properties.AppId = item.appid;
                            BusInstance.Advanced.Publish(exchange, mqqueue.Name, false, Message);
                        }
                    }
                }
            }            catch (Exception ex)
            {
                Console.WriteLine("PublisMsg error:" + ex.Message);
            }
        } 

        public void Read<T>(IBus BusInstance,Action<Item> dealAction) where T : Item
        {            try
            {                string channelName = itemType;                var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));                var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);                var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);                var Consume = BusInstance.Advanced.Consume(mqqueue, registration => Task.Run(() =>
                {
                    registration.Add<string>((message, info) => 
                    {
                        Item data = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message.Body);
                        dealAction(data);
                    });
                }));
            }            catch (Exception ex)
            {
                Console.WriteLine("Read error:" + ex.Message);
            }
        }
    }
}
哎呀,我老大写Bug啦——记一次MessageQueue的优化

 

然后,在MessageQueue.cs修改为单例模式:

哎呀,我老大写Bug啦——记一次MessageQueue的优化 View Code

每次推送消息的时候,每个QueueTask就自己维护自己的线程和队列了,当调用推送之后,就开始运作起来。恩,应该没问题了。然后就发布nuget,再更新项目,然后发布。观察一段时间,恩,完美。

哎呀,我老大写Bug啦——记一次MessageQueue的优化

哎呀,我老大写Bug啦——记一次MessageQueue的优化

 

事件二

  事情过后,B端开始搞起来了,然后涉及到订单系统,跟老大(不是老老大,老老大那时候已经跑了)商量之后确定使用消息队列来做订单的事件的拓展,然后就直接美滋滋的调用好之前写的了,没想到啊,这次是线程涨!因为订单是从B端推送过来的,B端肯定没事,订单后台订阅消息之后,读取过程中出现的线程增多,然后看看之前写的Read()方法,感觉没啥问题啊,每运行完一次,就多了一个线程,这个神奇了啊,那么源代码撸起来。

https://github.com/EasyNetQ/EasyNetQ

哎呀,我老大写Bug啦——记一次MessageQueue的优化

翻来覆去,看到这个Consume方法,继承的是IDisposable接口,得勒,知道咋回事了。

哎呀,我老大写Bug啦——记一次MessageQueue的优化

Consume.Dispose(); 多个消费者的情况下,用完请记得主动释放啊。

这回真的可以浪了。

 哎呀,我老大写Bug啦——记一次MessageQueue的优化

总结

  遇到问题,冷静下来,耐得了寂寞才行。线上的问题优先解决,然后再慢慢Debug,解决不了,看源码,再解决不了,降级处理,欢迎共同探讨。同时也感谢一下技术群里的兄弟给的一些建议,并帮忙查找资料,还好EasyNetQ是开源了,不然也打算说先不用了,毕竟一开始没什么用户量,所以没必要整那么麻烦,加班加点的弄这个问题。不过最终都完美的解决了,心里还是挺美滋滋的,程序猿随之而来的成就感。

  别看我们在工位上默不作声,我们可能在拯救世界呢!老板,该加工资啦!

                                                                                             哎呀,我老大写Bug啦——记一次MessageQueue的优化

 补充

2018-12-25  鉴于大伙私信我想看看原来的bug修复后的情况,毕竟是公司代码不适合完全开源,我单独把例子源码做过修改的发布出来,思路都差不多的,对比一下文章中原来的有问题的代码就可以了吧。因为都已经修复掉了,修改后的在这里。?


本文链接:https://www.kkkliao.cn/?id=133 转载需授权!

分享到:

添加博主微信共同交流探讨信息差网赚项目: 19528888767 , 请猛戳这里→点我添加

版权声明:本文由廖万里的博客发布,如需转载请注明出处。

“哎呀,我老大写Bug啦——记一次MessageQueue的优化” 的相关文章

不打游戏只看视频,骁龙和天玑竟然能拉开这么大差距?

不打游戏只看视频,骁龙和天玑竟然能拉开这么大差距?

事情是这样的。最近托尼有位同事因为之前被使用三星 4nm 工艺的骁龙 8 Gen 1 折腾怕了,所以他在把原来的旧手机卖了之后,转手换了台搭载天玑 9000 的手机。一开始他对这台手机可以说非常满意,打游戏时发热终于没那么严重了,然而时间一长,他发现手机电量貌似掉的有点快,续航并没有想象中那么顶。本...

6G专利申请量比拼:美国占比35.2%,日本占比9.9%,中国排第几?

6G专利申请量比拼:美国占比35.2%,日本占比9.9%,中国排第几?

5G对于现在的人们来说已经不是什么神秘的存在,很多国家通过近些年的研究都已经逐步掌握了5G技术。不过就5G技术发展的成熟度、设备完善程度以及信号铺设范围来说,我国都是当之无愧的世界第一。但是随着科技技术的不断发展,对于6G的技术研发也被提上日程。不少国家都开始攻克6G技术,并取得了各项专利。根据有关...

步步高创始人段永平,高手有所为有所不为,35条深度思考值得收藏

步步高创始人段永平,高手有所为有所不为,35条深度思考值得收藏

段永平,一个注定在商业史无法被忽视的存在。段永平的经历可谓传奇。他是国内第一个拍下来股神巴菲特午餐的男人,那时候他还带上了现在拼多多的创始人黄铮。而这个一手创办了小霸王、步步高等著名企业,并与Vivo、OPPO、一加和拼多多有着千丝万缕联系的企业家,这位通过投资网易、腾讯和苹果而获利颇丰的投资者,也...

这是一篇狗屁不通文章生成的文章

莎士比亚说过一句富有哲理的话,人的一生是短的,但如果卑劣地过这一生,就太长了。这启发了我, 所谓匿名信一封云来信网赚赚钱项目投资SEO百度专业收录关键词中国文章,关键是匿名信一封云来信网赚赚钱项目投资SEO百度专业收录关键词中国文章需要如何写。 我们一般认为,抓住了问题的关键,其他一切则会迎刃而解。...

一直瘦的人有什么共同的习惯?

一直瘦的人有什么共同的习惯?

抹拉汪09月06日关注我从170斤减到了118斤,维持了两年多到现在了,心得颇多。1、每天早上上称,数据不会骗人。2、每天固定8杯白开水,保证自己喝到2500ML的水量,促进代谢。3、一天三顿饭,尤其是早饭,一定会好好对待,白煮蛋、牛奶和一个小馒头,再配点两口能吃完的水果。4、尽量每天自己带饭去单位...

win10中文输入法不显示文字怎么办 win10中文输入法不显示文字的解决方法

win10中文输入法不显示文字怎么办 win10中文输入法不显示文字的解决方法

现在大家肯定都是通过中文输入法的拼音来输入文字,不过最近有很多用户反映在使用win10系统的中文输入法总是不显示文字,打不出汉字了,那么遇到这种情况的话,要怎么解决呢?下面小编就给大家带来了win10中文输入法不显示文字的解决方法,感兴趣的朋友快来了解下吧。   win10中文输入法不显示...

发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。