From: Radoslaw Zarzynski Date: Fri, 27 Dec 2019 15:25:38 +0000 (+0100) Subject: crimson/osd: dispatch MWatchNotify on CEPH_OSD_OP_NOTIFY. X-Git-Tag: v15.1.1~415^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=8adb6e7de0b80112b1ff4e00724f8fd1fd0a8f6b;p=ceph.git crimson/osd: dispatch MWatchNotify on CEPH_OSD_OP_NOTIFY. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/crimson/osd/CMakeLists.txt b/src/crimson/osd/CMakeLists.txt index ef4fc716743c..cfd7dc722a7e 100644 --- a/src/crimson/osd/CMakeLists.txt +++ b/src/crimson/osd/CMakeLists.txt @@ -30,6 +30,7 @@ add_executable(crimson-osd ${PROJECT_SOURCE_DIR}/src/osd/MissingLoc.cc ${PROJECT_SOURCE_DIR}/src/osd/PGLog.cc ${PROJECT_SOURCE_DIR}/src/osd/osd_perf_counters.cc + watch.cc ) target_link_libraries(crimson-osd crimson-common crimson-os crimson fmt::fmt) diff --git a/src/crimson/osd/object_context.h b/src/crimson/osd/object_context.h index e0822e1c5bc2..2d61963e6453 100644 --- a/src/crimson/osd/object_context.h +++ b/src/crimson/osd/object_context.h @@ -4,6 +4,7 @@ #pragma once #include +#include #include #include @@ -14,10 +15,11 @@ #include "osd/object_state.h" #include "crimson/common/config_proxy.h" #include "crimson/osd/osd_operation.h" -#include "crimson/osd/watch.h" namespace crimson::osd { +class Watch; + template struct obc_to_hoid { using type = hobject_t; @@ -40,7 +42,7 @@ public: // frequented paths. std::map is used mostly because of developer's // convenience. using watch_key_t = std::pair; - std::map watchers; + std::map> watchers; ObjectContext(const hobject_t &hoid) : obs(hoid), loaded(false) {} @@ -229,4 +231,4 @@ public: const std::set &changed) final; }; -} +} // namespace crimson::osd diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index c1191f0c817a..39ff6d320f32 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -16,6 +16,7 @@ #include #include "crimson/osd/exceptions.h" +#include "crimson/osd/watch.h" #include "osd/ClassHandler.h" namespace { @@ -156,7 +157,8 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch( [] (auto&& ctx, ObjectContextRef obc) { auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr); if (emplaced) { - it->second = crimson::osd::Watch::create(); + const auto& cookie = ctx.key.first; + it->second = crimson::osd::Watch::create(ctx.info, obc); logger().info("op_effect: added new watcher: {}", ctx.key); } else { logger().info("op_effect: found existing watcher: {}", ctx.key); @@ -245,6 +247,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch( ObjectState& os, ceph::os::Transaction& txn) { + logger().debug("{}", __func__); if (!os.exists) { return crimson::ct_error::enoent::make(); } @@ -265,33 +268,51 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch( return crimson::ct_error::invarg::make(); } +static uint64_t get_next_notify_id(epoch_t e) +{ + // FIXME + static std::uint64_t next_notify_id = 0; + return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++)); +} + OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify( OSDOp& osd_op, const ObjectState& os) { + logger().debug("{}, msg epoch: {}", __func__, get_message().get_map_epoch()); + if (!os.exists) { return crimson::ct_error::enoent::make(); } - return with_effect_on_obc(notify_info_t{}, + struct notify_ctx_t { + notify_info_t ninfo; + const uint64_t client_gid; + const epoch_t epoch; + + notify_ctx_t(const MOSDOp& msg) + : client_gid(msg.get_reqid().name.num()), + epoch(msg.get_map_epoch()) { + } + }; + return with_effect_on_obc(notify_ctx_t{ get_message() }, [&] (auto& ctx) { try { auto bp = osd_op.indata.cbegin(); uint32_t ver; // obsolete ceph::decode(ver, bp); - ceph::decode(ctx.timeout, bp); - ceph::decode(ctx.bl, bp); + ceph::decode(ctx.ninfo.timeout, bp); + ceph::decode(ctx.ninfo.bl, bp); } catch (const buffer::error&) { - ctx.timeout = 0; + ctx.ninfo.timeout = 0; } - if (!ctx.timeout) { + if (!ctx.ninfo.timeout) { using crimson::common::local_conf; - ctx.timeout = local_conf()->osd_default_notify_timeout; + ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout; } - // FIXME - ctx.notify_id = 42; //osd->get_next_id(get_osdmap_epoch()); - ctx.cookie = osd_op.op.notify.cookie; + ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch); + ctx.ninfo.cookie = osd_op.op.notify.cookie; // return our unique notify id to the client - ceph::encode(ctx.notify_id, osd_op.outdata); + ceph::encode(ctx.ninfo.notify_id, osd_op.outdata); return seastar::now(); }, [] (auto&& ctx, ObjectContextRef obc) { @@ -303,7 +324,10 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify( }); return crimson::osd::Notify::create_n_propagate( std::begin(alive_watchers), - std::end(alive_watchers)); + std::end(alive_watchers), + ctx.ninfo, + ctx.client_gid, + obc->obs.oi.user_version); }); } @@ -311,6 +335,8 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack( OSDOp& osd_op, const ObjectState& os) { + logger().debug("{}", __func__); + struct notifyack_ctx_t { const entity_name_t entity; uint64_t watch_cookie; diff --git a/src/crimson/osd/watch.cc b/src/crimson/osd/watch.cc new file mode 100644 index 000000000000..84d65f9d06a9 --- /dev/null +++ b/src/crimson/osd/watch.cc @@ -0,0 +1,54 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include "crimson/osd/watch.h" +#include "messages/MWatchNotify.h" + + +namespace { + seastar::logger& logger() { + return crimson::get_logger(ceph_subsys_osd); + } +} + +namespace crimson::osd { + +bool Watch::NotifyCmp::operator()(NotifyRef lhs, NotifyRef rhs) const +{ + ceph_assert(lhs); + ceph_assert(rhs); + return lhs->get_id() < rhs->get_id(); +} + +seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool) +{ + if (this->conn == conn) { + logger().debug("conn={} already connected", conn); + } + + this->conn = std::move(conn); + return seastar::now(); +} + +seastar::future<> Watch::send_notify_msg(NotifyRef notify) +{ + logger().info("{} for notify(id={})", __func__, notify->ninfo.notify_id); + return conn->send(make_message( + winfo.cookie, + notify->user_version, + notify->ninfo.notify_id, + CEPH_WATCH_EVENT_NOTIFY, + notify->ninfo.bl, + notify->client_gid)); +} + +seastar::future<> Watch::start_notify(NotifyRef notify) +{ + logger().info("{} adding ¬ify={}", __func__, notify.get()); + auto [ it, emplaced ] = in_progress_notifies.emplace(std::move(notify)); + ceph_assert(emplaced); + ceph_assert(is_alive()); + return is_connected() ? send_notify_msg(*it) : seastar::now(); +} + +} // namespace crimson::osd diff --git a/src/crimson/osd/watch.h b/src/crimson/osd/watch.h index f26c9cb1620f..3db6471f6fdb 100644 --- a/src/crimson/osd/watch.h +++ b/src/crimson/osd/watch.h @@ -9,6 +9,7 @@ #include #include "crimson/net/Connection.h" +#include "crimson/osd/object_context.h" namespace crimson::osd { @@ -34,6 +35,9 @@ class Watch : public seastar::enable_shared_from_this { }; std::set in_progress_notifies; crimson::net::ConnectionRef conn; + crimson::osd::ObjectContextRef obc; + + watch_info_t winfo; seastar::future<> start_notify(NotifyRef); seastar::future<> send_notify_msg(NotifyRef); @@ -41,7 +45,11 @@ class Watch : public seastar::enable_shared_from_this { friend Notify; public: - Watch(private_ctag_t) { + Watch(private_ctag_t, + const watch_info_t& winfo, + crimson::osd::ObjectContextRef obc) + : winfo(winfo), + obc(std::move(obc)) { } seastar::future<> connect(crimson::net::ConnectionRef, bool); @@ -77,12 +85,19 @@ using WatchRef = seastar::shared_ptr; class Notify { std::set watchers; + notify_info_t ninfo; + uint64_t client_gid; + uint64_t user_version; - uint64_t get_id() const { return 0; } - void propagate() {} + uint64_t get_id() const { return ninfo.notify_id; } + seastar::future<> propagate() { return seastar::now(); } template - Notify(WatchIteratorT begin, WatchIteratorT end); + Notify(WatchIteratorT begin, + WatchIteratorT end, + const notify_info_t& ninfo, + const uint64_t client_gid, + const uint64_t user_version); // this is a private tag for the public constructor that turns it into // de facto private one. The motivation behind the hack is make_shared // used by create_n_propagate factory. @@ -95,31 +110,44 @@ public: Notify(private_ctag_t, Args&&... args) : Notify(std::forward(args)...) { } - template + template static seastar::future<> create_n_propagate( WatchIteratorT begin, - WatchIteratorT end); + WatchIteratorT end, + Args&&... args); }; template -Notify::Notify(WatchIteratorT begin, WatchIteratorT end) - : watchers(begin, end) { +Notify::Notify(WatchIteratorT begin, + WatchIteratorT end, + const notify_info_t& ninfo, + const uint64_t client_gid, + const uint64_t user_version) + : watchers(begin, end), + ninfo(ninfo), + client_gid(client_gid), + user_version(user_version) { } -template +template seastar::future<> Notify::create_n_propagate( WatchIteratorT begin, - WatchIteratorT end) + WatchIteratorT end, + Args&&... args) { static_assert( std::is_same_v::value_type, crimson::osd::WatchRef>); - auto notify = seastar::make_shared(private_ctag_t{}, begin, end); - seastar::do_for_each(begin, end, [=] (auto& watchref) { + auto notify = seastar::make_shared( + private_ctag_t{}, + begin, + end, + std::forward(args)...); + return seastar::do_for_each(begin, end, [=] (auto& watchref) { return watchref->start_notify(notify); - }).then([notify = std::move(notify)] {; - notify->propagate(); + }).then([notify = std::move(notify)] { + return notify->propagate(); }); }