]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson: add execution units for WATCH and UNWATCH.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Tue, 26 Nov 2019 20:24:57 +0000 (21:24 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 13 Feb 2020 23:11:38 +0000 (00:11 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h

index 8d9db0d58e7b16d3d0d372f5d750d4a4bf539b1e..b69583d4826c447a3a91bad6f69b136edc65d827 100644 (file)
@@ -110,6 +110,122 @@ OpsExecuter::call_errorator::future<> OpsExecuter::do_op_call(OSDOp& osd_op)
   );
 }
 
+static watch_info_t create_watch_info(const auto& osd_op,
+                                      const auto& msg)
+{
+  using crimson::common::local_conf;
+  const uint32_t timeout =
+    osd_op.op.watch.timeout == 0 ? local_conf()->osd_client_watch_timeout
+                                 : osd_op.op.watch.timeout;
+  return {
+    osd_op.op.watch.cookie,
+    timeout,
+    msg.get_connection()->get_peer_addr()
+  };
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch(
+  OSDOp& osd_op,
+  ObjectState& os,
+  ceph::os::Transaction& txn)
+{
+  struct connect_ctx_t {
+    ObjectContext::watch_key_t key;
+    watch_info_t info;
+
+    connect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg)
+      : key(osd_op.op.watch.cookie, msg.get_reqid().name),
+        info(create_watch_info(osd_op, msg)) {
+    }
+  };
+  return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
+    [&] (auto& ctx) {
+      const auto& entity = ctx.key.second;
+      auto [it, emplaced] =
+        os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
+      if (emplaced) {
+        logger().info("registered new watch {} by {}", it->second, entity);
+        txn.nop();
+      } else {
+        logger().info("found existing watch {} by {}", it->second, entity);
+      }
+      return seastar::now();
+    },
+    [] (auto&& ctx, ObjectContextRef obc) {
+      auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
+      if (emplaced) {
+        it->second = crimson::osd::Watch::create();
+        logger().info("op_effect: added new watcher: {}", ctx.key);
+      } else {
+        logger().info("op_effect: found existing watcher: {}", ctx.key);
+      }
+      return it->second->connect(nullptr /* conn */, true /* will_ping */);
+    });
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
+  OSDOp& osd_op,
+  ObjectState& os,
+  ceph::os::Transaction& txn)
+{
+  struct disconnect_ctx_t {
+    ObjectContext::watch_key_t key;
+    bool send_disconnect{ false };
+
+    disconnect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg)
+      : key(osd_op.op.watch.cookie, msg.get_reqid().name) {
+    }
+  };
+  return with_effect_on_obc(disconnect_ctx_t{ osd_op, get_message() },
+    [&] (auto& ctx) {
+      const auto& entity = ctx.key.second;
+      if (auto nh = os.oi.watchers.extract(ctx.key); !nh.empty()) {
+        logger().info("removed watch {} by {}", nh.mapped(), entity);
+        txn.nop();
+      } else {
+        logger().info("can't remove: no watch by {}", entity);
+      }
+      return seastar::now();
+    },
+    [] (auto&& ctx, ObjectContextRef obc) {
+      if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) {
+        logger().info("op_effect: disconnect watcher {}", ctx.key);
+        return nh.mapped()->remove(ctx.send_disconnect);
+      } else {
+        logger().info("op_effect: disconnect failed to find watcher {}",
+                      ctx.key);
+        return seastar::now();
+      }
+    });
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch(
+  OSDOp& osd_op,
+  ObjectState& os,
+  ceph::os::Transaction& txn)
+{
+  if (!os.exists) {
+    return crimson::ct_error::enoent::make();
+  }
+  switch (osd_op.op.watch.op) {
+    case CEPH_OSD_WATCH_OP_WATCH:
+      return do_op_watch_subop_watch(osd_op, os, txn);
+    case CEPH_OSD_WATCH_OP_RECONNECT:
+      // TODO: implement reconnect
+      break;
+    case CEPH_OSD_WATCH_OP_PING:
+      // TODO: implement ping
+      break;
+    case CEPH_OSD_WATCH_OP_UNWATCH:
+      return do_op_watch_subop_unwatch(osd_op, os, txn);
+    case CEPH_OSD_WATCH_OP_LEGACY_WATCH:
+      logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
+      return crimson::ct_error::invarg::make();
+  }
+  logger().warn("unrecognized WATCH subop: {}", osd_op.op.watch.op);
+  return crimson::ct_error::invarg::make();
+}
+
 static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
   const std::string& type,
   bufferlist::const_iterator& iter)
@@ -442,9 +558,8 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op)
 
   // watch/notify
   case CEPH_OSD_OP_WATCH:
-    return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
-      logger().warn("CEPH_OSD_OP_WATCH is not implemented yet; ignoring");
-      return osd_op_errorator::now();
+    return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+      return do_op_watch(osd_op, os, txn);
     });
 
   default:
index fe4a56b5f994f41beea1ee1cdcac6152dcf88c39..d371fc6319bf7696efd6a719b5c611c23b8384f3 100644 (file)
@@ -46,6 +46,9 @@ class OpsExecuter {
     crimson::ct_error::input_output_error>;
   using read_errorator = PGBackend::read_errorator;
   using get_attr_errorator = PGBackend::get_attr_errorator;
+  using watch_errorator = crimson::errorator<
+    crimson::ct_error::enoent,
+    crimson::ct_error::invarg>;
 
 public:
   // because OpsExecuter is pretty heavy-weight object we want to ensure
@@ -58,6 +61,7 @@ public:
     call_errorator,
     read_errorator,
     get_attr_errorator,
+    watch_errorator,
     PGBackend::stat_errorator>;
 
 private:
@@ -94,6 +98,18 @@ private:
     EffectFunc&& effect_func);
 
   call_errorator::future<> do_op_call(class OSDOp& osd_op);
+  watch_errorator::future<> do_op_watch(
+    class OSDOp& osd_op,
+    class ObjectState& os,
+    ceph::os::Transaction& txn);
+  watch_errorator::future<> do_op_watch_subop_watch(
+    class OSDOp& osd_op,
+    class ObjectState& os,
+    ceph::os::Transaction& txn);
+  watch_errorator::future<> do_op_watch_subop_unwatch(
+    class OSDOp& osd_op,
+    class ObjectState& os,
+    ceph::os::Transaction& txn);
 
   hobject_t &get_target() const {
     return obc->obs.oi.soid;