]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
crimson/osd: wire up handling of watch timeouts.
authorRadoslaw Zarzynski <rzarzyns@redhat.com>
Mon, 15 Mar 2021 11:59:54 +0000 (11:59 +0000)
committerRadoslaw Zarzynski <rzarzyns@redhat.com>
Wed, 12 May 2021 13:29:28 +0000 (13:29 +0000)
Signed-off-by: Radoslaw Zarzynski <rzarzyns@redhat.com>
src/crimson/osd/ops_executer.cc
src/crimson/osd/pg.h
src/crimson/osd/watch.cc
src/crimson/osd/watch.h

index 9ad50d3668c78241ad92ebe26de02c2b816757d4..955e977932a3531bdb1506c6cc89f5196cbf68d0 100644 (file)
@@ -180,11 +180,12 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch(
       }
       return seastar::now();
     },
-    [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
+    [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
       auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
       if (emplaced) {
         const auto& [cookie, entity] = ctx.key;
-        it->second = crimson::osd::Watch::create(obc, ctx.info, entity);
+        it->second = crimson::osd::Watch::create(
+          obc, ctx.info, entity, std::move(pg));
         logger().info("op_effect: added new watcher: {}", ctx.key);
       } else {
         logger().info("op_effect: found existing watcher: {}", ctx.key);
index fede88ec933ba62c6dbc8c20e52829153a51c85e..b3d24eb5487b0f961efb87903d5406b7bacaa552 100644 (file)
@@ -735,6 +735,7 @@ private:
   friend class BackfillRecovery;
   friend struct PGFacade;
   friend class InternalClientRequest;
+  friend class WatchTimeoutRequest;
 private:
   seastar::future<bool> find_unfound() {
     return seastar::make_ready_future<bool>(true);
index bdceaf68682a80944806c36d2cd1bbba64f00b85..ccdec827712c83d1c65566817bb85d75f96de226 100644 (file)
@@ -7,6 +7,8 @@
 #include <boost/range/algorithm_ext/insert.hpp>
 
 #include "crimson/osd/watch.h"
+#include "crimson/osd/osd_operations/internal_client_request.h"
+
 #include "messages/MWatchNotify.h"
 
 
@@ -18,6 +20,58 @@ namespace {
 
 namespace crimson::osd {
 
+// a watcher can remove itself if it has not seen a notification after a period of time.
+// in the case, we need to drop it also from the persisted `ObjectState` instance.
+// this operation resembles a bit the `_UNWATCH` subop.
+class WatchTimeoutRequest final : public InternalClientRequest {
+public:
+  WatchTimeoutRequest(WatchRef watch, Ref<PG> pg)
+    : InternalClientRequest(std::move(pg)),
+      watch(std::move(watch)) {
+  }
+
+  const hobject_t& get_target_oid() const final;
+  PG::do_osd_ops_params_t get_do_osd_ops_params() const final;
+  std::vector<OSDOp> create_osd_ops() final;
+
+private:
+  WatchRef watch;
+};
+
+const hobject_t& WatchTimeoutRequest::get_target_oid() const
+{
+  assert(watch->obc);
+  return watch->obc->get_oid();
+}
+
+PG::do_osd_ops_params_t
+WatchTimeoutRequest::get_do_osd_ops_params() const
+{
+  PG::do_osd_ops_params_t params;
+  params.conn = watch->conn;
+  params.reqid.name = watch->entity_name;
+  // as in the classical's simple_opc_create()
+  params.mtime = ceph_clock_now();
+  params.map_epoch = get_pg().get_osdmap_epoch();
+  params.orig_source_inst = { watch->entity_name, watch->winfo.addr };
+  //entity_inst_t orig_source_inst;
+  params.features = 0;
+  logger().debug("{}: params.reqid={}", __func__, params.reqid);
+  return params;
+}
+
+std::vector<OSDOp> WatchTimeoutRequest::create_osd_ops()
+{
+  logger().debug("{}", __func__);
+  assert(watch);
+  OSDOp osd_op;
+  osd_op.op.op = CEPH_OSD_OP_WATCH;
+  osd_op.op.flags = 0;
+  osd_op.op.watch.op = CEPH_OSD_WATCH_OP_UNWATCH;
+  osd_op.op.watch.cookie = watch->winfo.cookie;
+  return std::vector{std::move(osd_op)};
+}
+
 Watch::~Watch()
 {
   logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
@@ -27,8 +81,10 @@ seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool)
 {
   if (this->conn == conn) {
     logger().debug("conn={} already connected", conn);
+    timeout_timer.cancel();
   }
 
+  timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
   this->conn = std::move(conn);
   return seastar::now();
 }
@@ -87,6 +143,16 @@ void Watch::discard_state()
 {
   ceph_assert(obc);
   in_progress_notifies.clear();
+  timeout_timer.cancel();
+}
+
+void Watch::got_ping(utime_t)
+{
+  if (is_connected()) {
+    // using cancel() + arm() as rearm() has no overload for time delta.
+    timeout_timer.cancel();
+    timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
+  }
 }
 
 seastar::future<> Watch::remove(const bool send_disconnect)
@@ -113,6 +179,12 @@ void Watch::cancel_notify(const uint64_t notify_id)
   in_progress_notifies.erase(it);
 }
 
+void Watch::do_watch_timeout(Ref<PG> pg)
+{
+  pg->get_shard_services().start_operation<WatchTimeoutRequest>(
+    shared_from_this(), pg);
+}
+
 bool notify_reply_t::operator<(const notify_reply_t& rhs) const
 {
   // comparing std::pairs to emphasize our legacy. ceph-osd stores
index cc58d93903156d4ef4c2bdc2aac578a16555b101..fd5b0d1a6eb814c0aeec5568223a5879a344641f 100644 (file)
@@ -11,6 +11,7 @@
 
 #include "crimson/net/Connection.h"
 #include "crimson/osd/object_context.h"
+#include "crimson/osd/pg.h"
 #include "include/denc.h"
 
 namespace crimson::osd {
@@ -39,21 +40,29 @@ class Watch : public seastar::enable_shared_from_this<Watch> {
   watch_info_t winfo;
   entity_name_t entity_name;
 
+  seastar::timer<seastar::lowres_clock> timeout_timer;
+
   seastar::future<> start_notify(NotifyRef);
   seastar::future<> send_notify_msg(NotifyRef);
   seastar::future<> send_disconnect_msg();
   void discard_state();
+  void do_watch_timeout(Ref<PG> pg);
 
   friend Notify;
+  friend class WatchTimeoutRequest;
 
 public:
   Watch(private_ctag_t,
         crimson::osd::ObjectContextRef obc,
         const watch_info_t& winfo,
-        const entity_name_t& entity_name)
+        const entity_name_t& entity_name,
+        Ref<PG> pg)
     : obc(std::move(obc)),
       winfo(winfo),
-      entity_name(entity_name) {
+      entity_name(entity_name),
+      timeout_timer([this, pg=std::move(pg)] {
+        return do_watch_timeout(pg);
+      }) {
   }
   ~Watch();
 
@@ -64,9 +73,7 @@ public:
   bool is_connected() const {
     return static_cast<bool>(conn);
   }
-  void got_ping(utime_t) {
-    // NOP
-  }
+  void got_ping(utime_t);
 
   seastar::future<> remove(bool send_disconnect);