From 60f5b50f476d58248d49c0728554f2825e46f918 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Fri, 11 Jan 2019 17:53:15 +0800 Subject: [PATCH] crimson: add ChainedDispatchers it will be used to glue multiple dispatchers for subscribing events emitted by messenger Signed-off-by: Kefu Chai --- src/crimson/osd/CMakeLists.txt | 3 +- src/crimson/osd/chained_dispatchers.cc | 75 ++++++++++++++++++++++++++ src/crimson/osd/chained_dispatchers.h | 32 +++++++++++ 3 files changed, 109 insertions(+), 1 deletion(-) create mode 100644 src/crimson/osd/chained_dispatchers.cc create mode 100644 src/crimson/osd/chained_dispatchers.h diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index 2c109366460..9121bc1025d 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -1,5 +1,6 @@ add_executable(crimson-osd main.cc - osd.cc) + osd.cc + chained_dispatchers.cc) target_link_libraries(crimson-osd crimson-common crimson-os crimson) diff --git a/src/crimson/osd/chained_dispatchers.cc b/src/crimson/osd/chained_dispatchers.cc new file mode 100644 index 00000000000..9f1b0f771ba --- /dev/null +++ b/src/crimson/osd/chained_dispatchers.cc @@ -0,0 +1,75 @@ +#include "chained_dispatchers.h" +#include "crimson/net/Connection.h" + + +seastar::future<> +ChainedDispatchers::ms_dispatch(ceph::net::ConnectionRef conn, + MessageRef m) { + return seastar::do_for_each(dispatchers, [conn, m](Dispatcher* dispatcher) { + return dispatcher->ms_dispatch(conn, m); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_accept(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_accept(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_connect(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_connect(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_reset(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_reset(conn); + }); +} + +seastar::future<> +ChainedDispatchers::ms_handle_remote_reset(ceph::net::ConnectionRef conn) { + return seastar::do_for_each(dispatchers, [conn](Dispatcher* dispatcher) { + return dispatcher->ms_handle_remote_reset(conn); + }); +} + +seastar::future> +ChainedDispatchers::ms_get_authorizer(peer_type_t peer_type, bool force_new) +{ + // since dispatcher returns a nullptr if it does not have the authorizer, + // let's use the chain-of-responsibility pattern here. + struct Params { + peer_type_t peer_type; + bool force_new; + std::deque::iterator first, last; + } params = {peer_type, force_new, + dispatchers.begin(), dispatchers.end()}; + return seastar::do_with(Params{params}, [this] (Params& params) { + using result_t = std::unique_ptr; + return seastar::repeat_until_value([&] () { + auto& first = params.first; + if (first == params.last) { + // just give up + return seastar::make_ready_future>(result_t{}); + } else { + return (*first)->ms_get_authorizer(params.peer_type, + params.force_new) + .then([&] (auto&& auth)-> std::optional { + if (auth) { + // hooray! + return std::move(auth); + } else { + // try next one + ++first; + return {}; + } + }); + } + }); + }); +} diff --git a/src/crimson/osd/chained_dispatchers.h b/src/crimson/osd/chained_dispatchers.h new file mode 100644 index 00000000000..a287e500e49 --- /dev/null +++ b/src/crimson/osd/chained_dispatchers.h @@ -0,0 +1,32 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:nil -*- +// vim: ts=8 sw=2 smarttab + +#pragma once + +#include +#include "crimson/net/Dispatcher.h" + +// in existing Messenger, dispatchers are put into a chain as described by +// chain-of-responsibility pattern. we could do the same to stop processing +// the message once any of the dispatchers claims this message, and prevent +// other dispatchers from reading it. but this change is more involved as +// it requires changing the ms_ methods to return a bool. so as an intermediate +// solution, we are using an observer dispatcher to notify all the interested +// or unintersted parties. +class ChainedDispatchers : public ceph::net::Dispatcher { + std::deque dispatchers; +public: + void push_front(Dispatcher* dispatcher) { + dispatchers.push_front(dispatcher); + } + void push_back(Dispatcher* dispatcher) { + dispatchers.push_back(dispatcher); + } + seastar::future<> ms_dispatch(ceph::net::ConnectionRef conn, MessageRef m) override; + seastar::future<> ms_handle_accept(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_connect(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_reset(ceph::net::ConnectionRef conn) override; + seastar::future<> ms_handle_remote_reset(ceph::net::ConnectionRef conn) override; + seastar::future> + ms_get_authorizer(peer_type_t peer_type, bool force_new) override; +}; -- 2.39.5