git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: implement the complete phase of Notify propagation.
author Radoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 30 Dec 2019 13:40:37 +0000 (14:40 +0100)
committer Radoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 13 Feb 2020 23:11:40 +0000 (00:11 +0100)
Q: How to test?
A:  * `${CEPHBIN:-bin}/rados -p rbd touch moj_watch`
    * `${CEPHBIN:-bin}/rados -p rbd watch moj_watch`
    * `${CEPHBIN:-bin}/rados -p rbd notify moj_watch test2`

Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ops_executer.cc
src/crimson/osd/watch.cc
src/crimson/osd/watch.h

index 39ff6d320f32d3b1af100c45f75730d6b92e8a0f..99a5f7cfcebdf68fb9d2075dd7e9c7368730f802 100644 (file)
@@ -157,8 +157,8 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch(
     [] (auto&& ctx, ObjectContextRef obc) {
       auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
       if (emplaced) {
-        const auto& cookie = ctx.key.first;
-        it->second = crimson::osd::Watch::create(ctx.info, obc);
+        const auto& [cookie, entity] = ctx.key;
+        it->second = crimson::osd::Watch::create(obc, ctx.info, entity);
         logger().info("op_effect: added new watcher: {}", ctx.key);
       } else {
         logger().info("op_effect: found existing watcher: {}", ctx.key);
@@ -285,12 +285,14 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
     return crimson::ct_error::enoent::make();
   }
   struct notify_ctx_t { // per-request state captured from the incoming MOSDOp for a notify op
+    crimson::net::ConnectionRef conn; // connection the request arrived on; later moved into Notify::create_n_propagate
     notify_info_t ninfo; // not set by the constructor below
     const uint64_t client_gid; // numeric part of the requester's entity name
     const epoch_t epoch; // map epoch carried by the request
 
     notify_ctx_t(const MOSDOp& msg)
-      : client_gid(msg.get_reqid().name.num()),
+      : conn(msg.get_connection()),
+        client_gid(msg.get_reqid().name.num()),
         epoch(msg.get_map_epoch()) {
     }
   };
@@ -325,6 +327,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
       return crimson::osd::Notify::create_n_propagate(
         std::begin(alive_watchers),
         std::end(alive_watchers),
+        std::move(ctx.conn),
         ctx.ninfo,
         ctx.client_gid,
         obc->obs.oi.user_version);
index 84d65f9d06a9be2d9d6985a7f923257d06f3d5e1..b5d13fd1ecaad90cb7bb32804d619e75f9c527ce 100644 (file)
@@ -51,4 +51,73 @@ seastar::future<> Watch::start_notify(NotifyRef notify)
   return is_connected() ? send_notify_msg(*it) : seastar::now();
 }
 
+seastar::future<> Watch::notify_ack( // reports this watcher's ack to every in-progress notify, then clears the list
+  const uint64_t notify_id, // NOTE(review): unused in this body; all in-progress notifies are completed regardless of id
+  const ceph::bufferlist& reply_bl) // reply payload handed to each notify's complete_watcher()
+{
+  logger().info("{}", __func__);
+  return seastar::do_for_each(in_progress_notifies,
+    [this_shared=shared_from_this(), &reply_bl] (auto notify) { // NOTE(review): reply_bl is captured by reference; presumably the caller keeps it alive until the returned future resolves - confirm
+      return notify->complete_watcher(this_shared, reply_bl);
+    }
+  ).then([this] { // NOTE(review): raw `this` here, unlike the loop lambda that holds this_shared - confirm the Watch outlives this continuation
+    in_progress_notifies.clear(); // runs only after the whole iteration has finished
+    return seastar::now();
+  });
+}
+
+bool notify_reply_t::operator<(const notify_reply_t& rhs) const // orders replies by (watcher_gid, watcher_cookie); bl takes no part in the comparison
+{
+  // comparing std::pairs to emphasize our legacy. ceph-osd stores
+  // notify_replies as std::multimap<std::pair<gid, cookie>, bl>.
+  // unfortunately, what seems to be an implementation detail, got
+  // exposed as part of our public API (the `reply_buffer` parameter
+  // of the `rados_notify` family).
+  const auto lhsp = std::make_pair(watcher_gid, watcher_cookie);
+  const auto rhsp = std::make_pair(rhs.watcher_gid, rhs.watcher_cookie);
+  return lhsp < rhsp; // lexicographic: gid first, cookie as tie-breaker
+}
+
+seastar::future<> Notify::complete_watcher( // records one watcher's reply and sends the completion if it was the last pending watcher
+  WatchRef watch, // watcher that acked; supplies the gid and cookie stored with the reply
+  const ceph::bufferlist& reply_bl) // reply payload; copied into the stored notify_reply_t
+{
+  if (discarded || complete) { // nothing to record once the notify is finished or discarded
+    return seastar::now();
+  }
+  notify_replies.emplace(notify_reply_t{
+    watch->get_watcher_gid(),
+    watch->get_cookie(),
+    reply_bl});
+  watchers.erase(watch); // this watcher is no longer pending
+  return maybe_send_completion();
+}
+
+seastar::future<> Notify::maybe_send_completion() // sends NOTIFY_COMPLETE over conn once no watchers remain pending; otherwise returns a ready future
+{
+  logger().info("{} -- {} in progress watchers", __func__, watchers.size());
+  if (watchers.empty()) {
+    // prepare reply
+    ceph::bufferlist bl;
+    encode(notify_replies, bl); // collected replies go first
+    // FIXME: this is just a stub
+    std::list<std::pair<uint64_t,uint64_t>> missed; // always empty here, encoded after the replies
+    encode(missed, bl);
+
+    complete = true; // makes later complete_watcher() calls return early
+
+    ceph::bufferlist empty;
+    auto reply = make_message<MWatchNotify>(
+      ninfo.cookie,
+      user_version,
+      ninfo.notify_id,
+      CEPH_WATCH_EVENT_NOTIFY_COMPLETE,
+      empty, // constructor payload left empty; the encoded replies are attached via set_data() below
+      client_gid);
+    reply->set_data(bl);
+    return conn->send(std::move(reply));
+  }
+  return seastar::now();
+}
+
 } // namespace crimson::osd
index 3db6471f6fdb0ff0c323a2e2e01bdbd4c32c5ab4..bf9229028d404bdab1738c593de918f9946d0cff 100644 (file)
@@ -4,12 +4,14 @@
 #pragma once
 
 #include <iterator>
+#include <map>
 #include <set>
 
 #include <seastar/core/shared_ptr.hh>
 
 #include "crimson/net/Connection.h"
 #include "crimson/osd/object_context.h"
+#include "include/denc.h"
 
 namespace crimson::osd {
 
@@ -38,6 +40,7 @@ class Watch : public seastar::enable_shared_from_this<Watch> {
   crimson::osd::ObjectContextRef obc;
 
   watch_info_t winfo;
+  entity_name_t entity_name;
 
   seastar::future<> start_notify(NotifyRef);
   seastar::future<> send_notify_msg(NotifyRef);
@@ -46,10 +49,12 @@ class Watch : public seastar::enable_shared_from_this<Watch> {
 
 public:
   Watch(private_ctag_t, // tag parameter; create() below supplies it
+        crimson::osd::ObjectContextRef obc,
         const watch_info_t& winfo,
-        crimson::osd::ObjectContextRef obc)
-    : winfo(winfo),
-      obc(std::move(obc)) {
+        const entity_name_t& entity_name) // watching entity; its num() is reported by get_watcher_gid()
+    : obc(std::move(obc)), // initializer order now follows member declaration order (obc, winfo, entity_name)
+      winfo(winfo),
+      entity_name(entity_name) {
   }
 
   seastar::future<> connect(crimson::net::ConnectionRef, bool);
@@ -70,31 +75,58 @@ public:
   /// Call when notify_ack received on notify_id
   seastar::future<> notify_ack(
     uint64_t notify_id, ///< [in] id of acked notify
-    const ceph::bufferlist& reply_bl) { ///< [in] notify reply buffer
-    return seastar::now();
-  }
+    const ceph::bufferlist& reply_bl); ///< [in] notify reply buffer
 
   template <class... Args>
   static seastar::shared_ptr<Watch> create(Args&&... args) { // factory: builds a shared Watch, supplying the private_ctag_t tag itself
     return seastar::make_shared<Watch>(private_ctag_t{},
                                        std::forward<Args>(args)...); // remaining arguments are perfect-forwarded to the constructor
   };
+
+  uint64_t get_watcher_gid() const { // numeric id of the watching entity; Notify::complete_watcher stores it as watcher_gid
+    return entity_name.num();
+  }
+  uint64_t get_cookie() const { // cookie from this watch's watch_info_t; Notify::complete_watcher stores it as watcher_cookie
+    return winfo.cookie;
+  }
 };
 
 using WatchRef = seastar::shared_ptr<Watch>;
 
+struct notify_reply_t { // one watcher's reply to a notify, as stored in Notify::notify_replies
+  uint64_t watcher_gid; // filled from Watch::get_watcher_gid()
+  uint64_t watcher_cookie; // filled from Watch::get_cookie()
+  ceph::bufferlist bl; // reply payload supplied with the ack
+
+  bool operator<(const notify_reply_t& rhs) const; // ordering used by the std::multiset that holds the replies
+  DENC(notify_reply_t, v, p) { // encodes the fields in the order gid, cookie, bl
+    DENC_START(1, 1, p); // presumably (version, compat) = (1, 1) - confirm against include/denc.h
+    denc(v.watcher_gid, p);
+    denc(v.watcher_cookie, p);
+    denc(v.bl, p);
+    DENC_FINISH(p);
+  }
+};
+
 class Notify {
   std::set<WatchRef> watchers;
   notify_info_t ninfo;
+  crimson::net::ConnectionRef conn;
   uint64_t client_gid;
   uint64_t user_version;
+  bool complete = false;
+  bool discarded = false;
+
+  /// (gid,cookie) -> reply_bl for everyone who acked the notify
+  std::multiset<notify_reply_t> notify_replies;
 
   uint64_t get_id() const { return ninfo.notify_id; }
-  seastar::future<> propagate() { return seastar::now(); }
+  seastar::future<> maybe_send_completion();
 
   template <class WatchIteratorT>
   Notify(WatchIteratorT begin,
          WatchIteratorT end,
+         crimson::net::ConnectionRef conn,
          const notify_info_t& ninfo,
          const uint64_t client_gid,
          const uint64_t user_version);
@@ -115,17 +147,22 @@ public:
     WatchIteratorT begin,
     WatchIteratorT end,
     Args&&... args);
+
+  seastar::future<> complete_watcher(WatchRef watch,
+                                     const ceph::bufferlist& reply_bl);
 };
 
 
 template <class WatchIteratorT>
 Notify::Notify(WatchIteratorT begin, // [begin, end) is copied into the pending-watchers set
                WatchIteratorT end,
+               crimson::net::ConnectionRef conn, // connection that maybe_send_completion() replies on
                const notify_info_t& ninfo,
                const uint64_t client_gid,
                const uint64_t user_version)
   : watchers(begin, end),
     ninfo(ninfo),
+    conn(std::move(conn)),
     client_gid(client_gid),
     user_version(user_version) {
 }
@@ -147,8 +184,10 @@ seastar::future<> Notify::create_n_propagate(
   return seastar::do_for_each(begin, end, [=] (auto& watchref) {
     return watchref->start_notify(notify);
   }).then([notify = std::move(notify)] {
-    return notify->propagate();
+    return notify->maybe_send_completion();
   });
 }
 
 } // namespace crimson::osd
+
+WRITE_CLASS_DENC(crimson::osd::notify_reply_t)