]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson: implement CEPH_OSD_OP_NOTIFY_ACK.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 27 Nov 2019 22:37:41 +0000 (23:37 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 13 Feb 2020 23:11:38 +0000 (00:11 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ops_executer.cc
src/crimson/osd/ops_executer.h
src/crimson/osd/watch.h

index b3dfbe294d355daa293e2227f752124fc0f6e53f..1eb29ad55ece4ad691d19c3ead457605f2440297 100644 (file)
@@ -297,6 +297,59 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
   });
 }
 
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack(
+  OSDOp& osd_op,
+  const ObjectState& os)
+{
+  struct notifyack_ctx_t {
+    const entity_name_t entity;
+    std::optional<uint64_t> watch_cookie;
+    uint64_t notify_id;
+    ceph::bufferlist reply_bl;
+
+    notifyack_ctx_t(const MOSDOp& msg) : entity(msg.get_reqid().name) {
+    }
+  };
+  return with_effect_on_obc(notifyack_ctx_t{ get_message() },
+    [&] (auto& ctx) {
+      try {
+        auto bp = osd_op.indata.cbegin();
+        uint64_t wc = 0;
+        ceph::decode(ctx.notify_id, bp);
+        ceph::decode(wc, bp);
+        ctx.watch_cookie = wc;
+        if (!bp.end()) {
+          ceph::decode(ctx.reply_bl, bp);
+        }
+      } catch (const buffer::error&) {
+        // op.watch.cookie is actually the notify_id for historical reasons
+        ctx.notify_id = osd_op.op.watch.cookie;
+      }
+      return seastar::now();
+    },
+    [] (auto&& ctx, ObjectContextRef obc) {
+      if (ctx.watch_cookie) {
+        logger().info("notify_ack watch_cookie={}, notify_id={}",
+                      *ctx.watch_cookie, ctx.notify_id);
+      } else {
+        logger().info("notify_ack notify_id={}", ctx.notify_id);
+      }
+      return seastar::do_for_each(obc->watchers,
+        [ctx=std::move(ctx)] (auto& kv) {
+          const auto& [key, watchp] = kv;
+          auto& [cookie, entity] = key;
+          if (ctx.entity != entity) {
+            return seastar::now();
+          }
+          if (ctx.watch_cookie && *ctx.watch_cookie != cookie) {
+            return seastar::now();
+          }
+          logger().info("acking notify on watch {}", key);
+          return watchp->notify_ack(ctx.notify_id, ctx.reply_bl);
+        });
+  });
+}
+
 static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
   const std::string& type,
   bufferlist::const_iterator& iter)
@@ -636,6 +689,10 @@ OpsExecuter::execute_osd_op(OSDOp& osd_op)
     return do_read_op([this, &osd_op] (auto&, const auto& os) {
       return do_op_notify(osd_op, os);
     });
+  case CEPH_OSD_OP_NOTIFY_ACK:
+    return do_read_op([this, &osd_op] (auto&, const auto& os) {
+      return do_op_notify_ack(osd_op, os);
+    });
 
   default:
     logger().warn("unknown op {}", ceph_osd_op_name(op.op));
index 617db0ac42739c61cf96019fe9b6a47fd0ca40cc..a66df94a502986168c3bd8f1bd7ca894e3c2d584 100644 (file)
@@ -123,6 +123,9 @@ private:
   watch_errorator::future<> do_op_notify(
     class OSDOp& osd_op,
     const class ObjectState& os);
+  watch_errorator::future<> do_op_notify_ack(
+    class OSDOp& osd_op,
+    const class ObjectState& os);
 
   hobject_t &get_target() const {
     return obc->obs.oi.soid;
index 3b03a81bae886c44976ec128bdbdd9a22d88d0aa..ed8f4a84b153533348070ca5bb847c05df70d41d 100644 (file)
@@ -41,6 +41,13 @@ public:
     return seastar::now();
   }
 
+  /// Call when notify_ack received on notify_id
+  seastar::future<> notify_ack(
+    uint64_t notify_id, ///< [in] id of acked notify
+    const ceph::bufferlist& reply_bl) { ///< [in] notify reply buffer
+    return seastar::now();
+  }
+
   template <class... Args>
   static seastar::shared_ptr<Watch> create(Args&&... args) {
     return seastar::make_shared<Watch>(private_ctag_t{},