From ba1c17c6bb2d5e817882c72203a016964d50b53a Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Wed, 27 Nov 2019 23:37:41 +0100 Subject: [PATCH] crimson: implement CEPH_OSD_OP_NOTIFY_ACK. Signed-off-by: Radoslaw Zarzynski --- src/crimson/osd/ops_executer.cc | 57 +++++++++++++++++++++++++++++++++ src/crimson/osd/ops_executer.h | 3 ++ src/crimson/osd/watch.h | 7 ++++ 3 files changed, 67 insertions(+) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index b3dfbe294d355..1eb29ad55ece4 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -297,6 +297,59 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify( }); } +OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack( + OSDOp& osd_op, + const ObjectState& os) +{ + struct notifyack_ctx_t { + const entity_name_t entity; + std::optional watch_cookie; + uint64_t notify_id; + ceph::bufferlist reply_bl; + + notifyack_ctx_t(const MOSDOp& msg) : entity(msg.get_reqid().name) { + } + }; + return with_effect_on_obc(notifyack_ctx_t{ get_message() }, + [&] (auto& ctx) { + try { + auto bp = osd_op.indata.cbegin(); + uint64_t wc = 0; + ceph::decode(ctx.notify_id, bp); + ceph::decode(wc, bp); + ctx.watch_cookie = wc; + if (!bp.end()) { + ceph::decode(ctx.reply_bl, bp); + } + } catch (const buffer::error&) { + // op.watch.cookie is actually the notify_id for historical reasons + ctx.notify_id = osd_op.op.watch.cookie; + } + return seastar::now(); + }, + [] (auto&& ctx, ObjectContextRef obc) { + if (ctx.watch_cookie) { + logger().info("notify_ack watch_cookie={}, notify_id={}", + *ctx.watch_cookie, ctx.notify_id); + } else { + logger().info("notify_ack notify_id={}", ctx.notify_id); + } + return seastar::do_for_each(obc->watchers, + [ctx=std::move(ctx)] (auto& kv) { + const auto& [key, watchp] = kv; + auto& [cookie, entity] = key; + if (ctx.entity != entity) { + return seastar::now(); + } + if (ctx.watch_cookie && *ctx.watch_cookie != cookie) { + return seastar::now(); + } + logger().info("acking notify on watch {}", key); + return watchp->notify_ack(ctx.notify_id, ctx.reply_bl); + }); + }); +} + static inline std::unique_ptr get_pgls_filter( const std::string& type, bufferlist::const_iterator& iter) @@ -636,6 +689,10 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op) return do_read_op([this, &osd_op] (auto&, const auto& os) { return do_op_notify(osd_op, os); }); + case CEPH_OSD_OP_NOTIFY_ACK: + return do_read_op([this, &osd_op] (auto&, const auto& os) { + return do_op_notify_ack(osd_op, os); + }); default: logger().warn("unknown op {}", ceph_osd_op_name(op.op)); diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index 617db0ac42739..a66df94a50298 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -123,6 +123,9 @@ private: watch_errorator::future<> do_op_notify( class OSDOp& osd_op, const class ObjectState& os); + watch_errorator::future<> do_op_notify_ack( + class OSDOp& osd_op, + const class ObjectState& os); hobject_t &get_target() const { return obc->obs.oi.soid; diff --git a/src/crimson/osd/watch.h b/src/crimson/osd/watch.h index 3b03a81bae886..ed8f4a84b1535 100644 --- a/src/crimson/osd/watch.h +++ b/src/crimson/osd/watch.h @@ -41,6 +41,13 @@ public: return seastar::now(); } + /// Call when notify_ack received on notify_id + seastar::future<> notify_ack( + uint64_t notify_id, ///< [in] id of acked notify + const ceph::bufferlist& reply_bl) { ///< [in] notify reply buffer + return seastar::now(); + } + template static seastar::shared_ptr create(Args&&... args) { return seastar::make_shared(private_ctag_t{}, -- 2.39.5