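The previous post analyzed how the keystone service starts up. Once it is running, how do requests reach its services through the WSGI interface?
At the bottom of the keystone-paste.ini configuration file: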
[composite:main]
use = egg:Paste#urlmap
/v2.0 = public_api
[composite:admin]
use = egg:Paste#urlmap
/v2.0 = admin_api
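A request to the keystone v2.0 API typically uses a path such as http://192.168.1.x:5000/v2.0/xxxxxxx (public endpoint) or http://192.168.1.x:35357/v2.0/xxxxx (admin port).
admin and public are handled in the same way and differ only in privileges, so this post analyzes only the public_api flow. In keystone-paste.ini it is defined as follows: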
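To make the prefix mapping concrete, here is a minimal sketch of urlmap-style dispatch written as plain WSGI. It is not Paste's implementation; `make_urlmap`, the stub `public_api` app and its response text are hypothetical names used only for illustration.

```python
def make_urlmap(mapping):
    """Return a WSGI app that dispatches on the longest matching path prefix."""
    # Normalize prefixes and try the most specific (longest) one first.
    prefixes = sorted(((prefix.rstrip('/'), app) for prefix, app in mapping.items()),
                      key=lambda item: len(item[0]), reverse=True)

    def urlmap_app(environ, start_response):
        path = environ.get('PATH_INFO', '')
        for prefix, app in prefixes:
            if path == prefix or path.startswith(prefix + '/'):
                # Move the matched prefix from PATH_INFO to SCRIPT_NAME,
                # so the inner app only sees the remainder of the path.
                environ['SCRIPT_NAME'] = environ.get('SCRIPT_NAME', '') + prefix
                environ['PATH_INFO'] = path[len(prefix):]
                return app(environ, start_response)
        start_response('404 Not Found', [('Content-Type', 'text/plain')])
        return [b'not found']

    return urlmap_app


def public_api(environ, start_response):
    """Stub standing in for the whole public_api pipeline."""
    start_response('200 OK', [('Content-Type', 'text/plain')])
    return [('public:' + environ['PATH_INFO']).encode('utf-8')]


# Same shape as the [composite:main] section: /v2.0 = public_api
main = make_urlmap({'/v2.0': public_api})
```

With this sketch a request for /v2.0/tokens reaches the stub with PATH_INFO set to /tokens, which is why the routes registered further down the pipeline do not include the /v2.0 prefix.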
[pipeline:public_api]
pipeline = access_log sizelimit url_normalize token_auth admin_token_auth xml_body json_body ec2_extension user_crud_extension public_service
The filters that PasteDeploy chains together are defined as follows:
[filter:access_log]
paste.filter_factory = keystone.contrib.access:AccessLogMiddleware.factory
[filter:sizelimit]
paste.filter_factory = keystone.middleware:RequestBodySizeLimiter.factory
[filter:url_normalize]
paste.filter_factory = keystone.middleware:NormalizingFilter.factory
[filter:token_auth]
paste.filter_factory = keystone.middleware:TokenAuthMiddleware.factory
[filter:admin_token_auth]
paste.filter_factory = keystone.middleware:AdminTokenAuthMiddleware.factory
[filter:xml_body]
paste.filter_factory = keystone.middleware:XmlBodyMiddleware.factory
[filter:json_body]
paste.filter_factory = keystone.middleware:JsonBodyMiddleware.factory
[filter:ec2_extension]
paste.filter_factory = keystone.contrib.ec2:Ec2Extension.factory
[filter:user_crud_extension]
paste.filter_factory = keystone.contrib.user_crud:CrudExtension.factory
[app:public_service]
paste.app_factory = keystone.service:public_app_factory
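As a rough mental model of what the pipeline line does, the sketch below wraps an app with a list of filters so that the first filter listed ends up outermost. `build_pipeline`, `tagging_filter` and the dict-based request are hypothetical simplifications, not PasteDeploy's code.

```python
def build_pipeline(filter_factories, app):
    """Wrap `app` with each filter so the first listed filter is outermost."""
    for make_filter in reversed(filter_factories):
        app = make_filter(app)
    return app


def tagging_filter(name):
    """Hypothetical filter factory: records its name, then delegates."""
    def make_filter(app):
        def filtered(request):
            request.setdefault('seen', []).append(name)
            return app(request)
        return filtered
    return make_filter


def public_service(request):
    """Stub for the final app; reports the order in which filters ran."""
    return list(request['seen']) + ['public_service']


pipeline = build_pipeline(
    [tagging_filter('access_log'),
     tagging_filter('sizelimit'),
     tagging_filter('url_normalize')],
    public_service)
```

Calling `pipeline({})` runs the filters in the order they are listed and reaches `public_service` last, which matches the left-to-right order of the pipeline setting.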
Let's go through them one by one.
First is keystone.contrib.access:AccessLogMiddleware.factory
The keystone.contrib.access package contains two files, __init__.py and core.py. __init__.py has only one line of code:
from keystone.contrib.access.core import *
Clearly the important code is in core.py, which looks like this:
CONF = config.CONF
LOG = logging.getLogger('access')
APACHE_TIME_FORMAT = '%d/%b/%Y:%H:%M:%S'
APACHE_LOG_FORMAT = (
    '%(remote_addr)s - %(remote_user)s [%(datetime)s] "%(method)s %(url)s '
    '%(http_version)s" %(status)s %(content_length)s')


class AccessLogMiddleware(wsgi.Middleware):
    """Writes an access log to INFO."""

    @webob.dec.wsgify
    def __call__(self, request):
        data = {
            'remote_addr': request.remote_addr,
            'remote_user': request.remote_user or '-',
            'method': request.method,
            'url': request.url,
            'http_version': request.http_version,
            'status': 500,
            'content_length': '-'}

        try:
            response = request.get_response(self.application)
            data['status'] = response.status_int
            data['content_length'] = len(response.body) or '-'
        finally:
            # must be calculated *after* the application has been called
            now = timeutils.utcnow()

            # timeutils may not return UTC, so we can't hardcode +0000
            data['datetime'] = '%s %s' % (now.strftime(APACHE_TIME_FORMAT),
                                          now.strftime('%z') or '+0000')

            LOG.info(APACHE_LOG_FORMAT % data)
        return response
# The wsgi.Middleware class lives in keystone.common
class Middleware(Application):
    """Base WSGI middleware.

    These classes require an application to be
    initialized that will be called next.  By default the middleware will
    simply call its wrapped app, or you can override __call__ to customize its
    behavior.

    """

    @classmethod
    def factory(cls, global_config, **local_config):
        """Used for paste app factories in paste.deploy config files.

        Any local configuration (that is, values under the [filter:APPNAME]
        section of the paste config) will be passed into the `__init__` method
        as kwargs.

        A hypothetical configuration would look like:

            [filter:analytics]
            redis_host = 127.0.0.1
            paste.filter_factory = keystone.analytics:Analytics.factory

        which would result in a call to the `Analytics` class as

            import keystone.analytics
            keystone.analytics.Analytics(app, redis_host='127.0.0.1')

        You could of course re-implement the `factory` method in subclasses,
        but using the kwarg passing it shouldn't be necessary.

        """
        def _factory(app):
            conf = global_config.copy()
            conf.update(local_config)
            return cls(app, **local_config)  # __init__ here: application=app
        return _factory

    def __init__(self, application):
        self.application = application
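The core of AccessLogMiddleware is its __call__ method, which is decorated with webob.dec.wsgify; its request argument is a WebOb Request object. Because this class is a filter, it needs an app object to pass the request on to. Here that is done with request.get_response(self.application), which calls the next app in the chain and returns its response. Each filter hands the request down this way until it reaches an app that is not a filter.
So where does self.application come from? It must be set during initialization. This class has no __init__ of its own, so look at the base class wsgi.Middleware: sure enough, it has both an __init__ method and a factory method. PasteDeploy calls factory, in which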
cls(app, **local_config)
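creates the instance with app as its application. Note that factory returns the inner _factory function; PasteDeploy calls it with the next app and gets back an instance of cls. That instance is callable through __call__, which is what lets it act as a filter.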
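The two-step handoff is easier to see in isolation. Below is a simplified stand-in for the base class, without WebOb; the `options` attribute, the `Shouting` filter and `echo_app` are hypothetical and only there to make the sketch runnable.

```python
class Middleware(object):
    """Simplified stand-in for keystone.common.wsgi.Middleware."""

    def __init__(self, application, **options):
        self.application = application
        self.options = options      # not in the original; kept for inspection

    @classmethod
    def factory(cls, global_config, **local_config):
        # Step 1: PasteDeploy calls factory() while loading the config ...
        def _factory(app):
            # Step 2: ... then calls _factory(app) with the next app in the chain.
            return cls(app, **local_config)
        return _factory

    def __call__(self, request):
        # Default behaviour: hand the request to the wrapped application.
        return self.application(request)


class Shouting(Middleware):
    """Hypothetical filter that upper-cases the wrapped app's response."""

    def __call__(self, request):
        return self.application(request).upper()


def echo_app(request):
    return 'echo: ' + request


make_filter = Shouting.factory({}, level='debug')  # returns the _factory closure
wrapped = make_filter(echo_app)                    # returns a Shouting instance
```

`wrapped` is an instance, not the class, and calling it goes through `__call__`, which in turn calls `echo_app`.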
access_log logs each request; the code is straightforward.
After access_log comes sizelimit:
class RequestBodySizeLimiter(wsgi.Middleware):
    """Limit the size of an incoming request."""

    def __init__(self, *args, **kwargs):
        super(RequestBodySizeLimiter, self).__init__(*args, **kwargs)

    @webob.dec.wsgify(RequestClass=wsgi.Request)
    def __call__(self, req):
        if req.content_length > CONF.max_request_body_size:
            raise exception.RequestTooLarge()
        if req.content_length is None and req.is_body_readable:
            limiter = utils.LimitingReader(req.body_file,
                                           CONF.max_request_body_size)
            req.body_file = limiter
        return self.application
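When the request has no Content-Length, the size can only be enforced while the body is being read, which is what the LimitingReader wrapper is for. The following is a minimal sketch of that idea, not keystone's utils.LimitingReader; `BodyTooLarge` is a hypothetical stand-in for the RequestTooLarge exception.

```python
import io


class BodyTooLarge(Exception):
    """Hypothetical stand-in for keystone's RequestTooLarge exception."""


class LimitingReader(object):
    """Wrap a file-like object and fail once more than `limit` bytes are read."""

    def __init__(self, data, limit):
        self.data = data
        self.limit = limit
        self.bytes_read = 0

    def read(self, size=-1):
        chunk = self.data.read(size)
        self.bytes_read += len(chunk)
        if self.bytes_read > self.limit:
            raise BodyTooLarge()
        return chunk


small = LimitingReader(io.BytesIO(b'12345'), limit=10)
large = LimitingReader(io.BytesIO(b'x' * 20), limit=10)
```

Reading from `small` succeeds, while reading all of `large` raises `BodyTooLarge` because 20 bytes exceed the limit of 10.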
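RequestBodySizeLimiter checks the request size: if the body is larger than the configured maximum it raises RequestTooLarge, otherwise the request is passed on down the pipeline. Next is url_normalize: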
class NormalizingFilter(wsgi.Middleware):
    """Middleware filter to handle URL normalization."""

    def process_request(self, request):
        """Normalizes URLs."""
        # Removes a trailing slash from the given path, if any.
        if (len(request.environ['PATH_INFO']) > 1 and
                request.environ['PATH_INFO'][-1] == '/'):
            request.environ['PATH_INFO'] = request.environ['PATH_INFO'][:-1]
        # Rewrites path to root if no path is given.
        elif not request.environ['PATH_INFO']:
            request.environ['PATH_INFO'] = '/'
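NormalizingFilter does not implement __call__ itself. Instead it overrides the process_request method of its base class Middleware, and the base class's __call__ (shown below) invokes it. The purpose of this filter is to normalize the request path: it strips a trailing slash and rewrites an empty path to '/'.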
    @webob.dec.wsgify(RequestClass=Request)
    def __call__(self, request):
        try:
            response = self.process_request(request)
            if response:
                return response
            response = request.get_response(self.application)
            return self.process_response(request, response)
        except exception.Error as e:
            LOG.warning(e)
            return render_exception(e,
                                    user_locale=request.best_match_language())
        except TypeError as e:
            LOG.exception(e)
            return render_exception(exception.ValidationError(e),
                                    user_locale=request.best_match_language())
        except Exception as e:
            LOG.exception(e)
            return render_exception(exception.UnexpectedError(exception=e),
                                    user_locale=request.best_match_language())
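The hook pattern and the normalization rules can be tried out without WebOb. This is a sketch under simplifying assumptions: the request is a plain dict, error handling is omitted, and `HookedMiddleware`, `Normalizer` and `normalize_path` are hypothetical names.

```python
def normalize_path(path_info):
    """Apply the same two rules as NormalizingFilter.process_request."""
    if len(path_info) > 1 and path_info.endswith('/'):
        return path_info[:-1]   # strip one trailing slash
    if not path_info:
        return '/'              # an empty path becomes the root
    return path_info


class HookedMiddleware(object):
    """Base class whose __call__ drives the two overridable hooks."""

    def __init__(self, application):
        self.application = application

    def process_request(self, request):
        return None

    def process_response(self, request, response):
        return response

    def __call__(self, request):
        response = self.process_request(request)
        if response:
            return response     # a hook may short-circuit the pipeline
        response = self.application(request)
        return self.process_response(request, response)


class Normalizer(HookedMiddleware):
    def process_request(self, request):
        request['PATH_INFO'] = normalize_path(request['PATH_INFO'])


# The wrapped "app" simply returns the path it was given.
app = Normalizer(lambda request: request['PATH_INFO'])
```

Subclasses only override a hook; the base `__call__` decides when the wrapped application runs.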
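The request then continues down the pipeline to token_auth, which works the same way: it reads the token headers from the request and stores them in the request context.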
class TokenAuthMiddleware(wsgi.Middleware):
    def process_request(self, request):
        token = request.headers.get(AUTH_TOKEN_HEADER)
        context = request.environ.get(CONTEXT_ENV, {})
        context['token_id'] = token
        if SUBJECT_TOKEN_HEADER in request.headers:
            context['subject_token_id'] = (
                request.headers.get(SUBJECT_TOKEN_HEADER))
        request.environ[CONTEXT_ENV] = context
Next is admin_token_auth, which handles the admin token:
class AdminTokenAuthMiddleware(wsgi.Middleware):
    """A trivial filter that checks for a pre-defined admin token.

    Sets 'is_admin' to true in the context, expected to be checked by
    methods that are admin-only.

    """

    def process_request(self, request):
        token = request.headers.get(AUTH_TOKEN_HEADER)
        context = request.environ.get(CONTEXT_ENV, {})
        context['is_admin'] = (token == CONF.admin_token)
        request.environ[CONTEXT_ENV] = context
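To see what the context looks like after both token filters have run, here is a small sketch that applies their logic to plain dicts. The three constant values are assumptions (the post only shows the constant names), and `apply_token_filters` is a hypothetical helper; real WebOb headers are also case-insensitive, which a plain dict is not.

```python
# Assumed values; the excerpts above only show the constant names.
AUTH_TOKEN_HEADER = 'X-Auth-Token'
SUBJECT_TOKEN_HEADER = 'X-Subject-Token'
CONTEXT_ENV = 'openstack.context'


def apply_token_filters(headers, environ, admin_token):
    """Run the logic of token_auth followed by admin_token_auth."""
    token = headers.get(AUTH_TOKEN_HEADER)

    # token_auth: record the caller's token and the optional subject token.
    context = environ.get(CONTEXT_ENV, {})
    context['token_id'] = token
    if SUBJECT_TOKEN_HEADER in headers:
        context['subject_token_id'] = headers.get(SUBJECT_TOKEN_HEADER)
    environ[CONTEXT_ENV] = context

    # admin_token_auth: flag the request if it carries the shared admin token.
    context = environ.get(CONTEXT_ENV, {})
    context['is_admin'] = (token == admin_token)
    environ[CONTEXT_ENV] = context
    return environ
```

Both filters only annotate the environ; neither rejects a request, so the actual authorization decision is left to code further down the pipeline.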
Then xml_body, which converts an XML request body to JSON:
class XmlBodyMiddleware(wsgi.Middleware):
    """De/serializes XML to/from JSON."""

    def process_request(self, request):
        """Transform the request from XML to JSON."""
        incoming_xml = 'application/xml' in str(request.content_type)
        if incoming_xml and request.body:
            request.content_type = 'application/json'
            try:
                request.body = jsonutils.dumps(
                    serializer.from_xml(request.body))
            except Exception:
                LOG.exception('Serializer failed')
                e = exception.ValidationError(attribute='valid XML',
                                              target='request body')
                return wsgi.render_exception(e)
Then json_body, which parses the JSON data sent in the request body:
class JsonBodyMiddleware(wsgi.Middleware):
    """Middleware to allow method arguments to be passed as serialized JSON.

    Accepting arguments as JSON is useful for accepting data that may be more
    complex than simple primitives.

    In this case we accept it as urlencoded data under the key 'json' as in
    json=<urlencoded_json> but this could be extended to accept raw JSON
    in the POST body.

    Filters out the parameters `self`, `context` and anything beginning with
    an underscore.

    """

    def process_request(self, request):
        # Abort early if we don't have any work to do
        params_json = request.body
        if not params_json:
            return

        # Reject unrecognized content types. Empty string indicates
        # the client did not explicitly set the header
        if request.content_type not in ('application/json', ''):
            e = exception.ValidationError(attribute='application/json',
                                          target='Content-Type header')
            return wsgi.render_exception(e)

        params_parsed = {}
        try:
            params_parsed = jsonutils.loads(params_json)
        except ValueError:
            e = exception.ValidationError(attribute='valid JSON',
                                          target='request body')
            return wsgi.render_exception(e)
        finally:
            if not params_parsed:
                params_parsed = {}

        params = {}
        for k, v in params_parsed.iteritems():
            if k in ('self', 'context'):
                continue
            if k.startswith('_'):
                continue
            params[k] = v

        request.environ[PARAMS_ENV] = params
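The parameter filtering at the end of process_request can be reproduced with the standard json module. `extract_params` is a hypothetical helper, and unlike the middleware it lets the ValueError propagate instead of rendering an error response.

```python
import json


def extract_params(body):
    """Parse a JSON body and drop keys that must not reach the controllers."""
    if not body:
        return None                    # nothing to do, like the early return
    parsed = json.loads(body) or {}    # raises ValueError on malformed JSON
    return {key: value for key, value in parsed.items()
            if key not in ('self', 'context') and not key.startswith('_')}
```

For example, a body containing "auth", "_private", "self" and "context" keys yields a dict with only "auth", so a client cannot inject values for the controller method's own `self` or `context` arguments.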
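Next come the extension filters such as ec2_extension and s3_extension (the public_api pipeline above lists ec2_extension and user_crud_extension). These filters use the Routes mechanism to match the request path. If one of their own routes matches, the result is returned directly; if none matches, self.application is called. The code is as follows: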
class Ec2Extension(wsgi.ExtensionRouter):
    def add_routes(self, mapper):
        ec2_controller = controllers.Ec2Controller()
        # validation
        mapper.connect(
            '/ec2tokens',
            controller=ec2_controller,
            action='authenticate',
            conditions=dict(method=['POST']))

        # crud
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2',
            controller=ec2_controller,
            action='create_credential',
            conditions=dict(method=['POST']))
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2',
            controller=ec2_controller,
            action='get_credentials',
            conditions=dict(method=['GET']))
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2/{credential_id}',
            controller=ec2_controller,
            action='get_credential',
            conditions=dict(method=['GET']))
        mapper.connect(
            '/users/{user_id}/credentials/OS-EC2/{credential_id}',
            controller=ec2_controller,
            action='delete_credential',
            conditions=dict(method=['DELETE']))
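If none of these paths match, the base class ExtensionRouter passes the request further down, as shown below: mapper.connect(xxxxxxxxxx, controller=self.application). It also implements factory, which works like the one above and produces an instance whose __call__ is invoked; that __call__ lives in the base class Router and returns self._router.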
class ExtensionRouter(Router):
    """A router that allows extensions to supplement or overwrite routes.

    Expects to be subclassed.

    """

    def __init__(self, application, mapper=None):
        if mapper is None:
            mapper = routes.Mapper()
        self.application = application
        self.add_routes(mapper)
        mapper.connect('{path_info:.*}', controller=self.application)
        super(ExtensionRouter, self).__init__(mapper)

    def add_routes(self, mapper):
        pass

    @classmethod
    def factory(cls, global_config, **local_config):
        """Used for paste app factories in paste.deploy config files.

        Any local configuration (that is, values under the [filter:APPNAME]
        section of the paste config) will be passed into the `__init__` method
        as kwargs.

        A hypothetical configuration would look like:

            [filter:analytics]
            redis_host = 127.0.0.1
            paste.filter_factory = keystone.analytics:Analytics.factory

        which would result in a call to the `Analytics` class as

            import keystone.analytics
            keystone.analytics.Analytics(app, redis_host='127.0.0.1')

        You could of course re-implement the `factory` method in subclasses,
        but using the kwarg passing it shouldn't be necessary.

        """
        def _factory(app):
            conf = global_config.copy()
            conf.update(local_config)
            return cls(app, **local_config)
        return _factory
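The "own routes first, otherwise fall through" behaviour can be sketched without the Routes library. This is an illustration only: it matches on exact (method, path) pairs rather than Routes patterns, and `TinyExtensionRouter`, `TinyEc2Extension` and `next_app` are hypothetical names.

```python
class TinyExtensionRouter(object):
    """Try the extension's own routes first, then fall back to the wrapped app."""

    def __init__(self, application):
        self.application = application
        self.routes = {}            # (method, path) -> handler
        self.add_routes(self.routes)

    def add_routes(self, routes):
        pass                        # subclasses register their routes here

    def __call__(self, method, path):
        handler = self.routes.get((method, path))
        if handler is not None:
            return handler()        # matched: answer directly
        # No match: plays the role of the catch-all '{path_info:.*}' route.
        return self.application(method, path)


class TinyEc2Extension(TinyExtensionRouter):
    def add_routes(self, routes):
        routes[('POST', '/ec2tokens')] = lambda: 'ec2 authenticate'


def next_app(method, path):
    return 'passed on: %s %s' % (method, path)


router = TinyEc2Extension(next_app)
```

A POST to /ec2tokens is answered by the extension itself, while any other request is handed to `next_app`, just as unmatched requests continue to the next element of the pipeline.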
Besides the functionality above, ec2_extension also contains the following code in its core module:
EXTENSION_DATA = {
    'name': 'OpenStack EC2 API',
    'namespace': 'http://docs.openstack.org/identity/api/ext/'
                 'OS-EC2/v1.0',
    'alias': 'OS-EC2',
    'updated': '2013-07-07T12:00:0-00:00',
    'description': 'OpenStack EC2 Credentials backend.',
    'links': [
        {
            'rel': 'describedby',
            # TODO(ayoung): needs a description
            'type': 'text/html',
            'href': 'https://github.com/openstack/identity-api',
        }
    ]}
extension.register_admin_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)
extension.register_public_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)
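This registration matters: we will meet it again in the last step of the pipeline, so set it aside for now. s3_extension is implemented along the same lines as ec2, with an S3Extension, an S3Controller and an extension.registerxxxx() call; user_crud is similar.
The last step is public_app_factory. It can be found in service.py: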
@fail_gracefully
def public_app_factory(global_conf, **local_conf):
    controllers.register_version('v2.0')
    conf = global_conf.copy()
    conf.update(local_conf)
    return wsgi.ComposingRouter(routes.Mapper(),
                                [identity.routers.Public(),
                                 token.routers.Router(),
                                 routers.VersionV2('public'),
                                 routers.Extension(False)])
Once the setup is done, it returns a ComposingRouter object. What does that class do? See below:
class ComposingRouter(Router):
    def __init__(self, mapper=None, routers=None):
        if mapper is None:
            mapper = routes.Mapper()
        if routers is None:
            routers = []
        for router in routers:
            router.add_routes(mapper)
        super(ComposingRouter, self).__init__(mapper)
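The composition step amounts to several small routers writing into one shared mapper. A minimal sketch, assuming a dict-backed `DictMapper` in place of routes.Mapper and string placeholders in place of controller objects (all names here are hypothetical):

```python
class DictMapper(object):
    """Hypothetical stand-in for routes.Mapper: it only records routes."""

    def __init__(self):
        self.table = {}

    def connect(self, path, controller=None, action=None, conditions=None):
        for method in (conditions or {}).get('method', ['GET']):
            self.table[(method, path)] = (controller, action)


class TokenRouter(object):
    def add_routes(self, mapper):
        mapper.connect('/tokens', controller='token', action='authenticate',
                       conditions=dict(method=['POST']))


class ExtensionsRouter(object):
    def add_routes(self, mapper):
        mapper.connect('/extensions', controller='extensions',
                       action='get_extensions_info',
                       conditions=dict(method=['GET']))


def compose(mapper, routers):
    """Let every router add its routes to the same mapper."""
    for router in routers:
        router.add_routes(mapper)
    return mapper


mapper = compose(DictMapper(), [TokenRouter(), ExtensionsRouter()])
```

After composition, `mapper.table` holds the routes of both routers, so a single dispatcher can serve all of them.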
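ComposingRouter does not do much. The key line is router.add_routes(mapper): it calls the add_routes method of identity.routers.Public(), token.routers.Router(), routers.VersionV2('public') and routers.Extension(False), passing mapper in as the argument. Moving on, pay attention to the routers.Extension class and follow it: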
class Extension(wsgi.ComposableRouter):
    def __init__(self, is_admin=True):
        if is_admin:
            self.controller = controllers.AdminExtensions()
        else:
            self.controller = controllers.PublicExtensions()

    def add_routes(self, mapper):
        extensions_controller = self.controller
        mapper.connect('/extensions',
                       controller=extensions_controller,
                       action='get_extensions_info',
                       conditions=dict(method=['GET']))
        mapper.connect('/extensions/{extension_alias}',
                       controller=extensions_controller,
                       action='get_extension_info',
                       conditions=dict(method=['GET']))
add_routes sets up the Routes mappings, which is no surprise; the other three classes do the same. The key is its controller. Following it into PublicExtensions:
class PublicExtensions(Extensions):
    @property
    def extensions(self):
        return extension.PUBLIC_EXTENSIONS
It returns extension.PUBLIC_EXTENSIONS. Remember the earlier call:
extension.register_public_extension(EXTENSION_DATA['alias'], EXTENSION_DATA)
def register_public_extension(url_prefix, extension_data):
    """Same as register_admin_extension but for public extensions."""
    PUBLIC_EXTENSIONS[url_prefix] = extension_data
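This simply populates PUBLIC_EXTENSIONS with the data registered earlier by the EC2, S3 and user CRUD extensions, so the /extensions routes can report which extensions are registered. With that, requests are dispatched according to the registered routes, and the flow of a call to public_api is essentially complete.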
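The registry is a module-level dict that extensions write to at import time and the controller reads from at request time. A minimal sketch of that relationship follows; the response shapes returned by the two methods are assumptions for illustration, and `EC2_DATA` is a shortened version of the EXTENSION_DATA shown earlier.

```python
PUBLIC_EXTENSIONS = {}


def register_public_extension(url_prefix, extension_data):
    PUBLIC_EXTENSIONS[url_prefix] = extension_data


class PublicExtensions(object):
    @property
    def extensions(self):
        return PUBLIC_EXTENSIONS

    def get_extensions_info(self):
        # Assumed response shape, for illustration only.
        return {'extensions': list(self.extensions.values())}

    def get_extension_info(self, extension_alias):
        return {'extension': self.extensions[extension_alias]}


# What an extension module does when it is imported.
EC2_DATA = {'name': 'OpenStack EC2 API', 'alias': 'OS-EC2'}
register_public_extension(EC2_DATA['alias'], EC2_DATA)

controller = PublicExtensions()
```

Because the controller reads the dict through a property, any extension registered before the request arrives shows up in the listing without the controller knowing about it in advance.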
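Open questions:
1. How does a Controller call its Manager, and how does the Manager call its Driver? This is implemented through the dependency decorators applied to the different Controllers.
2. The notification mechanism in Keystone.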
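For question 1, the general idea of decorator-based dependency wiring can be sketched as follows. This is a guess at the pattern, not keystone's dependency module: `provider`, `requires`, `REGISTRY`, `FakeDriver` and the class names are all hypothetical.

```python
REGISTRY = {}


def provider(name):
    """Class decorator: register each new instance under `name`."""
    def wrapper(cls):
        original_init = cls.__init__

        def __init__(self, *args, **kwargs):
            original_init(self, *args, **kwargs)
            REGISTRY[name] = self
        cls.__init__ = __init__
        return cls
    return wrapper


def requires(*names):
    """Class decorator: attach the registered providers as attributes."""
    def wrapper(cls):
        original_init = cls.__init__

        def __init__(self, *args, **kwargs):
            for name in names:
                setattr(self, name, REGISTRY[name])
            original_init(self, *args, **kwargs)
        cls.__init__ = __init__
        return cls
    return wrapper


class FakeDriver(object):
    def get_user(self, user_id):
        return {'id': user_id}


@provider('identity_api')
class Manager(object):
    def __init__(self):
        self.driver = FakeDriver()      # the Manager owns the Driver

    def get_user(self, user_id):
        return self.driver.get_user(user_id)


@requires('identity_api')
class UserController(object):
    def get_user(self, user_id):
        return {'user': self.identity_api.get_user(user_id)}


Manager()                       # instantiating the Manager registers it
controller = UserController()   # the Controller receives it as an attribute
```

In this sketch the Controller never imports the Manager directly; it only names the dependency it needs, and the Manager in turn delegates to whatever Driver it was built with.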
bluefire1991
2013.11.2
19:47:54