From: Radoslaw Zarzynski Date: Tue, 26 Nov 2019 20:24:57 +0000 (+0100) Subject: crimson: add execution units for WATCH and UNWATCH. X-Git-Tag: v15.1.1~415^2~16 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=fd67448bb644bc873d15e80e8a27b30d6a4b096d;p=ceph-ci.git crimson: add execution units for WATCH and UNWATCH. Signed-off-by: Radoslaw Zarzynski --- diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 8d9db0d58e7..b69583d4826 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -110,6 +110,122 @@ OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op) ); } +static watch_info_t create_watch_info(const auto& osd_op, + const auto& msg) +{ + using crimson::common::local_conf; + const uint32_t timeout = + osd_op.op.watch.timeout == 0 ? local_conf()->osd_client_watch_timeout + : osd_op.op.watch.timeout; + return { + osd_op.op.watch.cookie, + timeout, + msg.get_connection()->get_peer_addr() + }; +} + +OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch( + OSDOp& osd_op, + ObjectState& os, + ceph::os::Transaction& txn) +{ + struct connect_ctx_t { + ObjectContext::watch_key_t key; + watch_info_t info; + + connect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg) + : key(osd_op.op.watch.cookie, msg.get_reqid().name), + info(create_watch_info(osd_op, msg)) { + } + }; + return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() }, + [&] (auto& ctx) { + const auto& entity = ctx.key.second; + auto [it, emplaced] = + os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info)); + if (emplaced) { + logger().info("registered new watch {} by {}", it->second, entity); + txn.nop(); + } else { + logger().info("found existing watch {} by {}", it->second, entity); + } + return seastar::now(); + }, + [] (auto&& ctx, ObjectContextRef obc) { + auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr); + if (emplaced) { + it->second = crimson::osd::Watch::create(); + logger().info("op_effect: added new watcher: {}", ctx.key); + } else { + logger().info("op_effect: found existing watcher: {}", ctx.key); + } + return it->second->connect(nullptr /* conn */, true /* will_ping */); + }); +} + +OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch( + OSDOp& osd_op, + ObjectState& os, + ceph::os::Transaction& txn) +{ + struct disconnect_ctx_t { + ObjectContext::watch_key_t key; + bool send_disconnect{ false }; + + disconnect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg) + : key(osd_op.op.watch.cookie, msg.get_reqid().name) { + } + }; + return with_effect_on_obc(disconnect_ctx_t{ osd_op, get_message() }, + [&] (auto& ctx) { + const auto& entity = ctx.key.second; + if (auto nh = os.oi.watchers.extract(ctx.key); !nh.empty()) { + logger().info("removed watch {} by {}", nh.mapped(), entity); + txn.nop(); + } else { + logger().info("can't remove: no watch by {}", entity); + } + return seastar::now(); + }, + [] (auto&& ctx, ObjectContextRef obc) { + if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) { + logger().info("op_effect: disconnect watcher {}", ctx.key); + return nh.mapped()->remove(ctx.send_disconnect); + } else { + logger().info("op_effect: disconnect failed to find watcher {}", + ctx.key); + return seastar::now(); + } + }); +} + +OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch( + OSDOp& osd_op, + ObjectState& os, + ceph::os::Transaction& txn) +{ + if (!os.exists) { + return crimson::ct_error::enoent::make(); + } + switch (osd_op.op.watch.op) { + case CEPH_OSD_WATCH_OP_WATCH: + return do_op_watch_subop_watch(osd_op, os, txn); + case CEPH_OSD_WATCH_OP_RECONNECT: + // TODO: implement reconnect + break; + case CEPH_OSD_WATCH_OP_PING: + // TODO: implement ping + break; + case CEPH_OSD_WATCH_OP_UNWATCH: + return do_op_watch_subop_unwatch(osd_op, os, txn); + case CEPH_OSD_WATCH_OP_LEGACY_WATCH: + logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH"); + return crimson::ct_error::invarg::make(); + } + logger().warn("unrecognized WATCH subop: {}", osd_op.op.watch.op); + return crimson::ct_error::invarg::make(); +} + static inline std::unique_ptr get_pgls_filter( const std::string& type, bufferlist::const_iterator& iter) @@ -442,9 +558,8 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op) // watch/notify case CEPH_OSD_OP_WATCH: - return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) { - logger().warn("CEPH_OSD_OP_WATCH is not implemented yet; ignoring"); - return osd_op_errorator::now(); + return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) { + return do_op_watch(osd_op, os, txn); }); default: diff --git a/src/crimson/osd/ops_executer.h b/src/crimson/osd/ops_executer.h index fe4a56b5f99..d371fc6319b 100644 --- a/src/crimson/osd/ops_executer.h +++ b/src/crimson/osd/ops_executer.h @@ -46,6 +46,9 @@ class OpsExecuter { crimson::ct_error::input_output_error>; using read_errorator = PGBackend::read_errorator; using get_attr_errorator = PGBackend::get_attr_errorator; + using watch_errorator = crimson::errorator< + crimson::ct_error::enoent, + crimson::ct_error::invarg>; public: // because OpsExecuter is pretty heavy-weight object we want to ensure @@ -58,6 +61,7 @@ public: call_errorator, read_errorator, get_attr_errorator, + watch_errorator, PGBackend::stat_errorator>; private: @@ -94,6 +98,18 @@ private: EffectFunc&& effect_func); call_errorator::future<> do_op_call(class OSDOp& osd_op); + watch_errorator::future<> do_op_watch( + class OSDOp& osd_op, + class ObjectState& os, + ceph::os::Transaction& txn); + watch_errorator::future<> do_op_watch_subop_watch( + class OSDOp& osd_op, + class ObjectState& os, + ceph::os::Transaction& txn); + watch_errorator::future<> do_op_watch_subop_unwatch( + class OSDOp& osd_op, + class ObjectState& os, + ceph::os::Transaction& txn); hobject_t &get_target() const { return obc->obs.oi.soid;