RYU中WSGI学习笔记与RESTAPI开发
2015-05-13 by muzi另一篇博文中已经介绍如何使用RYU的RESTAPI,本篇将继续介绍相关内容,主要分为WSGI学习总结和以ofctl_rest.py为例的RESTAPI的实现与内部机制。由于第一次学习WSGI,还有许多地方不是特别理解,所学知识均来自Google。文中若有错误之处,敬请指出,谢谢。
WSGI
Web服务器网关接口(Python Web Server Gateway Interface,缩写为WSGI)是为Python语言定义的Web服务器和Web应用程序或框架之间的一种简单而通用的接口。为了理解WSGI,可以尝试一下的小例子。
from cgi import parse_qs
from cgi import escape
import logging
def hello_world(environ, start_response):
parameters = parse_qs(environ.get('QUERY_STRING', ''))
if 'subject' in parameters:
subject = escape(parameters['subject'][0])
else:
subject = 'World.'
start_response('200 OK', [('Context-Type', 'text/html')])
return ['''Hello %(subject)s
Hello %(subject)s!''' %{'subject': subject}]
if __name__ == '__main__':
from wsgiref.simple_server import make_server
IP = 'localhost'
port = 8080
server = make_server(IP, port, hello_world)
logging.basicConfig(level=logging.INFO)
LOG = logging.getLogger('wsgi')
LOG.info('listening on %s: %d'%(IP, port))
server.serve_forever()
运行之后,在浏览器地址栏输入:
http://localhost:8080/?subject=muzixing.com
可以观察到浏览器输出:
Hello muzixing.com
Hello muzixing.com!
在写这个小例子的时候,我遇到一个让我非常疑惑的地方,函数hello_world的参数在哪里赋值?为什么网上的例子参数都是environ和start_response,难道这两个名字是特殊的?在运行时会默认已经被赋值?经过一系列谷歌以及查看源码之后,我终于还是没有搞明白。
这不科学!!
于是我尝试修改一下形参的名字,果然,还是可以运行的。这验证了这并不是特殊的名字,那么只有一种可能就是在创建对象的时候,已经给赋值了。那么输出一下这两个变量的内容是不错的尝试。尝试之后发现后者是一个对象,前者是一些列的内容。这验证了谷歌出来的各种说法:environ和start_response,environ是一个字典包含了CGI中的环境变量,start_response也是一个callable,接受两个必须的参数,status(HTTP状态)和response_headers(响应消息的头)。而这个赋值过程并不需要开发者去开发,在初始化时已经完成赋值。
为了进一步验证想法,找到了ryu使用的eventlet相关的文件:/usr/lib/python2.7/dist-packages/eventlet/wsgi.py。在这个文件中定义了class HttpProtocol(BaseHTTPServer.BaseHTTPRequestHandler)。在该类中定义了函数: handle_one_request和handle_one_response。在handle_one_request函数中初始化了如下两个重要的内容(line:227):
self.environ = self.get_environ()
self.application = self.server.app
在handle_one_request函数中还调用了handle_one_response。在handle_one_response函数中,定义了函数start_response。查看代码时,终于发现了一句极为重要的语句(line:336):
result = self.application(self.environ, start_response)
start_restart函数在这句语句之前有定义(line:316)。至此,终于明白,为什么没有给形参赋值,实际上,这都是背后的故事。
wsgi.py文件中定义了Server类,用于开启一个服务端socket,处理socket通信。文件中还定义了一个重要的接口函数:server。server函数主要完成了功能是启动一个wsgi server去处理来自客户端的请求。启动之后将永久循环,直到被关闭。
Start up a wsgi server handling requests from the supplied server
socket. This function loops forever. The *sock* object will be closed after server exits,
but the underlying file descriptor will remain open, so if you have a dup() of *sock*,
it will remain usable.
在RYU中,同样也有一个wsgi.py文件。该文件定义了一系列的WSGI的类,用于实现WSGI,为webapp提供支持。此外,hub.py文件中也有对应的内容,这些内容的分析将在下一小节ofctl_rest模块进行分析。
Ofctl_rest.py
在ofctl_rest.py文件中定义了class RestStatsApi(app_manager.RyuApp)和class StatsController(ControllerBase)。其中class RestStatsApi(app_manager.RyuApp)是一个RYUAPP模块,实现了 statistic相关的相关RESTAPI; class StatsController则是具体的API运行实体。class RestStatsApi(app_manager.RyuApp)部分源码如下:
class RestStatsApi(app_manager.RyuApp):
OFP_VERSIONS = [ofproto_v1_0.OFP_VERSION,
ofproto_v1_2.OFP_VERSION,
ofproto_v1_3.OFP_VERSION]
_CONTEXTS = {
'dpset': dpset.DPSet,
'wsgi': WSGIApplication
}
def __init__(self, *args, **kwargs):
super(RestStatsApi, self).__init__(*args, **kwargs)
self.dpset = kwargs['dpset']
wsgi = kwargs['wsgi']
self.waiters = {}
self.data = {}
self.data['dpset'] = self.dpset
self.data['waiters'] = self.waiters
mapper = wsgi.mapper
wsgi.registory['StatsController'] = self.data
path = '/stats'
uri = path + '/switches'
OFP_VERSIONS指的是支持的OpenFlow的协议版本。 _CONTEXTS部分的内容是依赖的模块。dpset模块的DPSet类是一个RYUAPP类,会被当作一个service启动。DPSet类中主要完成了datapath链接的管理,比如dps字典内容的构建,交换机端口的信息收集,以及负责switches_features和port_status等消息的处理。具体细节,读者可自行查看/ryu/controller/dpset.py文件。wsgi模块负责完成了 请求路由的功能,相信内容直接查看wsgi.py介绍部分,此处不赘述。_CONTEXT中的'wsgi'值为WSGIApplication,所以在启动的时候需要将其作为RYU service启动。然而,这只是一个APPLICATION的类,启动WSGIServer另有玄机。很难发现到底在哪里启动了WSGIServer模块,我们只能在wsgi.py中找到一个没有被本文件使用的全局函数:start_service(app_mgr),一切线索似乎都断了。
山重水复疑无路,柳暗花明又一村。
这么好玩的源码解析,怎么能就这么结束了呢。从参数中我们发现app_mgr,难道!记忆深处,还记得大明湖畔的那个启动函数:
def main(args=None, prog=None):
'''
more code
'''
app_lists = CONF.app_lists + CONF.app
# keep old behaivor, run ofp if no application is specified.
if not app_lists:
app_lists = ['ryu.controller.ofp_handler']
app_mgr = AppManager.get_instance()
app_mgr.load_apps(app_lists)
contexts = app_mgr.create_contexts()
services = []
services.extend(app_mgr.instantiate_apps(**contexts))
webapp = wsgi.start_service(app_mgr) //here is the point!!!
if webapp:
thr = hub.spawn(webapp)
services.append(thr)
try:
hub.joinall(services)
finally:
app_mgr.close()
从代码中可以看到,启动RYU的时候,肯定会执行wsgi.start_service()函数:
def start_service(app_mgr):
for instance in app_mgr.contexts.values():
if instance.__class__ == WSGIApplication:
return WSGIServer(instance)
return None
返回了WSGIServer(WSGIServer instance)的对象,该对象作为一个模块在RYU中得到启动。class WSGIServer类的基类是hub.WSGIServer类,终于我们找到了在hub.WSGIServer中找到了eventlet.wsgi的实例。
#wsgi.WSGIServer
class WSGIServer(hub.WSGIServer):
def __init__(self, application, **config):
super(WSGIServer, self).__init__((CONF.wsapi_host, CONF.wsapi_port),
application, **config)
def __call__(self):
self.serve_forever()
#hub.WSGIServer
class WSGIServer(StreamServer):
def serve_forever(self):
self.logger = LoggingWrapper()
eventlet.wsgi.server(self.server, self.handle, self.logger)
至此函数调用链终于被发现。函数调用举例: ofctl_rest.py模块被运行,RestStatsApi被加载之前_CONTEXT的内容被当作service加载。启动RYU时,调用wsgi.start_service函数,因为WSGIApplication放到了app_list内,所以判断WSGIApplication成功,将WSGIServer加载。至此WSGIServer和WSGIApplication以及其他模块加载完成。
在WSGIApplication类中使用到了routes模块的Mapper和URLGenerator,前者用于URL的路由,后者用于URL的产生。RYU运行之后,WSGIServer负责完成请求到APPlication的分发。WSGIApplication收到请求之后,通过mapper,将对应的请求分发给制定的处理函数。处理函数解析请求,并回复请求。mapper在初始化的时候,添加的connect规则如下:
path='stats'
uri = path + '/flow/{dpid}'
mapper.connect('stats', uri,
controller=StatsController, action='get_flow_stats',
conditions=dict(method=['GET', 'POST']))
映射的分类属于stats分类,或者路径为stats。uri为/stats/flow/{dpid},dpid数值将在请求中被实例化为某一数值。交给的controller是StatsController,action是该类的get_flow_stats函数,请求的类型是GET或者POST,具体种类由请求明确。get_flow_stats函数具体如下:
def get_flow_stats(self, req, dpid, **_kwargs):
if req.body == '':
flow = {}
else:
try:
flow = ast.literal_eval(req.body)
except SyntaxError:
LOG.debug('invalid syntax %s', req.body)
return Response(status=400)
if type(dpid) == str and not dpid.isdigit():
LOG.debug('invalid dpid %s', dpid)
return Response(status=400)
dp = self.dpset.get(int(dpid))
if dp is None:
return Response(status=404)
_ofp_version = dp.ofproto.OFP_VERSION
_ofctl = supported_ofctl.get(_ofp_version, None)
if _ofctl is not None:
flows = _ofctl.get_flow_stats(dp, self.waiters, flow)
else:
LOG.debug('Unsupported OF protocol')
return Response(status=501)
body = json.dumps(flows)
return Response(content_type='application/json', body=body)
函数获取了req之后,进行解析。先获取了flow的信息,然后在调用_ofctl.get_flow_stats(dp, self.waiters, flow)函数获取到了flow的统计信息,然后使用json格式编码,最后返回一个Response.Response是webob的模块的一个类,用于返回一个WSGI的回应。详情可以查看webob文档。最后我们就可以在网页上查看到我们获取的信息了。
The webob.Response object contains everything necessary to make a WSGI response. Instances of it are in fact WSGI applications, but it can also represent the result of calling a WSGI application (as noted in Calling WSGI Applications). It can also be a way of accumulating a response in your WSGI application.
至此RYU中以ofctl_rest.py为例子的REST相关源码分析结束。
开发RESTAPI
本部分内容将以RYUBOOK上的一个简单案例介绍如何在RYU上开发RESTAPI。更多详细的内容大家可以点击原链接查看。
import json
import logging
from ryu.app import simple_switch_13
from webob import Response
from ryu.controller import ofp_event
from ryu.controller.handler import CONFIG_DISPATCHER
from ryu.controller.handler import set_ev_cls
from ryu.app.wsgi import ControllerBase, WSGIApplication, route
from ryu.lib import dpid as dpid_lib
simple_switch_instance_name = 'simple_switch_api_app'
url = '/simpleswitch/mactable/{dpid}'
class SimpleSwitchRest13(simple_switch_13.SimpleSwitch13):
_CONTEXTS = { 'wsgi': WSGIApplication }
def __init__(self, *args, **kwargs):
super(SimpleSwitchRest13, self).__init__(*args, **kwargs)
self.switches = {}
wsgi = kwargs['wsgi']
wsgi.register(SimpleSwitchController, {simple_switch_instance_name : self})
@set_ev_cls(ofp_event.EventOFPSwitchFeatures, CONFIG_DISPATCHER)
def switch_features_handler(self, ev):
super(SimpleSwitchRest13, self).switch_features_handler(ev)
datapath = ev.msg.datapath
self.switches[datapath.id] = datapath
self.mac_to_port.setdefault(datapath.id, {})
def set_mac_to_port(self, dpid, entry):
mac_table = self.mac_to_port.setdefault(dpid, {})
datapath = self.switches.get(dpid)
entry_port = entry['port']
entry_mac = entry['mac']
if datapath is not None:
parser = datapath.ofproto_parser
if entry_port not in mac_table.values():
for mac, port in mac_table.items():
# from known device to new device
actions = [parser.OFPActionOutput(entry_port)]
match = parser.OFPMatch(in_port=port, eth_dst=entry_mac)
self.add_flow(datapath, 1, match, actions)
# from new device to known device
actions = [parser.OFPActionOutput(port)]
match = parser.OFPMatch(in_port=entry_port, eth_dst=mac)
self.add_flow(datapath, 1, match, actions)
mac_table.update({entry_mac : entry_port})
return mac_table
class SimpleSwitchController(ControllerBase):
def __init__(self, req, link, data, **config):
super(SimpleSwitchController, self).__init__(req, link, data, **config)
self.simpl_switch_spp = data[simple_switch_instance_name]
@route('simpleswitch', url, methods=['GET'], requirements={'dpid': dpid_lib.DPID_PATTERN})
def list_mac_table(self, req, **kwargs):
simple_switch = self.simpl_switch_spp
dpid = dpid_lib.str_to_dpid(kwargs['dpid'])
if dpid not in simple_switch.mac_to_port:
return Response(status=404)
mac_table = simple_switch.mac_to_port.get(dpid, {})
body = json.dumps(mac_table)
return Response(content_type='application/json', body=body)
@route('simpleswitch', url, methods=['PUT'], requirements={'dpid': dpid_lib.DPID_PATTERN})
def put_mac_table(self, req, **kwargs):
simple_switch = self.simpl_switch_spp
dpid = dpid_lib.str_to_dpid(kwargs['dpid'])
new_entry = eval(req.body)
if dpid not in simple_switch.mac_to_port:
return Response(status=404)
try:
mac_table = simple_switch.set_mac_to_port(dpid, new_entry)
body = json.dumps(mac_table)
return Response(content_type='application/json', body=body)
except Exception as e:
return Response(status=500)
上述应用的代码中主要定义了两个类:SimpleSwitchRest13和SimpleSwitchController。其中SimpleSwitchRest13是SimpleSwitch13的派生类。此外,还需要启动一个WSGIApplication模块和WSGIServer模块提供服务。SimpleSwitchController类是作为WSGIApplication的controller类存在,用于实现对应的RESTAPI的内容。整个应用提供了两个RESTAPI的接口:
-
获取MAC地址表 API
获取Switching hub中MAC Table的内容,并以JSON格式返回MAC:PORT内容:
@route('simpleswitch', url, methods=['GET'], requirements={'dpid': dpid_lib.DPID_PATTERN})
def list_mac_table(self, req, **kwargs):
-
添加MAC地址表项 API
将指定的MAC:PORT信息加入到MAC Table中,同时根据更新后的MAC Table内容,添加对应的Flow enrties.
@route('simpleswitch', url, methods=['PUT'], requirements={'dpid': dpid_lib.DPID_PATTERN})
def put_mac_table(self, req, **kwargs):
对应的执行函数被route装饰器修饰,当route收取到对应的信息,如URL为:host:port/simpleswitch/mactable/{dpid},动作类型为GET时,就会调用list_mac_table函数,返回mac_table的信息。
运行验证
- 将以上的代码写入yourapp.py
- 然后使用ryu-manager运行yourapp.py
- 启动mininet连接控制器,并pingall
- 使用POSTMAN(或其他)下发RESTAPI请求验证。
实验截图如下:
RYU运行截图如下:
POSTMAN获取信息截图如下:
另一个验证不再贴图,以此类推即可。
总结
在学习RYU的过程中会接触到许多之前没有接触的技术,沉下心来认真读一读代码,越发感觉工程师在设计RYU时的精妙之处。写程序并不是逻辑的堆砌,而是一个half art, half science的存在。不仅需要追求性能上的优越,满足科学的要求,还需要注意到在实现过程中充满艺术感的设计过程。模块的划分,逻辑的抽象,以及系统结构的设计与搭建,都是非常重要的,直接影响到运行的效率。希望RYU源码分析之旅,能让我学会更多SDN的知识。当文章写得越来越偏向程序,代码分析的时候,就显得不够SDN,但是事实上,我们除了Network,以及实现SDN的OpenFlow协议以外,SDN的S也是值得学习的地方之一。希望我的学习记录能够帮助到更多的人。