From f5d89210eaf822affc1059c01a3959d4148c2768 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Mon, 30 Dec 2019 14:40:37 +0100 Subject: [PATCH] crimson/osd: implement the complete phase of Notify propagation. Q: How to test? A: * `${CEPHBIN:-bin}/rados -p rbd touch moj_watch` * `${CEPHBIN:-bin}/rados -p rbd watch moj_watch` * `${CEPHBIN:-bin}/rados -p rbd notify moj_watch test2` Signed-off-by: Radoslaw Zarzynski --- src/crimson/osd/ops_executer.cc | 9 +++-- src/crimson/osd/watch.cc | 69 +++++++++++++++++++++++++++++++++ src/crimson/osd/watch.h | 55 ++++++++++++++++++++++---- 3 files changed, 122 insertions(+), 11 deletions(-) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 39ff6d320f32..99a5f7cfcebd 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -157,8 +157,8 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch( [] (auto&& ctx, ObjectContextRef obc) { auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr); if (emplaced) { - const auto& cookie = ctx.key.first; - it->second = crimson::osd::Watch::create(ctx.info, obc); + const auto& [cookie, entity] = ctx.key; + it->second = crimson::osd::Watch::create(obc, ctx.info, entity); logger().info("op_effect: added new watcher: {}", ctx.key); } else { logger().info("op_effect: found existing watcher: {}", ctx.key); @@ -285,12 +285,14 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify( return crimson::ct_error::enoent::make(); } struct notify_ctx_t { + crimson::net::ConnectionRef conn; notify_info_t ninfo; const uint64_t client_gid; const epoch_t epoch; notify_ctx_t(const MOSDOp& msg) - : client_gid(msg.get_reqid().name.num()), + : conn(msg.get_connection()), + client_gid(msg.get_reqid().name.num()), epoch(msg.get_map_epoch()) { } }; @@ -325,6 +327,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify( return crimson::osd::Notify::create_n_propagate( std::begin(alive_watchers), std::end(alive_watchers), + std::move(ctx.conn), ctx.ninfo, ctx.client_gid, obc->obs.oi.user_version); diff --git a/src/crimson/osd/watch.cc b/src/crimson/osd/watch.cc index 84d65f9d06a9..b5d13fd1ecaa 100644 --- a/src/crimson/osd/watch.cc +++ b/src/crimson/osd/watch.cc @@ -51,4 +51,73 @@ seastar::future<> Watch::start_notify(NotifyRef notify) return is_connected() ? send_notify_msg(*it) : seastar::now(); } +seastar::future<> Watch::notify_ack( + const uint64_t notify_id, + const ceph::bufferlist& reply_bl) +{ + logger().info("{}", __func__); + return seastar::do_for_each(in_progress_notifies, + [this_shared=shared_from_this(), &reply_bl] (auto notify) { + return notify->complete_watcher(this_shared, reply_bl); + } + ).then([this] { + in_progress_notifies.clear(); + return seastar::now(); + }); +} + +bool notify_reply_t::operator<(const notify_reply_t& rhs) const +{ + // comparing std::pairs to emphasize our legacy. ceph-osd stores + // notify_replies as std::multimap, bl>. + // unfortunately, what seems to be an implementation detail, got + // exposed as part of our public API (the `reply_buffer` parameter + // of the `rados_notify` family). + const auto lhsp = std::make_pair(watcher_gid, watcher_cookie); + const auto rhsp = std::make_pair(rhs.watcher_gid, rhs.watcher_cookie); + return lhsp < rhsp; +} + +seastar::future<> Notify::complete_watcher( + WatchRef watch, + const ceph::bufferlist& reply_bl) +{ + if (discarded || complete) { + return seastar::now(); + } + notify_replies.emplace(notify_reply_t{ + watch->get_watcher_gid(), + watch->get_cookie(), + reply_bl}); + watchers.erase(watch); + return maybe_send_completion(); +} + +seastar::future<> Notify::maybe_send_completion() +{ + logger().info("{} -- {} in progress watchers", __func__, watchers.size()); + if (watchers.empty()) { + // prepare reply + ceph::bufferlist bl; + encode(notify_replies, bl); + // FIXME: this is just a stub + std::list> missed; + encode(missed, bl); + + complete = true; + + ceph::bufferlist empty; + auto reply = make_message( + ninfo.cookie, + user_version, + ninfo.notify_id, + CEPH_WATCH_EVENT_NOTIFY_COMPLETE, + empty, + client_gid); + reply->set_data(bl); + return conn->send(std::move(reply)); + } + return seastar::now(); +} + } // namespace crimson::osd diff --git a/src/crimson/osd/watch.h b/src/crimson/osd/watch.h index 3db6471f6fdb..bf9229028d40 100644 --- a/src/crimson/osd/watch.h +++ b/src/crimson/osd/watch.h @@ -4,12 +4,14 @@ #pragma once #include +#include #include #include #include "crimson/net/Connection.h" #include "crimson/osd/object_context.h" +#include "include/denc.h" namespace crimson::osd { @@ -38,6 +40,7 @@ class Watch : public seastar::enable_shared_from_this { crimson::osd::ObjectContextRef obc; watch_info_t winfo; + entity_name_t entity_name; seastar::future<> start_notify(NotifyRef); seastar::future<> send_notify_msg(NotifyRef); @@ -46,10 +49,12 @@ class Watch : public seastar::enable_shared_from_this { public: Watch(private_ctag_t, + crimson::osd::ObjectContextRef obc, const watch_info_t& winfo, - crimson::osd::ObjectContextRef obc) - : winfo(winfo), - obc(std::move(obc)) { + const entity_name_t& entity_name) + : obc(std::move(obc)), + winfo(winfo), + entity_name(entity_name) { } seastar::future<> connect(crimson::net::ConnectionRef, bool); @@ -70,31 +75,58 @@ public: /// Call when notify_ack received on notify_id seastar::future<> notify_ack( uint64_t notify_id, ///< [in] id of acked notify - const ceph::bufferlist& reply_bl) { ///< [in] notify reply buffer - return seastar::now(); - } + const ceph::bufferlist& reply_bl); ///< [in] notify reply buffer template static seastar::shared_ptr create(Args&&... args) { return seastar::make_shared(private_ctag_t{}, std::forward(args)...); }; + + uint64_t get_watcher_gid() const { + return entity_name.num(); + } + uint64_t get_cookie() const { + return winfo.cookie; + } }; using WatchRef = seastar::shared_ptr; +struct notify_reply_t { + uint64_t watcher_gid; + uint64_t watcher_cookie; + ceph::bufferlist bl; + + bool operator<(const notify_reply_t& rhs) const; + DENC(notify_reply_t, v, p) { + DENC_START(1, 1, p); + denc(v.watcher_gid, p); + denc(v.watcher_cookie, p); + denc(v.bl, p); + DENC_FINISH(p); + } +}; + class Notify { std::set watchers; notify_info_t ninfo; + crimson::net::ConnectionRef conn; uint64_t client_gid; uint64_t user_version; + bool complete = false; + bool discarded = false; + + /// (gid,cookie) -> reply_bl for everyone who acked the notify + std::multiset notify_replies; uint64_t get_id() const { return ninfo.notify_id; } - seastar::future<> propagate() { return seastar::now(); } + seastar::future<> maybe_send_completion(); template Notify(WatchIteratorT begin, WatchIteratorT end, + crimson::net::ConnectionRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version); @@ -115,17 +147,22 @@ public: WatchIteratorT begin, WatchIteratorT end, Args&&... args); + + seastar::future<> complete_watcher(WatchRef watch, + const ceph::bufferlist& reply_bl); }; template Notify::Notify(WatchIteratorT begin, WatchIteratorT end, + crimson::net::ConnectionRef conn, const notify_info_t& ninfo, const uint64_t client_gid, const uint64_t user_version) : watchers(begin, end), ninfo(ninfo), + conn(std::move(conn)), client_gid(client_gid), user_version(user_version) { } @@ -147,8 +184,10 @@ seastar::future<> Notify::create_n_propagate( return seastar::do_for_each(begin, end, [=] (auto& watchref) { return watchref->start_notify(notify); }).then([notify = std::move(notify)] { - return notify->propagate(); + return notify->maybe_send_completion(); }); } } // namespace crimson::osd + +WRITE_CLASS_DENC(crimson::osd::notify_reply_t) -- 2.47.3