昨天收到一个任务,有一个连接到ActiveMQ的生产者程序时不时崩溃,对方怀疑是主题消费速度太慢,消息堆积,导致程序的崩溃,让我这边监控一下主题的消费情况。虽然我觉得不太可能是这个原因,但是要求到还是监控一下。
  ActiveMQ提供了一个admin的管理页面,页面有主题的出队、入队数据,不过要获取这里的数据就比较麻烦,需要爬取页面。于是查找ActiveMQ是否提供了api可以获取到这样的数据,在官网http://activemq.apache.org/rest.html上看,5.8版本后提供了REST management API,并且提供了两个wget的参考例子,其中一个例子:
  wget --user admin --password admin --header "Origin: http://localhost" --auth-no-challenge http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost
  用浏览器请求例子里的url,返回内容列出了ActiveMQ所有主题信息,根据返回的内容,可推断出请求主题信息的url应该是: http://localhost:8161/api/jolokia/read/org.apache.activemq:brokerName=localhost,destinationName=TopicName,destinationType=Topic,type=Broker
  api找到了,接下就是请求api,然后获取对应的数据,代码如下:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Yujichang

import requests
import base64
import logging
import logging.handlers


class MonitorLog(object):
    def __init__(self):
        self.logger = logging.getLogger("monitorLog")

        stream_handler = logging.StreamHandler()
        file_handler = logging.handlers.TimedRotatingFileHandler("activemq_monitor_topic.log", when="D", interval=1, backupCount=7)
        file_handler.suffix = "%Y-%m-%d.log"

        self.logger.setLevel(logging.INFO)
        stream_handler.setLevel(logging.INFO)
        file_handler.setLevel(logging.INFO)

        formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        stream_handler.setFormatter(formatter)
        file_handler.setFormatter(formatter)

        self.logger.addHandler(stream_handler)
        self.logger.addHandler(file_handler)

    def info(self, msg):
        self.logger.info(msg)

    def warning(self, msg):
        self.logger.warning(msg)

    def error(self, msg):
        self.logger.error(msg)

    def critical(self, msg):
        self.logger.critical(msg)


def get_topic_info(mq_ip, broker_name, topic_name, basic_auth):

    """
    获取主题的队列信息,主要获取队列未消费数(QueueSize)、入队数(EnqueueCount)、出队数(DequeueCount)
    """

    # activemq 默认将 : 转换为 _
    topic_name = topic_name.replace(':', '_')

    url = "http://{0}:8161/api/jolokia/read/org.apache.activemq:brokerName={1},destinationName={2}," \
          "destinationType=Topic,type=Broker".format(mq_ip, broker_name, topic_name)
    headers = {
        'authorization': "Basic {}".format(basic_auth),
        'cache-control': "no-cache",
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/89.0.4389.82 Safari/537.36'
    }

    try:
        response = requests.get(url, headers=headers, timeout=5)
    except:
        return 500, (0, 0, 0)

    if response.json()['status'] == 200:
        data = (response.json()['value']['QueueSize'],
                response.json()['value']['EnqueueCount'],
                response.json()['value']['DequeueCount']
                )
        return 200, data

    return 500, (0, 0, 0)


if __name__ == '__main__':
    ip = 'localhost'
    broker = 'localhost'
    topic = 'test'

    user = 'admin'
    password = 'admin'
    basic_auth = base64.b64encode('{0}:{1}'.format(user, password).encode()).decode()

    log = MonitorLog()
    status, data = get_topic_info(ip, broker, topic, basic_auth)

    log.info('主题(topic): {0}, 入队总数(EnqueueCount): {1}, 出队总数(DequeueCount): {2}, '
             '未消费总数(QueueSize): {3}'.format(topic, data[1], data[2], data[0]))

  最后将代码放到服务器上使用crontab每分钟执行,观察一段时间主题的消费情况。