Yesterday I was handed a task: a producer program connected to ActiveMQ crashes from time to time. The other team suspected that the topic was being consumed too slowly, messages were piling up, and the backlog was crashing the program, so they asked me to monitor the topic's consumption. I doubted this was the cause, but since it was requested, I set up the monitoring anyway.
ActiveMQ ships with an admin web console that shows each topic's enqueue and dequeue counts, but pulling numbers out of it would mean scraping the page. So I checked whether ActiveMQ exposes an API for this data. According to the official docs at http://activemq.apache.org/rest.html, versions 5.8 and later provide a REST management API, along with two wget reference examples, one of which is:
wget --user admin --password admin --header "Origin: http://localhost" --auth-no-challenge http://localhost:8161/api/jolokia/read/org.apache.activemq:type=Broker,brokerName=localhost
Requesting the URL from that example in a browser returns a listing of all the broker's topics. From that response, the URL for a single topic's information can be inferred as: http://localhost:8161/api/jolokia/read/org.apache.activemq:brokerName=localhost,destinationName=TopicName,destinationType=Topic,type=Broker
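As a minimal sketch of how that topic URL is assembled: the Jolokia `read` endpoint takes a JMX MBean object name whose key=value pairs identify the broker and the destination. The host, broker name, and topic name below simply mirror the example URL above.

```python
# Build the Jolokia "read" URL for a single topic's MBean.
# The key=value pairs form the JMX object name; values here mirror
# the example URL (localhost broker, topic "TopicName").
host = "localhost"
broker_name = "localhost"
topic_name = "TopicName"

mbean = ("org.apache.activemq:brokerName={0},destinationName={1},"
         "destinationType=Topic,type=Broker").format(broker_name, topic_name)
url = "http://{0}:8161/api/jolokia/read/{1}".format(host, mbean)
print(url)
```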
With the API found, the next step is to request it and extract the relevant fields. The code is as follows:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author: Yujichang
import requests
import base64
import logging
import logging.handlers


class MonitorLog(object):
    """Logger that writes to stdout and to a daily-rotated file (7 days kept)."""

    def __init__(self):
        self.logger = logging.getLogger("monitorLog")
        stream_handler = logging.StreamHandler()
        file_handler = logging.handlers.TimedRotatingFileHandler(
            "activemq_monitor_topic.log", when="D", interval=1, backupCount=7)
        file_handler.suffix = "%Y-%m-%d.log"
        self.logger.setLevel(logging.INFO)
        stream_handler.setLevel(logging.INFO)
        file_handler.setLevel(logging.INFO)
        formatter = logging.Formatter("%(asctime)s %(name)s %(levelname)s %(message)s")
        stream_handler.setFormatter(formatter)
        file_handler.setFormatter(formatter)
        self.logger.addHandler(stream_handler)
        self.logger.addHandler(file_handler)

    def info(self, msg):
        self.logger.info(msg)

    def warning(self, msg):
        self.logger.warning(msg)

    def error(self, msg):
        self.logger.error(msg)

    def critical(self, msg):
        self.logger.critical(msg)


def get_topic_info(mq_ip, broker_name, topic_name, basic_auth):
    """
    Fetch the topic's statistics: pending messages (QueueSize),
    enqueued messages (EnqueueCount), and dequeued messages (DequeueCount).
    """
    # ActiveMQ converts ':' to '_' in destination names by default
    topic_name = topic_name.replace(':', '_')
    url = "http://{0}:8161/api/jolokia/read/org.apache.activemq:brokerName={1},destinationName={2}," \
          "destinationType=Topic,type=Broker".format(mq_ip, broker_name, topic_name)
    headers = {
        'authorization': "Basic {}".format(basic_auth),
        'cache-control': "no-cache",
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/89.0.4389.82 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=5)
        body = response.json()  # parse once; a non-JSON reply raises ValueError
    except (requests.RequestException, ValueError):
        return 500, (0, 0, 0)
    if body['status'] == 200:
        value = body['value']
        data = (value['QueueSize'], value['EnqueueCount'], value['DequeueCount'])
        return 200, data
    return 500, (0, 0, 0)


if __name__ == '__main__':
    ip = 'localhost'
    broker = 'localhost'
    topic = 'test'
    user = 'admin'
    password = 'admin'
    basic_auth = base64.b64encode('{0}:{1}'.format(user, password).encode()).decode()
    log = MonitorLog()
    status, data = get_topic_info(ip, broker, topic, basic_auth)
    log.info('Topic: {0}, EnqueueCount: {1}, DequeueCount: {2}, '
             'QueueSize: {3}'.format(topic, data[1], data[2], data[0]))
Finally, I deployed the script on the server and scheduled it with crontab to run every minute, then watched the topic's consumption for a while.
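For reference, the crontab entry might look like the line below; the interpreter and script paths are placeholders for wherever the script actually lives on the server.

```shell
# Run the monitor script every minute; adjust both paths to your environment.
* * * * * /usr/bin/python /opt/scripts/activemq_monitor_topic.py >/dev/null 2>&1
```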