RYU学习:eventlet

2014-12-10 by muzi

前言

从OpenDaylight转到RYU以来一直都没有机会好好学习RYU的源码,只学会了编写简单的Application。但是如果要熟悉一个控制器,就要熟悉它的运行原理,熟悉它数据结构,熟悉它的设计模式等等。最近终于有时间好好看RYU的代码,但在看代码的过程中却发现RYU并不简单,其编码风格也非常优雅,非常值得学习。本篇博文主要讲述RYU中使用到的eventlet。

从RYU开始

运行ryu的时候,命令是:ryu-manager app.py。第一个要找到就是ryu-manager到底会触发什么程序。在/cmd中没有找到之后,在/bin中找到了两个可执行文件:ryu和ryu-manager。打开ryu-manager,显示如下:

from ryu.cmd.manager import main
main()

找到/ryu/cmd/manager.py,发现这个文件中的main()函数是整个ryu的入口函数。

def main(args=None, prog=None):
    try:
        CONF(args=args, prog=prog,
             project='ryu', version='ryu-manager %s' % version,
             default_config_files=['/usr/local/etc/ryu/ryu.conf'])
    except cfg.ConfigFilesNotFoundError:
        CONF(args=args, prog=prog,
             project='ryu', version='ryu-manager %s' % version)

    log.init_log()

    if CONF.pid_file:
        import os
        with open(CONF.pid_file, 'w') as pid_file:
            pid_file.write(str(os.getpid()))

    app_lists = CONF.app_lists + CONF.app
    # keep old behaivor, run ofp if no application is specified.
    if not app_lists:
        app_lists = ['ryu.controller.ofp_handler']

    app_mgr = AppManager.get_instance()
    app_mgr.load_apps(app_lists)
    contexts = app_mgr.create_contexts()
    services = []
    services.extend(app_mgr.instantiate_apps(**contexts))

    webapp = wsgi.start_service(app_mgr)
    if webapp:
        thr = hub.spawn(webapp)
        services.append(thr)

    try:
        hub.joinall(services)
    finally:
        app_mgr.close()

这个main()函数的内容主要是完成了RYU的初始化配置和启动。Configure使用了oslo,这个在后续的博文中应该会提到。初始化的构成主要包括将app_list里面的内容加入App_Manager的列表中,然后开启协程去协调这些APP完成工作。hub是from ryu.lib import hub的。继续查看ryu/lib/hub.py。最终找到许多关于eventlet的内容。在hub.py中定义了Event,StreamServer和WSGIServer等类,还有一些重要的重要函数如spawn()等。为了更好地学习RYU,学习coroutine和eventlet就非常有必要了。

Coroutine

协程[coroutine]是一个程序组件。相比subroutine, coroutine更一般。coroutine相对与thread而言,又不一样。thread是资源抢占式的存在,而coroutine是通过yield来转移执行权,协程之间是平等的,没有等级关系。multi-thread一旦开始运行,就无法确定某一时刻到底是哪一个thread在占用cpu,临界资源也要加互斥锁。而coroutine则是需要程序员自己决定程序如何运行,同时也需要自己负责程序的风险。协程和线程一样,只共享堆,不共享栈。

Eventlet

eventlet是一个可以提供高性能并发处理能力的python库。我们可以在/usr/lib/python2.7/dist-packages/eventlet中找到对应的文件。

Installation

pip install eventlet

Examples

为了更好的理解eventlet的内容,我花了半天认真地抄了一遍官网的例子。具体实例举例如下。

GreenPile

"""Spawn multiple workers and collect their results.

Demonstrates how to use the eventlet.green.socket module.
"""

import eventlet
from eventlet.green import socket


def geturl(url):
    con = socket.socket()
    ip = socket.gethostbyname(url)
    con.connect((ip, 80))
    print('%s connected' % url)
    con.sendall('GET /\r\n\r\n')
    return con.recv(1024)


urls = ['www.muzixing.com', 'www.baidu.com', 'www.python.org']
pile = eventlet.GreenPile()
for x in urls:
    pile.spawn(geturl, x)

for url, result in zip(urls, pile):
    print('%s: %s' % (url, repr(result)[:100]))

以上的代码对指定url发送了GET请求。重点在与eventlet.GreenPile()的使用。GreenPile类源码如下所示:

class GreenPile(object):
    def __init__(self, size_or_pool=1000):
        if isinstance(size_or_pool, GreenPool):
            self.pool = size_or_pool
        else:
            self.pool = GreenPool(size_or_pool)
        self.waiters = queue.LightQueue()
        self.used = False
        self.counter = 0

    def spawn(self, func, *args, **kw):
        """Runs *func* in its own green thread, with the result available by
        iterating over the GreenPile object."""
        self.used =  True
        self.counter += 1
        try:
            gt = self.pool.spawn(func, *args, **kw)
            self.waiters.put(gt)
        except:
            self.counter -= 1
            raise

    def __iter__(self):
        return self

    def next(self):
        """Wait for the next result, suspending the current greenthread until it
        is available.  Raises StopIteration when there are no more results."""
        if self.counter == 0 and self.used:
            raise StopIteration()
        try:
            return self.waiters.get().wait()
        finally:
            self.counter -= 1

从__init__函数可以看出,GreenPile内部有一个GreenPool对象和一个Queue对象:waiters。GreenPool的作用相当与线程池的作用,这点后续会继续介绍。上述例子用到的spawn函数完成了协程(被称之为green thread)的启动。可以看出spawn函数的参数是(函数,参数),在上述例子中为: pile.spawn(geturl, x)。从spawn函数中,也可以看出spawn()方法的返回值被保存在waiters队列中。next()方法的实现使其具有迭代性质。

GreenPool

下面的例子使用到了GreenPool类,完成了一个非常暴力的迭代爬虫,理论上,如果你让他去爬取某一个网站,然后不去管它,它会从这个网站出发,找到所有的链接,然后跳到各自的链接,然后继续迭代,直到最后把整个互联网的网站都爬一遍。而且,它不尊重你网站的robot.txt,这意味这它什么都会爬取。

from __future__ import with_statement
from eventlet.green import urllib2
import eventlet
import re


# http://daringfireball.net/2009/11/liberal_regex_for_matching_urls
url_regex = re.compile(r'\b(([\w-]+://?|www[.])[^\s()<>]+(?:\([\w\d]+\)|([^[:punct:]\s]|/)))')


def fetch(url, seen, pool):
    '''Fetch A url, stick any found urls into the seen set,
    and dispatch any new  ones to te pool.'''
    print "fetching", url
    data = ''
    with eventlet.Timeout(5, False):
        data = urllib2.urlopen(url).read()
    for url_match in url_regex.finditer(data):
        new_url = url_match.group(0)
        # You can only send requests to muzixing.com so as not to destroy internet
        if new_url not in seen:  # and ’muzixing.com' in new_url:
            seen.add(new_url)
            # While this seems stack-recursive, it is actually not.
            # Spawned greenthreads start their own stacks
            pool.spawn_n(fetch, new_url, seen, pool)


def crawl(start_url):
    '''Recrusively crawl starting from *start_url*.Return a set of
    urls that were found.
    '''
    pool = eventlet.GreenPool()
    seen = set()
    fetch(start_url, seen, pool)
    pool.waitall()
    return seen

seen = crawl("http://www.muzixing.com")
print "I saw there urls:", seen
# print '\n'.join(seen)

首先爬虫从http://www.muzixing.com网站开始搜索url。然后继续迭代寻找url,不断扩大查找范围。实验结果如下所示:

图1:迭代爬虫显示信息

从上图可以看到爬虫抓取了www.muzixing.com的网页中存在的url如http://ikimi.net,然后我们可以看到爬虫又跳到了http://ikimi.net上爬取页面的其他url,如http://www.ikimi.net/wp-includes。如果将起始页面换成bbs.byr.cn会发现爬虫会以更快的速度在整个互联网蔓延开来!

上述例子中可以学习到GreenPool类的使用。GreenPool可以类比于线程池,这有利于理解。在GreenPool中的元素都是GreenThread。其中最重要的函数是spawn/spawn_n函数。

def spawn(self, function, *args, **kwargs):
"""Run the *function* with its arguments in its own green thread.
        Returns the :class:`GreenThread <eventlet.greenthread.GreenThread>`
        object that is running the function, which can be used to retrieve the
        results.
"""

该函数启动了一个GreenThread,参数是需要执行的function和function对应的参数。返回值是执行该函数的GreenThread类。

def spawn_n(self, function, *args, **kwargs):
        """Create a greenthread to run the *function*, the same as
        :meth:`spawn`.  The difference is that :meth:`spawn_n` returns
        None; the results of *function* are not retrievable.
        """

spawn_n函数功能上差不多,只是返回的是None。其他的函数举例简单说明如下:

  • waitall():等待所有greenthread执行完毕。

  • running(): 返回目前正在执行的greenthread。

  • imap():从迭代器中取出数据項作为func的参数去执行,并返回结果。

  • starmap(): 和imap类似,但是取参数的方式有所差异。从openstack nova 基础知识——eventlet中摘取举例如下:

    imap(pow, (2,3,10), (5,2,3)) --> 32 9 1000
    starmap(pow, [(2,5), (3,2), (10,3)]) --> 32 9 1000
    
  • free(): 返回当前可获取的greenthread的数目。

以上代码上的with语句是python中的一个非常方便的关键字。使用with关键字可以让代码更严谨且简洁。其封装了__enter__()函数和__exit__()函数,用于执行信息和退出处理。其等价于以下代码:

try:
    __enter__()
finally:
    __exit__()

上述是关于GreenPool类的使用案例,使用该类可以高效完成并发操作。

Convenience

接下来再介绍一个更好玩的程序,多人群聊程序,可以让我们在学习eventlet的时候充满成就感。代码如下:

import eventlet
from eventlet.green import socket

PORT = 3001
participants = set()


def read_chat_forever(writer, reader, address):
    line = reader.readline()
    while line:
        print('Chat:', line.strip())
        for p in participants:
            try:
                if p is not writer:  # Don't echo
                    msg = address[0] + ':'
                    msg += line
                    p.write(msg)
                    p.flush()
            except socket.error as e:
                # ignore broken pipes, they just mean the participant
                # closed its connection already
                if e[0] != 32:
                    raise
        line = reader.readline()
    participants.remove(writer)
    print("participant left chat")


try:
    print("ChatServer starting up on port %s" % PORT)
    server = eventlet.listen(('0.0.0.0', PORT))
    while True:
        new_connection, address = server.accept()
        print("Participant joined chat.")
        new_writer = new_connection.makefile('w')
        participants.add(new_writer)
        eventlet.spawn_n(
            read_chat_forever,
            new_writer,
            new_connection.makefile('r'),
            address)
except (KeyboardInterrupt, SystemExit):
    print("ChatServer exiting")

try语句块中完成了服务端socket的建立和监听。然后在while循环中完成了消息的处理。

首先关注第一个函数:eventlet.listen((addr,port))。在eventlet文件夹中,打开__init__文件可以查看到一些为了方便而初始化的定义,举例如下:

version_info = (0, 9, 16)
__version__ = ".".join(map(str, version_info))

try:
    from eventlet import greenthread
    from eventlet import greenpool
    from eventlet import queue
    from eventlet import timeout
    from eventlet import patcher
    from eventlet import convenience
    import greenlet



    GreenPool = greenpool.GreenPool
    GreenPile = greenpool.GreenPile

    Queue = queue.Queue

    import_patched = patcher.import_patched
    monkey_patch = patcher.monkey_patch

    connect = convenience.connect
    listen = convenience.listen
    serve = convenience.serve
    StopServe = convenience.StopServe
    wrap_ssl = convenience.wrap_ssl

所以我们直接可以使用eventlet.listen调用convenience.listen函数。listen函数完成了一个server socket的绑定和监听。

def listen(addr, family=socket.AF_INET, backlog=50):
    sock = socket.socket(family, socket.SOCK_STREAM)
    if sys.platform[:3] != "win":
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(addr)
    sock.listen(backlog)
    return sock

socket.accept()函数将返回一个(connection,address)元组。socket.makefile([mode[, bufsize]])返回一个文件对象用于读写缓存。

eventlet.spawn_n函数将read_chat_forever函数及其三个参数作为参数,创建GreenThread去执行任务。eventlet主要完成的工作就是帮助你如何去协调你的任务,而不是去实现你的任务,这一点在这里得到体现。其实对比于线程池就容易理解读多了。

试验结果截图如下:

图2:多人群聊server运行界面

图3:多人群聊client运行界面

从client运行界面可以看出不同的用户发送的信息会以IP:message的形式展示出来,代码很简单,但是非常有趣。

以上例子均可以在官网找到,读者可以到官网去查看更多案例。

Patcher

Patch是eventlet中的一个重要模块。用于替换系统自带的模块。其中有import_patched和monkey_patch两个函数,后者可以提供运行时替换。具体例子可以查看openstack nova 基础知识——eventlet

回到RYU

前两行代码调用了hub.patch()函数,查看hub.py中发现patch = eventlet.monkey_patch,实现了运行时替换模块。

from ryu.lib import hub
hub.patch()

接下来的CONF文件由于oslo的内容比较多,所以会在后续博文中详细介绍。首先关注main()函数的主要内容。

def main(args=None, prog=None):
    try:
        CONF(args=args, prog=prog,
             project='ryu', version='ryu-manager %s' % version,
             default_config_files=['/usr/local/etc/ryu/ryu.conf'])
    except cfg.ConfigFilesNotFoundError:
        CONF(args=args, prog=prog,
             project='ryu', version='ryu-manager %s' % version)

    log.init_log()

    if CONF.pid_file:
        import os
        with open(CONF.pid_file, 'w') as pid_file:
            pid_file.write(str(os.getpid()))

    app_lists = CONF.app_lists + CONF.app
    # keep old behaivor, run ofp if no application is specified.
    if not app_lists:
        app_lists = ['ryu.controller.ofp_handler']

    app_mgr = AppManager.get_instance()
    app_mgr.load_apps(app_lists)
    contexts = app_mgr.create_contexts()
    services = []
    services.extend(app_mgr.instantiate_apps(**contexts))

    webapp = wsgi.start_service(app_mgr)
    if webapp:
        thr = hub.spawn(webapp)
        services.append(thr)

    try:
        hub.joinall(services)
    finally:
        app_mgr.close()

从CONF文件中取出app信息,存在app_lists内,若没有启动其他app,则默认启动ofp_handler应用,用于处理基础的事件,如协议协商等。然后声明一个AppManager的类,用于初始化和管理APP。load_apps函数完成了APP的加载。最后try语句块中的joinall()使得进程需要等待所有的services完成之后才能退出。至此RYU初始运行学习完成,后续的博文将分别介绍:oslo, 事件处理机制,RYUAPP类以及RYU数据结构和API使用等内容。

后语

Evenlet是个不错的python库,简单却很高效。相比于thread,coroutine的行为是可控的,切换成本也要更小。在单核情况下,coroutine要比thread开销小,但是multithread可以在多CPU的情况下发挥更大的能力。RYU是使用Python编写的控制器,比同样使用Python编写的POX,无论从代码的规范,优雅度,还是从性能上,都有很大的优势,此外,这个纯SDN控制器对OpenFlow协议的支持可以说是最稳定,最全面的。虽然我还会继续研究ONOS,学习大型分布式框架。但是RYU会成为我开发Application的利器。相比之下,Java编写的ODL,过于复杂和不稳定。新生儿ONOS相比之下用户体验更好,且没有使用YANG,大大降低了学习难度。周一的时候,还在Docker中安装了ONOS,并使用Cbench测试对比了ONOS和RYU的吞吐量。同样环境下,单节点的ONOS性能几乎是RYU的两倍,这让我有些忧伤。也许匕首只适合敏捷作战,而大刀才是开疆扩土的利器吧。


Comments