Openstack源代码分析之keystone部分(一)--WSGI接口流程分析

前面分析了keystone服务的启动工作,那启动后我们是怎么通过WSGI接口访问其中的服务的呢?

keystone-paste.ini配置文件最下面

[composite:main]
use = egg:Paste#urlmap
/v2.0 = public_api

[composite:admin]
use = egg:Paste#urlmap
/v2.0 = admin_api

一般调用keystone v2.0版本的api的请求路径为http://192.168.1.x:5000/v2.0/xxxxxxx(public endpoint)和http://192.168.1.x:35357/v2.0/xxxxx(admin port)

admin和public的处理方式都是一样的,只不过权限不同,这里只分析public_api的流程,keystone-paste.ini的流程如下

[pipeline:public_api]
pipeline = access_log sizelimit url_normalize token_auth admin_token_auth xml_body json_body ec2_extension user_crud_extension public_service

pastedeploy过滤流程如下

[filter:access_log]
paste.filter_factory = keystone.contrib.access:AccessLogMiddleware.factory

[filter:sizelimit]
paste.filter_factory = keystone.middleware:RequestBodySizeLimiter.factory

[filter:url_normalize]
paste.filter_factory = keystone.middleware:NormalizingFilter.factory

[filter:token_auth]
paste.filter_factory = keystone.middleware:TokenAuthMiddleware.factory

[filter:admin_token_auth]
paste.filter_factory = keystone.middleware:AdminTokenAuthMiddleware.factory

[filter:xml_body]
paste.filter_factory = keystone.middleware:XmlBodyMiddleware.factory

[filter:json_body]
paste.filter_factory = keystone.middleware:JsonBodyMiddleware.factory

[filter:ec2_extension]
paste.filter_factory = keystone.contrib.ec2:Ec2Extension.factory

[filter:user_crud_extension]
paste.filter_factory = keystone.contrib.user_crud:CrudExtension.factory

[app:public_service]
paste.app_factory = keystone.service:public_app_factory

一个一个来分析

首先是keystone.contrib.access:AccessLogMiddleware.factory

keystone.contrib.access模块下有两个文件init.py,core.py,init.py只有一行代码

from keystone.contrib.access.core import *

显而易见,关键代码在core.py中,core.py代码如下

CONF = config.CONF
LOG = logging.getLogger('access')
APACHE_TIME_FORMAT = '%d/%b/%Y:%H:%M:%S'
APACHE_LOG_FORMAT = (
    '%(remote_addr)s - %(remote_user)s [%(datetime)s] "%(method)s %(url)s '
    '%(http_version)s" %(status)s %(content_length)s')


class AccessLogMiddleware(wsgi.Middleware):
    """Writes an access log to INFO."""

    @webob.dec.wsgify
    def __call__(self, request):
        data = {
            'remote_addr': request.remote_addr,
            'remote_user': request.remote_user or '-',
            'method': request.method,
            'url': request.url,
            'http_version': request.http_version,
            'status': 500,
            'content_length': '-'}

        try:
            response = request.get_response(self.application)
            data['status'] = response.status_int
            data['content_length'] = len(response.body) or '-'
        finally:
            # must be calculated *after* the application has been called
            now = timeutils.utcnow()

            # timeutils may not return UTC, so we can't hardcode +0000
            data['datetime'] = '%s %s' % (now.strftime(APACHE_TIME_FORMAT),
                                          now.strftime('%z') or '+0000')

            LOG.info(APACHE_LOG_FORMAT % data)
        return response
#wsgi.Middleware类在keystone.commone下

class Middleware(Application): “””Base WSGI middleware. These classes require an application to be initialized that will be called next. By default the middleware will simply call its wrapped app, or you can override __call__ to customize its behavior. “”” @classmethod def factory(cls, global_config, **local_config): “””Used for paste app factories in paste.deploy config files. Any local configuration (that is, values under the [filter:APPNAME] section of the paste config) will be passed into the `__init__` method as kwargs. A hypothetical configuration would look like: [filter:analytics] redis_host = 127.0.0.1 paste.filter_factory = keystone.analytics:Analytics.factory which would result in a call to the `Analytics` class as import keystone.analytics keystone.analytics.Analytics(app, redis_host=’127.0.0.1′) You could of course re-implement the `factory` method in subclasses, but using the kwarg passing it shouldn’t be necessary. “”” def _factory(app): conf = global_config.copy() conf.update(local_config) return cls(app, **local_config)#__init__ here application=app return _factory

    def __init__(self, application):

        self.application = application

AccessLogMiddleware类下的__call__函数就核心,用web.dec.wsgify装饰过,参数request为Webob封装的Request对象,因为是filter,向下传递还需要一个app对象,可以看到,通过request.get_response(self.application)来获取app,向下过滤,直到找到不是filter的app对象。

那这个self.application哪来呢,毫无疑问,一定是初始化来的,这个类没有初始化函数,进入基类,wsgi,Middleware果不其然,不仅有__init__函数,还有factroy函数,pastedeploy调用factroy函数,其中

cls(app, **local_config)

初始化app,由于factory返回的是cls类本身,所以调用了__call__从而完成filter的功能。

access_log实现对请求的log记录,代码很明了。

access_log后面是sizelimit

class RequestBodySizeLimiter(wsgi.Middleware):
    """Limit the size of an incoming request."""

    def __init__(self, *args, **kwargs):
        super(RequestBodySizeLimiter, self).__init__(*args, **kwargs)

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):

        if req.content_length > CONF.max_request_body_size:
            raise exception.RequestTooLarge()
        if req.content_length is None and req.is_body_readable:
            limiter = utils.LimitingReader(req.body_file,
                                           CONF.max_request_body_size)
            req.body_file = limiter
        return self.application

判断request大小,如果大于最大值,则raise Excetion报异常,否则继续向下传递,url_normalize

class NormalizingFilter(wsgi.Middleware):
    """Middleware filter to handle URL normalization."""

    def process_request(self, request):
        """Normalizes URLs."""
        # Removes a trailing slash from the given path, if any.
        if (len(request.environ['PATH_INFO']) > 1 and
                request.environ['PATH_INFO'][-1] == '/'):
            request.environ['PATH_INFO'] = request.environ['PATH_INFO'][:-1]
        # Rewrites path to root if no path is given.
        elif not request.environ['PATH_INFO']:
            request.environ['PATH_INFO'] = '/'

这个没有直接在NormalizingFilter内部实现__call__功能,而是重载了process_request函数覆盖其基类Middleware的process_request函数并调用其__call__函数,如下,这个函数的功能应该是重新定义request的路径,使其规范化。

    @webob.dec.wsgify(RequestClass=Request)
    def __call__(self, request):
        try:
            response = self.process_request(request)
            if response:
                return response
            response = request.get_response(self.application)
            return self.process_response(request, response)
        except exception.Error as e:
            LOG.warning(e)
            return render_exception(e,
                                    user_locale=request.best_match_language())
        except TypeError as e:
            LOG.exception(e)
            return render_exception(exception.ValidationError(e),
                                    user_locale=request.best_match_language())
        except Exception as e:
            LOG.exception(e)
            return render_exception(exception.UnexpectedError(exception=e),
                                    user_locale=request.best_match_language())

并继续向下调用,token_auth,一样的,处理request的public auth数据

class TokenAuthMiddleware(wsgi.Middleware):
    def process_request(self, request):
        token = request.headers.get(AUTH_TOKEN_HEADER)
        context = request.environ.get(CONTEXT_ENV, {})
        context['token_id'] = token
        if SUBJECT_TOKEN_HEADER in request.headers:
            context['subject_token_id'] = (
                request.headers.get(SUBJECT_TOKEN_HEADER))
        request.environ[CONTEXT_ENV] = context

再往下是admin_token_auth,处理admin auth数据

class AdminTokenAuthMiddleware(wsgi.Middleware):
    """A trivial filter that checks for a pre-defined admin token.

    Sets 'is_admin' to true in the context, expected to be checked by
    methods that are admin-only.

    """

    def process_request(self, request):
        token = request.headers.get(AUTH_TOKEN_HEADER)
        context = request.environ.get(CONTEXT_ENV, {})
        context['is_admin'] = (token == CONF.admin_token)
        request.environ[CONTEXT_ENV] = context

然后xml_body,xml to json

class XmlBodyMiddleware(wsgi.Middleware):
    """De/serializes XML to/from JSON."""

    def process_request(self, request):
        """Transform the request from XML to JSON."""
        incoming_xml = 'application/xml' in str(request.content_type)
        if incoming_xml and request.body:
            request.content_type = 'application/json'
            try:
                request.body = jsonutils.dumps(
                    serializer.from_xml(request.body))
            except Exception:
                LOG.exception('Serializer failed')
                e = exception.ValidationError(attribute='valid XML',
                                              target='request body')
                return wsgi.render_exception(e)

然后 json_body,对post的json格式数据进行解析

class JsonBodyMiddleware(wsgi.Middleware):
    """Middleware to allow method arguments to be passed as serialized JSON.

    Accepting arguments as JSON is useful for accepting data that may be more
    complex than simple primitives.

    In this case we accept it as urlencoded data under the key 'json' as in
    json=<urlencoded_json> but this could be extended to accept raw JSON
    in the POST body.

    Filters out the parameters `self`, `context` and anything beginning with
    an underscore.

    """
    def process_request(self, request):
        # Abort early if we don't have any work to do
        params_json = request.body
        if not params_json:
            return

        # Reject unrecognized content types. Empty string indicates
        # the client did not explicitly set the header
        if request.content_type not in ('application/json', ''):
            e = exception.ValidationError(attribute='application/json',
                                          target='Content-Type header')
            return wsgi.render_exception(e)

        params_parsed = {}
        try:
            params_parsed = jsonutils.loads(params_json)
        except ValueError:
            e = exception.ValidationError(attribute='valid JSON',
                                          target='request body')
            return wsgi.render_exception(e)
        finally:
            if not params_parsed:
                params_parsed = {}

        params = {}
        for k, v in params_parsed.iteritems():
            if k in ('self', 'context'):
                continue
            if k.startswith('_'):
                continue
            params[k] = v

        request.environ[PARAMS_ENV] = params

然后ec2_extension,s3_extension,这两个过滤filter实现的是Routes的机制,完成路径的匹配,如果在路径中没有找到,再调用self.application,如果找到,直接返回结果,代码如下;

class Ec2Extension(wsgi.ExtensionRouter):
    def add_routes(self, mapper):
        ec2_controller = controllers.Ec2Controller()
        # validation
        mapper.connect(
            '/ec2tokens',
            controller=ec2_controller,
            action='authenticate',
            conditions=dict(method=['POST']))
        # crud
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2',
            controller=ec2_controller,
            action='create_credential',
            conditions=dict(method=['POST']))
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2',
            controller=ec2_controller,
            action='get_credentials',
            conditions=dict(method=['GET']))
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2/{credential_id}',
            controller=ec2_controller,
            action='get_credential',
            conditions=dict(method=['GET']))
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2/{credential_id}',
            controller=ec2_controller,
            action='delete_credential',
            conditions=dict(method=['DELETE']))

如果在这些路径中没有找到,则在其基类ExtensionRouter中继续向下传递,如下所示:mapper.connect(xxxxxxxxxx,contoller=self.application),也实现了factory和上面一样调用自身的__call__,在其基类Router中返回self._router.

class ExtensionRouter(Router):
    """A router that allows extensions to supplement or overwrite routes.

    Expects to be subclassed.
    """
    def __init__(self, application, mapper=None):
        if mapper is None:
            mapper = routes.Mapper()
        self.application = application
        self.add_routes(mapper)
        mapper.connect('{path_info:.*}', controller=self.application)
        super(ExtensionRouter, self).__init__(mapper)

    def add_routes(self, mapper):
        pass

    @classmethod
    def factory(cls, global_config, **local_config):
        """Used for paste app factories in paste.deploy config files.

        Any local configuration (that is, values under the [filter:APPNAME]
        section of the paste config) will be passed into the `__init__` method
        as kwargs.

        A hypothetical configuration would look like:

            [filter:analytics]
            redis_host = 127.0.0.1
            paste.filter_factory = keystone.analytics:Analytics.factory

        which would result in a call to the `Analytics` class as

            import keystone.analytics
            keystone.analytics.Analytics(app, redis_host='127.0.0.1')

        You could of course re-implement the `factory` method in subclasses,
        but using the kwarg passing it shouldn't be necessary.

        """
        def _factory(app):
            conf = global_config.copy()
            conf.update(local_config)
            return cls(app, **local_config)
        return _factory

ec2_extension除了实现上述功能,在其core函数中实现了如下代码:

EXTENSION_DATA = {
    'name': 'OpenStack EC2 API',
    'namespace': 'http://docs.openstack.org/identity/api/ext/'
                 'OS-EC2/v1.0',
    'alias': 'OS-EC2',
    'updated': '2013-07-07T12:00:0-00:00',
    'description': 'OpenStack EC2 Credentials backend.',
    'links': [
        {
            'rel': 'describedby',
            # TODO(ayoung): needs a description
            'type': 'text/html',
            'href': 'https://github.com/openstack/identity-api',
        }
    ]}
extension.register_admin_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)
extension.register_public_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)

注册这个是有用的,在pipeline的最后一步会遇到。这里先放在着。S3_extension和ec2也是类似的实现了一个S3Extension,S3Controller和一个extension.registerxxxx(),user_crud也是类似。

最后一步是public_app_factroy,OK,找到他,在service.py下面

@fail_gracefully
def public_app_factory(global_conf, **local_conf):
    controllers.register_version('v2.0')
    conf = global_conf.copy()
    conf.update(local_conf)
    return wsgi.ComposingRouter(routes.Mapper(),
                                [identity.routers.Public(),
                                 token.routers.Router(),
                                 routers.VersionV2('public'),
                                 routers.Extension(False)])

配置完成后,返回了ComposingRouter对象,这个类下面做了什么呢,如下

class ComposingRouter(Router):
    def __init__(self, mapper=None, routers=None):
        if mapper is None:
            mapper = routes.Mapper()
        if routers is None:
            routers = []
        for router in routers:
            router.add_routes(mapper)
        super(ComposingRouter, self).__init__(mapper)

其实也没做什么,主要就是这一句,routers.add_routers(mapper),就是调用indentity.routers.Public(), token.routers.Router(),routers.VersionV2(‘public’),routers.Extension(False)类下的add_routes函数,并把mapper作为参数传进去了。继续看,这里注意routers.Extension这个类,跟进去

class Extension(wsgi.ComposableRouter):
    def __init__(self, is_admin=True):
        if is_admin:
            self.controller = controllers.AdminExtensions()
        else:
            self.controller = controllers.PublicExtensions()

    def add_routes(self, mapper):
        extensions_controller = self.controller
        mapper.connect('/extensions',
                       controller=extensions_controller,
                       action='get_extensions_info',
                       conditions=dict(method=['GET']))
        mapper.connect('/extensions/{extension_alias}',
                       controller=extensions_controller,
                       action='get_extension_info',
                       conditions=dict(method=['GET']))

add_routes实现了Routes,这不奇怪,其他的三个类也是实现了上述功能,关键是他的controller,跟到PublicExtension下面

class PublicExtensions(Extensions):
    @property
    def extensions(self):
        return extension.PUBLIC_EXTENSIONS

返回extension.PUBLIC_EXTENSION,还记得上面的

extension.register_public_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)
def register_public_extension(url_prefix, extension_data):
    """Same as register_admin_extension but for public extensions."""

    PUBLIC_EXTENSIONS[url_prefix] = extension_data

就是给PUBLI_EXTENSION赋值,因此,这里指向了前面的EC2Controller,S3Controller,CRUDController。OK,这样就可以根据注册的服务,来完成映射的分发,调用public_api的流程基本完成。

问题“

1.Controller是怎么调用Manager的,Manager又是怎么调用Dirver的?这是根据不同Controller下的dependency装饰器来实现的。

2.Keystone下notification机制

bluefire1991

2013.11.2

19:47:54

    原文作者:bluefire1991
    原文地址: https://blog.csdn.net/bluefire1991/article/details/14000633
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞