Listen to OpenStack Notification
2016-09-30 by muzi在许多应用场景下,需要监听OpenStack的消息来做一些操作,从而实现事件驱动/消息驱动的业务。本文将介绍如何使用kombu库来监听OpenStack的消息,包括neutron,nova等相关类型的notification。
Kombu, AMQP, RabbitMQ
Kombu是Python的消息库,封装来许多消息的报文,支持包括AMQP等多种消息协议。而在OpenStack端,Notification的发布系统由RabbitMQ实现。为了监听OpenStack发出的Notification, 我们需要在本地用Kombu库建立一个connection, 连接到OpenStack的消息发布系统。
Terminology
在学习过程中,会遇到Exchange, Queue等术语,此处将简要介绍这些概念:
-
Producers
消息生产者,产生消息,并发送到交换器。
-
Exchanges
消息交换器,接受生产者发送过来的消息,根据对应的routing_key,来将消息路由到对应的队列。
-
Queues
队列接收来自交换器发来的消息,队列由消费者定义,自然也为消费者使用,用于存储消息。
-
Consumers
消费者从队列中读取消息,并进行处理。消费者声明和定义队列,并将队列绑定到对应的exchange上。
-
Routing keys
每一种消息都有路由键(routing_key),可以被exchange用来判定如何路由消息到对应的队列。根据交换的类型不用,routing_key的解析过程不同。
Exchange type
AMQP协议中主要定义了3种exchange type,包括:
-
Direct exchange
根据routing_key的值,将匹配成功的消息发送到指定的队列。
-
Fan-out exchange
将消息发送到所有队列,和交换机的flood操作类似。
-
Topic exchange
根据给定topic以及匹配规则来实现消息的路由。比如匹配的pattern为*.muzixing.#, 则hello.muzixing.info匹配成功,而muzixing.info匹配失败。

Kombu Example
首先,需要先安装Kombu。安装之后,可以通过以下的示例代码来连接到OpenStack。注意需要将user,pwd,host和port修改成对应的OpenStack消息服务器的用户名,登陆密码,ip地址和传输层端口号。 完成之后,运行该python文件,即可监听OpenStack的通知。
from kombu import Queue, Exchange
from kombu.log import get_logger
from kombu.mixins import ConsumerMixin
logger = get_logger(__name__)
class Worker(ConsumerMixin):
event_queues = [
Queue('notification.nova',
Exchange('nova', 'topic', durable=False),
durable=False, routing_key='#'),
Queue('notifications.neutron',
Exchange('neutron', 'topic', durable=False),
durable=False, routing_key='#')
]
def __init__(self, connection):
self.connection = connection
def get_consumers(self, Consumer, channel):
return [Consumer(queues=self.event_queues,
accept=['json'],
callbacks=[self.process_task])]
def process_task(self, body, message):
print("Receive message: %r" % (body, ))
message.ack()
if __name__ == '__main__':
from kombu import Connection
from kombu.utils.debug import setup_logging
# setup root logger
setup_logging(loglevel='DEBUG', loggers=[''])
connect_url = 'amqp://' + user + ':' + pwd + '@' + host + ':' + port + '//'
with Connection(connect_url) as conn:
try:
print(conn)
worker = Worker(conn)
worker.run()
except KeyboardInterrupt:
print('Stopped')
以上示例代码中有两个地方需要注意。首先是需要将用户名等信息修改正确,其次是Queue的定义。在Worker类中,定义了event_queues列表,列表中是对应的Queue,用来接收️Notification。为了接收nova的信息,需要构造一个Exchange instance作为Queue的参数,其中第一个参数‘nova’是exchange的名字,代表着这个队列将绑定到nova的消息exchange上。同样的,为了接受neutron的消息,我们还定义了另一个队列,队列绑定到了名字叫‘neutron’的exchange上。同理,若希望绑定到对应的exchange,继续添加Queue即可。Routing的参数类型这里设置为topic, durable参数表示消息数据的持久化特性。routing_key则是路由的键值。此处接受所有来自对应名称exchange的消息。event_queue将作为Consumer类初始化实例的参数,用于实例化消费者。
class Worker(ConsumerMixin):
event_queues = [
Queue('notification.nova',
Exchange('nova', 'topic', durable=False),
durable=False, routing_key='#'),
Queue('notifications.neutron',
Exchange('neutron', 'topic', durable=False),
durable=False, routing_key='#')
]
总结
OpenStack目前在云环境中应用十分广泛,是非常值得喜欢云计算和SDN的同学去学习和研究的。作为一个大型的项目,OpenStack采用了AMQP来分发事件。作者在工作过程中需要使用OpenStack的事件,因此总结来这一篇文章。特别感谢谷歌给予的大力支持,没有谷歌我就查不到解决问题的正确姿势[1]。希望能给读者带来一些帮助。
References
[1] "Listen to OpenStack Neutron Messages from RabbitMQ using Kombu messaging library", http://thetaooftech.blogspot.com/2014/04/listen-to-openstack-neutron-messages.html