oslo库的缺点是需要的背景知识比较多,英文文档写的又很简单,要真正用起来,没有几个demo会寸步难行。本文的目的就是通过demo,降低大家使用oslo库的难度。
1. oslo常用组件的一览表
库名作用背景知识oslo.config配置文件无oslo.utils工具库无oslo.service带ssl的REST服务器wsgioslo.log + oslo.context带调用链的日志系统无oslo.messagingRPC调用amqposlo.db数据库sqlalchemyoslo.rootwrapLinux的sudo无oslo.serialization序列化无oslo.i18n国际化无oslo.policy权限系统deploy pasteoslo.middlewarepipelinedeploy pastekeystonemiddleware用户系统deploy paste + keystoneoslo_test测试unittest2. 配置文件 oslo.config
它把配置项直接融入你的代码内,例子如下:
app.conf
[python] view plain copy [DEFAULT] username=app [rabbit] host = 192.168.1.7 port = 5672 myconfig.py [python] view plain copy # -*- coding: utf-8 -*- import sys from oslo_config import cfg #默认组的配置项 service_opts = [ cfg.StrOpt('username', default='default', help='user name'), cfg.StrOpt('password', help='password') ] #自定义配置组 rabbit_group = cfg.OptGroup( name='rabbit', title='RabbitMQ options' ) # 配置组中的多配置项模式 rabbit_Opts = [ cfg.StrOpt('host', default='localhost', help='IP/hostname to listen on.'), cfg.IntOpt('port', default=5672, help='Port number to listen on.') ] CONF = cfg.CONF #注册默认组的配置项 CONF.register_opts(service_opts) #配置组必须在其组件被注册前注册! CONF.register_group(rabbit_group) #注册配置组中含有多个配置项的模式,必须指明配置组 CONF.register_opts(rabbit_Opts, rabbit_group) #设置默认的日志文件名 CONF(sys.argv[1:], default_config_files=['app.conf']) #使用配置项 print ("username=%s rabbitmq.host=%s " % (CONF.username, CONF.rabbit.host))3. 工具库 oslo.utils
函数名作用oslo_utils.encodeutils.exception_to_unicode(exc)异常消息转unicodeoslo_utils.encodeutils.safe_decode(text, incoming=None, errors='strict')其他编码转unicodeoslo_utils.encodeutils.safe_encode(text, incoming=None, encoding='utf-8', errors='strict')unicode转其他编码,默认utf-8oslo_utils.encodeutils.to_utf8(text)unicode转utf-8oslo_utils.eventletutils.fetch_current_thread_functor()获取当前线程的结构体oslo_utils.fileutils.delete_if_exists(path)删除文件oslo_utils.fileutils.ensure_tree(path, mode=511)创建文件夹oslo_utils.fileutils.remove_path_on_error(path)删除文件夹oslo_utils.fileutils.write_to_tempfile(content, path=None, suffix='', prefix='tmp')写入临时文件oslo_utils.importutils.import_any(module, *modules)动态导入一个python包oslo_utils.importutils.import_class(import_str)动态导入一个python类oslo_utils.importutils.import_object(import_str, *args, **kwargs)动态导入一个python对象oslo_utils.importutils.try_import(import_str, default=None)尝试导入一个包,失败了用defaultoslo_utils.netutils.get_my_ipv4()获取本地的ipv4地址oslo_utils.netutils.is_ipv6_enabled()查看本地网络是否允许ipv6oslo_utils.netutils.is_valid_cidr(address)判断一个地址是否合法oslo_utils.netutils.is_valid_ip(address)判断ip是否合法oslo_utils.netutils.is_valid_ipv4(address)判断是否是合法的ipv4地址oslo_utils.netutils.is_valid_ipv6(address)判断是否是合法的ipv6地址oslo_utils.netutils.urlsplit(url, scheme='', allow_fragments=True)类似urlparse.urlsplit(),切分urloslo_utils.reflection.accepts_kwargs(function)查看函数是否接受kwargs类似的参数oslo_utils.reflection.get_class_name(obj, fully_qualified=True)获取对象的类名oslo_utils.reflection.get_all_class_names(obj, up_to=<type 'object'>)获取父类名字oslo_utils.reflection.get_callable_args(function, required_only=False)获取函数能传的参数oslo_utils.reflection.get_member_names(obj, exclude_hidden=True)获取对象的属性名oslo_utils.reflection.get_members(obj, exclude_hidden=True)获取对象的属性oslo_utils.reflection.get_method_self(method)获取函数的selfoslo_utils.reflection.is_subclass(obj, cls)obj是否是cls的子类oslo_utils.strutils.bool_from_string(subject, strict=False, default=False)str转booloslo_utils.strutils.check_string_length(value, name=None, min_length=0, max_length=None)检查字符串长度oslo_utils.strutils.int_from_bool_as_string(subject)bool转intoslo_utils.strutils.is_int_like(val)检查是否是数字oslo_utils.strutils.mask_dict_password(dictionary, secret='***')将字符串中的password替换掉oslo_utils.strutils.mask_password(message, secret='***')将字符串中的password替换掉oslo_utils.strutils.string_to_bytes(text, unit_system='IEC', return_int=False)str转bytesoslo_utils.timeutils.delta_seconds(before, after)计算时间差oslo_utils.timeutils.is_newer_than(after, seconds)比较时间oslo_utils.timeutils.isotime(at=None, subsecond=False)时间转iso格式oslo_utils.timeutils.parse_strtime(timestr, fmt='%Y-%m-%dT%H:%M:%S.%f')字符串转时间oslo_utils.timeutils.strtime(at=None, fmt='%Y-%m-%dT%H:%M:%S.%f')时间转字符串oslo_utils.timeutils.utcnow(with_timezone=False)获取当前时间oslo_utils.uuidutils.generate_uuid()产生一个uuidoslo_utils.uuidutils.is_uuid_like(val)检查字符串是否是uuidoslo_utils.versionutils.convert_version_to_int(version)version转intoslo_utils.versionutils.convert_version_to_str(version_int)version转字符串4. REST服务器 oslo.service
oslo.service比较负责,因为它透传了很多wsgi的参数,这些其实是开发者不希望直接看到的。下面的例子在oslo.service的基础上再封装了一个小的MiniService,这样用起来会比较方便。
[python] view plain copy # -*- coding: utf-8 -*- import sys from webob import Request #引入配置文件 from oslo_config import cfg #引入带调用链的日志 from oslo_log import log as logging from oslo_context import context #引入REST服务 from oslo_service import service from oslo_service import wsgi CONF = cfg.CONF LOG = logging.getLogger(__name__) logging.register_options(CONF) logging.setup(CONF, "m19k") #mini服务 class MiniService: def __init__(self, host = "0.0.0.0", port = "9000", workers = 1, use_ssl = False, cert_file = None, ca_file = None): self.host = host self.port = port self.workers = workers self.use_ssl = use_ssl self.cert_file = cert_file self.ca_file = ca_file self._actions = {} def add_action(self, url_path, action): if (url_path.lower() == "default") or (url_path == "/") or (url_path == ""): url_path = "default" elif (not url_path.startswith("/")): url_path = "/" + url_path self._actions[url_path] = action def _app(self, environ, start_response): context.RequestContext() LOG.debug("start action.") request = Request(environ) action = self._actions.get(environ['PATH_INFO']) if action == None: action = self._actions.get("default") if action != None: result = action(environ, request.method, request.path_info, request.query_string, request.body) try: result[1] except Exception,e: result = ('200 OK', str(result)) start_response(result[0], [('Content-Type', 'text/plain')]) return result[1] start_response("200 OK",[('Content-type', 'text/html')]) return "mini service is ok\n" def start(self): self.server = wsgi.Server(CONF, "m19k", self._app, host = self.host, port = self.port, use_ssl = self.use_ssl) launcher = service.ProcessLauncher(CONF) launcher.launch_service(self.server, workers = self.workers) LOG.debug("launch service (%s:%s)." % (self.host, self.port)) launcher.wait() 使用上述miniserver即可创建一个REST服务器,代码如下 [python] view plain copy # -*- coding: utf-8 -*- import sys from oslo_config import cfg from oslo_log import log as logging import miniservice CONF = cfg.CONF LOG = logging.getLogger(__name__) def default_action(env, method, path, query, body): LOG.info("demo action (method:%s, path:%s, query:%s, body:%s)" % (method, path, query, body)) return ("200 OK", "default") def test_action(env, method, path, query, body): LOG.info("test (method:%s, path:%s, query:%s, body:%s)" % (method, path, query, body)) return ("200 OK", "test") if __name__ == "__main__": CONF(sys.argv[1:]) host = getattr(CONF, "host", "0.0.0.0") port = getattr(CONF, "port", "8001") service = miniservice.MiniService(host, port) service.add_action("", default_action) service.add_action("test", test_action) service.start()通过curl即可测试
[python] view plain copy curl http://localhost:8001/test -H "content-type:application/json" -X POST -d "{'a':'b', 'c':'1'}" 当然还可以通过自定义的python的httpclient,代码如下: [python] view plain copy # -*- coding: utf-8 -*- import uuid import socket import functools import requests from oslo_config import cfg from oslo_log import log as logging from oslo_serialization import jsonutils client_opts = [ cfg.BoolOpt('debug', default=False, help="Print log in every request"), ] CONF = cfg.CONF CONF.register_opts(client_opts) LOG = logging.getLogger(__name__) class HttpClient(object): def __init__(self, cert=None, timeout=None, session=None): self.cert = cert self.timeout = None if not session: session = requests.Session() # Use TCPKeepAliveAdapter to fix bug 1323862 for scheme in list(session.adapters): session.mount(scheme, TCPKeepAliveAdapter()) self.session = session def request(self, url, method, json=None, connect_retries=0, **kwargs): #设置Http头,一般用于存储认证信息和格式信息 headers = kwargs.setdefault('headers', dict()) if self.cert: kwargs.setdefault('cert', self.cert) if self.timeout is not None: kwargs.setdefault('timeout', self.timeout) user_agent = headers.setdefault('User-Agent', uuid.uuid4().hex) if json is not None: headers['Content-Type'] = 'application/json' kwargs['data'] = jsonutils.dumps(json) #设置重试 send = functools.partial(self._send_request, url, method, connect_retries) #获取response resp = send(**kwargs) return resp def _send_request(self, url, method, connect_retries, connect_retry_delay=0.5, **kwargs): try: if CONF.debug: LOG.debug("REQ:{url:%s, method:%s}" % (url, method)) resp = self.session.request(method, url, **kwargs) except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e: if connect_retries <= 0: raise time.sleep(connect_retry_delay) return self._send_request( url, method, connect_retries=connect_retries - 1, connect_retry_delay=connect_retry_delay * 2, **kwargs) if CONF.debug: LOG.debug("RESP:{url:%s, method:%s, status:%s}" % (url, method, resp.status_code)) return resp def head(self, url, **kwargs): return self.request(url, 'HEAD', **kwargs) def get(self, url, **kwargs): return self.request(url, 'GET', **kwargs) def post(self, url, **kwargs): return self.request(url, 'POST', **kwargs) def put(self, url, **kwargs): return self.request(url, 'PUT', **kwargs) def delete(self, url, **kwargs): return self.request(url, 'DELETE', **kwargs) def patch(self, url, **kwargs): return self.request(url, 'PATCH', **kwargs) #用于解决TCP Keep-Alive的补丁 class TCPKeepAliveAdapter(requests.adapters.HTTPAdapter): def init_poolmanager(self, *args, **kwargs): if 'socket_options' not in kwargs: socket_options = [ # Keep Nagle's algorithm off (socket.IPPROTO_TCP, socket.TCP_NODELAY, 1), # Turn on TCP Keep-Alive (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1), ] if hasattr(socket, 'TCP_KEEPIDLE'): socket_options += [ # Wait 60 seconds before sending keep-alive probes (socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 60) ] if hasattr(socket, 'TCP_KEEPCNT'): socket_options += [ # Set the maximum number of keep-alive probes (socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 4) ] if hasattr(socket, 'TCP_KEEPINTVL'): socket_options += [ # Send keep-alive probes every 15 seconds (socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 15) ] kwargs['socket_options'] = socket_options super(TCPKeepAliveAdapter, self).init_poolmanager(*args, **kwargs) httpclient = HttpClient() print httpclient.request("http://localhost:8001/test", "POST", "{'a':'b'}") 5. 日志和调用链 oslo.log + oslo.context 纯粹的oslo.log是很容易使用的,参见下面的例子: [python] view plain copy from oslo_config import cfg from oslo_log import log as logging LOG = logging.getLogger(__name__) CONF = cfg.CONF DOMAIN = "demo" logging.register_options(CONF) logging.setup(CONF, DOMAIN) # Oslo Logging uses INFO as default LOG.info("Oslo Logging") LOG.warning("Oslo Logging") LOG.error("Oslo Logging") 而oslo.context(所谓的调用链),指的是每个Rest请求里面,在打印日志的时候都会带一个不变的request_id,由此可以分离出单次操作的日志。 在上述miniservice中,在REST的入口处,通过 [python] view plain copy context.RequestContext() 即生成了这样的request_id,之后每次log都会自动带上它。6. RPC调用 oslo.messaging
一个服务对外是REST接口,而服务内部的多个组件走的是RPC。Openstack中,RPC一般用rabbitmq来实现,oslo.messaging就是封装它的。可惜的是,它也要让读者有amqp的背景知识。
server.py
[python] view plain copy from oslo_config import cfg import oslo_messaging from oslo_log import log as logging import time CONF = cfg.CONF LOG = logging.getLogger(__name__) logging.register_options(CONF) logging.setup(CONF, "myservice") CONF(default_config_files=['app.conf']) class ServerControlEndpoint(object): target = oslo_messaging.Target(namespace='control', version='2.0') def __init__(self, server): self.server = server def stop(self, ctx): if self.server: self.server.stop() class TestEndpoint(object): def test(self, ctx, arg): print "test" print arg return arg transport = oslo_messaging.get_transport(cfg.CONF) target = oslo_messaging.Target(topic='test123', server='server1') endpoints = [ ServerControlEndpoint(None), TestEndpoint(), ] server = oslo_messaging.get_rpc_server(transport, target, endpoints, executor='blocking') try: server.start() while True: time.sleep(1) except KeyboardInterrupt: print("Stopping server") server.stop() server.wait() client.py [python] view plain copy import oslo_messaging as messaging from oslo_context import context from oslo_config import cfg from oslo_log import log as logging CONF = cfg.CONF LOG = logging.getLogger(__name__) logging.register_options(CONF) logging.setup(CONF, "myservice") CONF(default_config_files=['app.conf']) ctxt = {} arg = {'a':'b'} transport = messaging.get_transport(cfg.CONF) target = messaging.Target(topic='test123') client = messaging.RPCClient(transport, target) client.call(ctxt, 'test', arg=arg)