From 4070a7d5577395f796289afcb466159e5931fcc8 Mon Sep 17 00:00:00 2001 From: Radoslaw Zarzynski Date: Mon, 15 Mar 2021 11:59:54 +0000 Subject: [PATCH] crimson/osd: wire up handling of watch timeouts. Signed-off-by: Radoslaw Zarzynski --- src/crimson/osd/ops_executer.cc | 5 ++- src/crimson/osd/pg.h | 1 + src/crimson/osd/watch.cc | 72 +++++++++++++++++++++++++++++++++ src/crimson/osd/watch.h | 17 +++++--- 4 files changed, 88 insertions(+), 7 deletions(-) diff --git a/src/crimson/osd/ops_executer.cc b/src/crimson/osd/ops_executer.cc index 9ad50d3668c..955e977932a 100644 --- a/src/crimson/osd/ops_executer.cc +++ b/src/crimson/osd/ops_executer.cc @@ -180,11 +180,12 @@ OpsExecuter::watch_ierrorator::future<> OpsExecuter::do_op_watch_subop_watch( } return seastar::now(); }, - [] (auto&& ctx, ObjectContextRef obc, Ref) { + [] (auto&& ctx, ObjectContextRef obc, Ref pg) { auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr); if (emplaced) { const auto& [cookie, entity] = ctx.key; - it->second = crimson::osd::Watch::create(obc, ctx.info, entity); + it->second = crimson::osd::Watch::create( + obc, ctx.info, entity, std::move(pg)); logger().info("op_effect: added new watcher: {}", ctx.key); } else { logger().info("op_effect: found existing watcher: {}", ctx.key); diff --git a/src/crimson/osd/pg.h b/src/crimson/osd/pg.h index fede88ec933..b3d24eb5487 100644 --- a/src/crimson/osd/pg.h +++ b/src/crimson/osd/pg.h @@ -735,6 +735,7 @@ private: friend class BackfillRecovery; friend struct PGFacade; friend class InternalClientRequest; + friend class WatchTimeoutRequest; private: seastar::future find_unfound() { return seastar::make_ready_future(true); diff --git a/src/crimson/osd/watch.cc b/src/crimson/osd/watch.cc index bdceaf68682..ccdec827712 100644 --- a/src/crimson/osd/watch.cc +++ b/src/crimson/osd/watch.cc @@ -7,6 +7,8 @@ #include #include "crimson/osd/watch.h" +#include "crimson/osd/osd_operations/internal_client_request.h" + #include "messages/MWatchNotify.h" @@ -18,6 +20,58 @@ namespace { namespace crimson::osd { +// a watcher can remove itself if it has not seen a notification after a period of time. +// in the case, we need to drop it also from the persisted `ObjectState` instance. +// this operation resembles a bit the `_UNWATCH` subop. +class WatchTimeoutRequest final : public InternalClientRequest { +public: + WatchTimeoutRequest(WatchRef watch, Ref pg) + : InternalClientRequest(std::move(pg)), + watch(std::move(watch)) { + } + + const hobject_t& get_target_oid() const final; + PG::do_osd_ops_params_t get_do_osd_ops_params() const final; + std::vector create_osd_ops() final; + +private: + WatchRef watch; +}; + +const hobject_t& WatchTimeoutRequest::get_target_oid() const +{ + assert(watch->obc); + return watch->obc->get_oid(); +} + +PG::do_osd_ops_params_t +WatchTimeoutRequest::get_do_osd_ops_params() const +{ + PG::do_osd_ops_params_t params; + params.conn = watch->conn; + params.reqid.name = watch->entity_name; + // as in the classical's simple_opc_create() + params.mtime = ceph_clock_now(); + params.map_epoch = get_pg().get_osdmap_epoch(); + params.orig_source_inst = { watch->entity_name, watch->winfo.addr }; + //entity_inst_t orig_source_inst; + params.features = 0; + logger().debug("{}: params.reqid={}", __func__, params.reqid); + return params; +} + +std::vector WatchTimeoutRequest::create_osd_ops() +{ + logger().debug("{}", __func__); + assert(watch); + OSDOp osd_op; + osd_op.op.op = CEPH_OSD_OP_WATCH; + osd_op.op.flags = 0; + osd_op.op.watch.op = CEPH_OSD_WATCH_OP_UNWATCH; + osd_op.op.watch.cookie = watch->winfo.cookie; + return std::vector{std::move(osd_op)}; +} + Watch::~Watch() { logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie()); @@ -27,8 +81,10 @@ seastar::future<> Watch::connect(crimson::net::ConnectionRef conn, bool) { if (this->conn == conn) { logger().debug("conn={} already connected", conn); + timeout_timer.cancel(); } + timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds}); this->conn = std::move(conn); return seastar::now(); } @@ -87,6 +143,16 @@ void Watch::discard_state() { ceph_assert(obc); in_progress_notifies.clear(); + timeout_timer.cancel(); +} + +void Watch::got_ping(utime_t) +{ + if (is_connected()) { + // using cancel() + arm() as rearm() has no overload for time delta. + timeout_timer.cancel(); + timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds}); + } } seastar::future<> Watch::remove(const bool send_disconnect) @@ -113,6 +179,12 @@ void Watch::cancel_notify(const uint64_t notify_id) in_progress_notifies.erase(it); } +void Watch::do_watch_timeout(Ref pg) +{ + pg->get_shard_services().start_operation( + shared_from_this(), pg); +} + bool notify_reply_t::operator<(const notify_reply_t& rhs) const { // comparing std::pairs to emphasize our legacy. ceph-osd stores diff --git a/src/crimson/osd/watch.h b/src/crimson/osd/watch.h index cc58d939031..fd5b0d1a6eb 100644 --- a/src/crimson/osd/watch.h +++ b/src/crimson/osd/watch.h @@ -11,6 +11,7 @@ #include "crimson/net/Connection.h" #include "crimson/osd/object_context.h" +#include "crimson/osd/pg.h" #include "include/denc.h" namespace crimson::osd { @@ -39,21 +40,29 @@ class Watch : public seastar::enable_shared_from_this { watch_info_t winfo; entity_name_t entity_name; + seastar::timer timeout_timer; + seastar::future<> start_notify(NotifyRef); seastar::future<> send_notify_msg(NotifyRef); seastar::future<> send_disconnect_msg(); void discard_state(); + void do_watch_timeout(Ref pg); friend Notify; + friend class WatchTimeoutRequest; public: Watch(private_ctag_t, crimson::osd::ObjectContextRef obc, const watch_info_t& winfo, - const entity_name_t& entity_name) + const entity_name_t& entity_name, + Ref pg) : obc(std::move(obc)), winfo(winfo), - entity_name(entity_name) { + entity_name(entity_name), + timeout_timer([this, pg=std::move(pg)] { + return do_watch_timeout(pg); + }) { } ~Watch(); @@ -64,9 +73,7 @@ public: bool is_connected() const { return static_cast(conn); } - void got_ping(utime_t) { - // NOP - } + void got_ping(utime_t); seastar::future<> remove(bool send_disconnect); -- 2.39.5