]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd: dispatch MWatchNotify on CEPH_OSD_OP_NOTIFY.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Fri, 27 Dec 2019 15:25:38 +0000 (16:25 +0100)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Thu, 13 Feb 2020 23:11:40 +0000 (00:11 +0100)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/CMakeLists.txt
src/crimson/osd/object_context.h
src/crimson/osd/ops_executer.cc
src/crimson/osd/watch.cc [new file with mode: 0644]
src/crimson/osd/watch.h

index ef4fc716743c4b182b3b07cfa84fb2ba873be7fd..cfd7dc722a7e649b68e8a3d2f4e66e92dd0bdc3f 100644 (file)
@@ -30,6 +30,7 @@ add_executable(crimson-osd
   ${PROJECT_SOURCE_DIR}/src/osd/MissingLoc.cc
   ${PROJECT_SOURCE_DIR}/src/osd/PGLog.cc
   ${PROJECT_SOURCE_DIR}/src/osd/osd_perf_counters.cc
+  watch.cc
   )
 target_link_libraries(crimson-osd
   crimson-common crimson-os crimson fmt::fmt)
index e0822e1c5bc22073b7f62cb3a6b34d3fb779500d..2d61963e645319997f70f98e859b77d24d21448b 100644 (file)
@@ -4,6 +4,7 @@
 #pragma once
 
 #include <seastar/core/shared_future.hh>
+#include <seastar/core/shared_ptr.hh>
 
 #include <boost/intrusive_ptr.hpp>
 #include <boost/intrusive/list.hpp>
 #include "osd/object_state.h"
 #include "crimson/common/config_proxy.h"
 #include "crimson/osd/osd_operation.h"
-#include "crimson/osd/watch.h"
 
 namespace crimson::osd {
 
+class Watch;
+
 template <typename OBC>
 struct obc_to_hoid {
   using type = hobject_t;
@@ -40,7 +42,7 @@ public:
   // frequented paths. std::map is used mostly because of developer's
   // convenience.
   using watch_key_t = std::pair<uint64_t, entity_name_t>;
-  std::map<watch_key_t, crimson::osd::WatchRef> watchers;
+  std::map<watch_key_t, seastar::shared_ptr<crimson::osd::Watch>> watchers;
 
   ObjectContext(const hobject_t &hoid) : obs(hoid), loaded(false) {}
 
@@ -229,4 +231,4 @@ public:
                           const std::set <std::string> &changed) final;
 };
 
-}
+} // namespace crimson::osd
index c1191f0c817a789897c6bad83a8e6da4bbf11fdb..39ff6d320f32d3b1af100c45f75730d6b92e8a0f 100644 (file)
@@ -16,6 +16,7 @@
 #include <seastar/core/thread.hh>
 
 #include "crimson/osd/exceptions.h"
+#include "crimson/osd/watch.h"
 #include "osd/ClassHandler.h"
 
 namespace {
@@ -156,7 +157,8 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch(
     [] (auto&& ctx, ObjectContextRef obc) {
       auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
       if (emplaced) {
-        it->second = crimson::osd::Watch::create();
+        const auto& cookie = ctx.key.first;
+        it->second = crimson::osd::Watch::create(ctx.info, obc);
         logger().info("op_effect: added new watcher: {}", ctx.key);
       } else {
         logger().info("op_effect: found existing watcher: {}", ctx.key);
@@ -245,6 +247,7 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch(
   ObjectState& os,
   ceph::os::Transaction& txn)
 {
+  logger().debug("{}", __func__);
   if (!os.exists) {
     return crimson::ct_error::enoent::make();
   }
@@ -265,33 +268,51 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch(
   return crimson::ct_error::invarg::make();
 }
 
+static uint64_t get_next_notify_id(epoch_t e)
+{
+  // FIXME
+  static std::uint64_t next_notify_id = 0;
+  return (((uint64_t)e) << 32) | ((uint64_t)(next_notify_id++));
+}
+
 OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
   OSDOp& osd_op,
   const ObjectState& os)
 {
+  logger().debug("{}, msg epoch: {}", __func__, get_message().get_map_epoch());
+
   if (!os.exists) {
     return crimson::ct_error::enoent::make();
   }
-  return with_effect_on_obc(notify_info_t{},
+  struct notify_ctx_t {
+    notify_info_t ninfo;
+    const uint64_t client_gid;
+    const epoch_t epoch;
+
+    notify_ctx_t(const MOSDOp& msg)
+      : client_gid(msg.get_reqid().name.num()),
+        epoch(msg.get_map_epoch()) {
+    }
+  };
+  return with_effect_on_obc(notify_ctx_t{ get_message() },
     [&] (auto& ctx) {
       try {
         auto bp = osd_op.indata.cbegin();
         uint32_t ver; // obsolete
         ceph::decode(ver, bp);
-        ceph::decode(ctx.timeout, bp);
-        ceph::decode(ctx.bl, bp);
+        ceph::decode(ctx.ninfo.timeout, bp);
+        ceph::decode(ctx.ninfo.bl, bp);
       } catch (const buffer::error&) {
-        ctx.timeout = 0;
+        ctx.ninfo.timeout = 0;
       }
-      if (!ctx.timeout) {
+      if (!ctx.ninfo.timeout) {
         using crimson::common::local_conf;
-        ctx.timeout = local_conf()->osd_default_notify_timeout;
+        ctx.ninfo.timeout = local_conf()->osd_default_notify_timeout;
       }
-      // FIXME
-      ctx.notify_id = 42; //osd->get_next_id(get_osdmap_epoch());
-      ctx.cookie = osd_op.op.notify.cookie;
+      ctx.ninfo.notify_id = get_next_notify_id(ctx.epoch);
+      ctx.ninfo.cookie = osd_op.op.notify.cookie;
       // return our unique notify id to the client
-      ceph::encode(ctx.notify_id, osd_op.outdata);
+      ceph::encode(ctx.ninfo.notify_id, osd_op.outdata);
       return seastar::now();
     },
     [] (auto&& ctx, ObjectContextRef obc) {
@@ -303,7 +324,10 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify(
         });
       return crimson::osd::Notify::create_n_propagate(
         std::begin(alive_watchers),
-        std::end(alive_watchers));
+        std::end(alive_watchers),
+        ctx.ninfo,
+        ctx.client_gid,
+        obc->obs.oi.user_version);
   });
 }
 
@@ -311,6 +335,8 @@ OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack(
   OSDOp& osd_op,
   const ObjectState& os)
 {
+  logger().debug("{}", __func__);
+
   struct notifyack_ctx_t {
     const entity_name_t entity;
     uint64_t watch_cookie;
diff --git a/src/crimson/osd/watch.cc b/src/crimson/osd/watch.cc
new file mode 100644 (file)
index 0000000..84d65f9
--- /dev/null
@@ -0,0 +1,54 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
+#include "crimson/osd/watch.h"
+#include "messages/MWatchNotify.h"
+
+
+namespace {
+  seastar::logger& logger() {
+    return crimson::get_logger(ceph_subsys_osd);
+  }
+}
+
+namespace crimson::osd {
+
+bool Watch::NotifyCmp::operator()(NotifyRef lhs, NotifyRef rhs) const
+{
+  ceph_assert(lhs);
+  ceph_assert(rhs);
+  return lhs->get_id() < rhs->get_id();
+}
+
+seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
+{
+  if (this->conn == conn) {
+    logger().debug("conn={} already connected", conn);
+  }
+
+  this->conn = std::move(conn);
+  return seastar::now();
+}
+
+seastar::future<> Watch::send_notify_msg(NotifyRef notify)
+{
+  logger().info("{} for notify(id={})", __func__, notify->ninfo.notify_id);
+  return conn->send(make_message<MWatchNotify>(
+    winfo.cookie,
+    notify->user_version,
+    notify->ninfo.notify_id,
+    CEPH_WATCH_EVENT_NOTIFY,
+    notify->ninfo.bl,
+    notify->client_gid));
+}
+
+seastar::future<> Watch::start_notify(NotifyRef notify)
+{
+  logger().info("{} adding &notify={}", __func__, notify.get());
+  auto [ it, emplaced ] = in_progress_notifies.emplace(std::move(notify));
+  ceph_assert(emplaced);
+  ceph_assert(is_alive());
+  return is_connected() ? send_notify_msg(*it) : seastar::now();
+}
+
+} // namespace crimson::osd
index f26c9cb1620fc7e7d7ddbc332f1e7f804a3ba39e..3db6471f6fdb0ff0c323a2e2e01bdbd4c32c5ab4 100644 (file)
@@ -9,6 +9,7 @@
 #include <seastar/core/shared_ptr.hh>
 
 #include "crimson/net/Connection.h"
+#include "crimson/osd/object_context.h"
 
 namespace crimson::osd {
 
@@ -34,6 +35,9 @@ class Watch : public seastar::enable_shared_from_this<Watch> {
   };
   std::set<NotifyRef, NotifyCmp> in_progress_notifies;
   crimson::net::ConnectionRef conn;
+  crimson::osd::ObjectContextRef obc;
+
+  watch_info_t winfo;
 
   seastar::future<> start_notify(NotifyRef);
   seastar::future<> send_notify_msg(NotifyRef);
@@ -41,7 +45,11 @@ class Watch : public seastar::enable_shared_from_this<Watch> {
   friend Notify;
 
 public:
-  Watch(private_ctag_t) {
+  Watch(private_ctag_t,
+        const watch_info_t& winfo,
+        crimson::osd::ObjectContextRef obc)
+    : winfo(winfo),
+      obc(std::move(obc)) {
   }
 
   seastar::future<> connect(crimson::net::ConnectionRef, bool);
@@ -77,12 +85,19 @@ using WatchRef = seastar::shared_ptr<Watch>;
 
 class Notify {
   std::set<WatchRef> watchers;
+  notify_info_t ninfo;
+  uint64_t client_gid;
+  uint64_t user_version;
 
-  uint64_t get_id() const { return 0; }
-  void propagate() {}
+  uint64_t get_id() const { return ninfo.notify_id; }
+  seastar::future<> propagate() { return seastar::now(); }
 
   template <class WatchIteratorT>
-  Notify(WatchIteratorT begin, WatchIteratorT end);
+  Notify(WatchIteratorT begin,
+         WatchIteratorT end,
+         const notify_info_t& ninfo,
+         const uint64_t client_gid,
+         const uint64_t user_version);
   // this is a private tag for the public constructor that turns it into
   // de facto private one. The motivation behind the hack is make_shared
   // used by create_n_propagate factory.
@@ -95,31 +110,44 @@ public:
   Notify(private_ctag_t, Args&&... args) : Notify(std::forward<Args>(args)...) {
   }
 
-  template <class WatchIteratorT>
+  template <class WatchIteratorT, class... Args>
   static seastar::future<> create_n_propagate(
     WatchIteratorT begin,
-    WatchIteratorT end);
+    WatchIteratorT end,
+    Args&&... args);
 };
 
 
 template <class WatchIteratorT>
-Notify::Notify(WatchIteratorT begin, WatchIteratorT end)
-  : watchers(begin, end) {
+Notify::Notify(WatchIteratorT begin,
+               WatchIteratorT end,
+               const notify_info_t& ninfo,
+               const uint64_t client_gid,
+               const uint64_t user_version)
+  : watchers(begin, end),
+    ninfo(ninfo),
+    client_gid(client_gid),
+    user_version(user_version) {
 }
 
-template <class WatchIteratorT>
+template <class WatchIteratorT, class... Args>
 seastar::future<> Notify::create_n_propagate(
   WatchIteratorT begin,
-  WatchIteratorT end)
+  WatchIteratorT end,
+  Args&&... args)
 {
   static_assert(
     std::is_same_v<typename std::iterator_traits<WatchIteratorT>::value_type,
                    crimson::osd::WatchRef>);
-  auto notify = seastar::make_shared<Notify>(private_ctag_t{}, begin, end);
-  seastar::do_for_each(begin, end, [=] (auto& watchref) {
+  auto notify = seastar::make_shared<Notify>(
+    private_ctag_t{},
+    begin,
+    end,
+    std::forward<Args>(args)...);
+  return seastar::do_for_each(begin, end, [=] (auto& watchref) {
     return watchref->start_notify(notify);
-  }).then([notify = std::move(notify)] {;
-    notify->propagate();
+  }).then([notify = std::move(notify)] {
+    return notify->propagate();
   });
 }