cinder-volume是如何把rpc消息分发到对应方法的

rpc消息分发到manager方法的过程是由oslo.messaging实现的
OpenStack的oslo.messaging wiki十分详细地解释了这个流程,没有比它更正式、更官方的说明了
https://wiki.openstack.org/wiki/Oslo/Messaging#Server_Side_API
OpenStack中其他组件的rpc server实现方式基本相同

cinder-volume的初始化代码在cinder/service.py Service.start

    def start(self):
...
        target = messaging.Target(topic=self.topic, server=self.host)
        endpoints = [self.manager]
        self.rpcserver = rpc.get_server(target, endpoints)
        self.rpcserver.start()
...

get_server调用到oslo_messaging/rpc/server.py get_rpc_server

def get_rpc_server(transport, target, endpoints,
                   executor='blocking', serializer=None):
    """Build a message handling server that dispatches RPC calls.

    The endpoints are wrapped in an RPCDispatcher created from ``target``,
    ``endpoints`` and ``serializer``; that dispatcher is then handed, together
    with the transport and the executor name, to a MessageHandlingServer.

    ``executor`` selects how incoming messages are received and dispatched.
    The default, 'blocking', is the simplest executor. If the eventlet
    executor is used, the threading and time libraries need to be
    monkeypatched.

    :param transport: the messaging transport
    :type transport: Transport
    :param target: the exchange, topic and server to listen on
    :type target: Target
    :param endpoints: a list of endpoint objects
    :type endpoints: list
    :param executor: name of a message executor - for example
                     'eventlet', 'blocking'
    :type executor: str
    :param serializer: an optional entity serializer
    :type serializer: Serializer
    :returns: the MessageHandlingServer built around the new dispatcher
    """
    return msg_server.MessageHandlingServer(
        transport,
        rpc_dispatcher.RPCDispatcher(target, endpoints, serializer),
        executor)

MessageHandlingServer类如下:

class MessageHandlingServer(object):
    """Server for handling messages.

    Connect a transport to a dispatcher that knows how to process the
    message using an executor that knows how the app wants to create
    new tasks.
    """

    def __init__(self, transport, dispatcher, executor='blocking'):
        """Construct a message handling server.

        The dispatcher parameter is a callable which is invoked with context
        and message dictionaries each time a message is received.

        The executor parameter controls how incoming messages will be received
        and dispatched. By default, the most simple executor is used - the
        blocking executor.

        :param transport: the messaging transport
        :type transport: Transport
        :param dispatcher: a callable which is invoked for each method
        :type dispatcher: callable
        :param executor: name of message executor - for example
                         'eventlet', 'blocking'
        :type executor: str
        :raises ExecutorLoadFailure: if looking up the named executor raises
                                     RuntimeError
        """
        # The server uses the configuration object carried by the transport.
        self.conf = transport.conf

        self.transport = transport
        self.dispatcher = dispatcher
        self.executor = executor

        # Look the executor up by name via driver.DriverManager under
        # 'oslo.messaging.executors'.
        try:
            mgr = driver.DriverManager('oslo.messaging.executors',
                                       self.executor)
        except RuntimeError as ex:
            raise ExecutorLoadFailure(self.executor, ex)
        else:
            # Only the executor class is stored here; the instance is created
            # by start(), which treats None as "not started yet".
            self._executor_cls = mgr.driver
            self._executor = None

        super(MessageHandlingServer, self).__init__()

    def start(self):
        """Start handling incoming messages.

        This method causes the server to begin polling the transport for
        incoming messages and passing them to the dispatcher. Message
        processing will continue until the stop() method is called.

        The executor controls how the server integrates with the application's
        I/O handling strategy - it may choose to poll for messages in a new
        process, thread or co-operatively scheduled coroutine or simply by
        registering a callback with an event loop. Similarly, the executor may
        choose to dispatch messages in a new thread, coroutine or simply the
        current thread.

        Calling start() again once an executor instance exists is a no-op.

        :raises ServerListenError: if the dispatcher's _listen() call fails
                                   with TransportDriverError
        """
        # An executor instance already exists, so the server was started.
        if self._executor is not None:
            return
        try:
            listener = self.dispatcher._listen(self.transport)
        except driver_base.TransportDriverError as ex:
            # NOTE(review): self.target is not assigned anywhere in the
            # visible part of this class - confirm it is defined elsewhere;
            # otherwise this line fails with AttributeError instead of
            # raising ServerListenError.
            raise ServerListenError(self.target, ex)

        # The executor is built from the configuration, the listener and the
        # dispatcher, then started.
        self._executor = self._executor_cls(self.conf, listener,
                                            self.dispatcher)
        self._executor.start()

cinder-volume使用eventlet类型的executor,所以消息分发在oslo.messaging/oslo_messaging/_executors/impl_eventlet.py中实现,入口函数为start()
每收到一个消息,cinder-volume就会孵化一个协程去执行dispatcher方法,代码如下

    def _dispatch(self, incoming):
        """Hand one incoming message to the dispatcher via spawn_with.

        The dispatcher is called with the message, and its result is passed
        to spawn_with together with self._greenpool.
        """
        dispatch_ctxt = self.dispatcher(incoming)
        spawn_with(ctxt=dispatch_ctxt, pool=self._greenpool)

    def start(self):
        """Spawn the polling loop with eventlet.spawn.

        Does nothing when self._thread is already set.
        """
        if self._thread is not None:
            return

        @excutils.forever_retry_uncaught_exceptions
        def _executor_thread():
            # Poll the listener for as long as self._running is true and pass
            # every non-None result to self._dispatch().
            try:
                while self._running:
                    incoming = self.listener.poll()
                    if incoming is not None:
                        self._dispatch(incoming)
            except greenlet.GreenletExit:
                # GreenletExit ends the loop without being propagated.
                return

        # The flag is set before spawning so the loop condition holds when
        # _executor_thread first runs.
        self._running = True
        self._thread = eventlet.spawn(_executor_thread)

dispatcher对应oslo.messaging.rpc.RPCDispatcher,其中的\_\_call\_\_方法先向rabbitmq回复ack,随后\_dispatch方法解析消息参数,获得指定的方法名,最后调用该方法

# NOTE(review): this decorator sits at column 0 while the method it decorates
# is indented; the excerpt appears to have lost the leading indent - confirm.
@contextlib.contextmanager
    def __call__(self, incoming, executor_callback=None):
        """Acknowledge the message and yield a callable that dispatches it.

        The message is acknowledged before anything is dispatched. The yielded
        callable takes no arguments and runs _dispatch_and_reply() for this
        message when invoked.
        """
        incoming.acknowledge()
        yield lambda: self._dispatch_and_reply(incoming, executor_callback)

    def _dispatch_and_reply(self, incoming, executor_callback):
        """Dispatch one incoming message and send the outcome as its reply.

        On success the return value of _dispatch() is passed to
        incoming.reply(). Exceptions raised while dispatching are not
        re-raised here: they are logged and reported to the caller through
        incoming.reply(failure=...).

        :param incoming: the received message; its ctxt, message and reply()
                         members are used
        :param executor_callback: passed through unchanged to _dispatch()
        """
        try:
            incoming.reply(self._dispatch(incoming.ctxt,
                                          incoming.message,
                                          executor_callback))
        except ExpectedException as e:
            # Expected failures are logged at debug level only, and the reply
            # is sent with log_failure=False.
            LOG.debug(u'Expected exception during message handling (%s)',
                      e.exc_info[1])
            incoming.reply(failure=e.exc_info, log_failure=False)
        except Exception as e:
            # sys.exc_info() is deleted by LOG.exception().
            exc_info = sys.exc_info()
            LOG.error(_('Exception during message handling: %s'), e,
                      exc_info=exc_info)
            incoming.reply(failure=exc_info)
            # NOTE(dhellmann): Remove circular object reference
            # between the current stack frame and the traceback in
            # exc_info.
            del exc_info

    def _dispatch(self, ctxt, message, executor_callback=None):
        """Route an RPC message to the first endpoint able to handle it.

        Endpoints are tried in order. An endpoint is skipped when its target
        (or the dispatcher's default target, if the endpoint has none) does
        not match the message's namespace and version. The first remaining
        endpoint that has an attribute named after the requested method
        handles the call.

        :param ctxt: the request context
        :type ctxt: dict
        :param message: the message payload
        :type message: dict
        :param executor_callback: passed through to _do_dispatch()
        :returns: whatever _do_dispatch() returns for the chosen endpoint
        :raises: NoSuchMethod, UnsupportedVersion
        """
        method = message.get('method')
        args = message.get('args', {})
        namespace = message.get('namespace')
        version = message.get('version', '1.0')

        matched_some_endpoint = False
        for endpoint in self.endpoints:
            # Endpoints without a usable target fall back to the default one.
            target = getattr(endpoint, 'target', None) or self._default_target

            if not self._is_namespace(target, namespace):
                continue
            if not self._is_compatible(target, version):
                continue

            if not hasattr(endpoint, method):
                # Namespace and version matched; only the method is missing.
                matched_some_endpoint = True
                continue

            # The local context is set only for the duration of the call.
            localcontext.set_local_context(ctxt)
            try:
                return self._do_dispatch(endpoint, method, ctxt, args,
                                         executor_callback)
            finally:
                localcontext.clear_local_context()

        if not matched_some_endpoint:
            raise UnsupportedVersion(version, method=method)
        raise NoSuchMethod(method)
    原文作者:研发小峰
    原文地址: https://www.jianshu.com/p/4600dd8c655a
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞