}
return seastar::now();
},
- [] (auto&& ctx, ObjectContextRef obc, Ref<PG>) {
+ [] (auto&& ctx, ObjectContextRef obc, Ref<PG> pg) {
auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
if (emplaced) {
const auto& [cookie, entity] = ctx.key;
- it->second = crimson::osd::Watch::create(obc, ctx.info, entity);
+ it->second = crimson::osd::Watch::create(
+ obc, ctx.info, entity, std::move(pg));
logger().info("op_effect: added new watcher: {}", ctx.key);
} else {
logger().info("op_effect: found existing watcher: {}", ctx.key);
#include <boost/range/algorithm_ext/insert.hpp>
#include "crimson/osd/watch.h"
+#include "crimson/osd/osd_operations/internal_client_request.h"
+
#include "messages/MWatchNotify.h"
namespace crimson::osd {
+// a watcher can remove itself if it has not seen a notification after a period of time.
+// in the case, we need to drop it also from the persisted `ObjectState` instance.
+// this operation resembles a bit the `_UNWATCH` subop.
+class WatchTimeoutRequest final : public InternalClientRequest {
+public:
+ WatchTimeoutRequest(WatchRef watch, Ref<PG> pg)
+ : InternalClientRequest(std::move(pg)),
+ watch(std::move(watch)) {
+ }
+
+ const hobject_t& get_target_oid() const final;
+ PG::do_osd_ops_params_t get_do_osd_ops_params() const final;
+ std::vector<OSDOp> create_osd_ops() final;
+
+private:
+ WatchRef watch;
+};
+
+const hobject_t& WatchTimeoutRequest::get_target_oid() const
+{
+ assert(watch->obc);
+ return watch->obc->get_oid();
+}
+
+PG::do_osd_ops_params_t
+WatchTimeoutRequest::get_do_osd_ops_params() const
+{
+ PG::do_osd_ops_params_t params;
+ params.conn = watch->conn;
+ params.reqid.name = watch->entity_name;
+ // as in the classical's simple_opc_create()
+ params.mtime = ceph_clock_now();
+ params.map_epoch = get_pg().get_osdmap_epoch();
+ params.orig_source_inst = { watch->entity_name, watch->winfo.addr };
+ //entity_inst_t orig_source_inst;
+ params.features = 0;
+ logger().debug("{}: params.reqid={}", __func__, params.reqid);
+ return params;
+}
+
+std::vector<OSDOp> WatchTimeoutRequest::create_osd_ops()
+{
+ logger().debug("{}", __func__);
+ assert(watch);
+ OSDOp osd_op;
+ osd_op.op.op = CEPH_OSD_OP_WATCH;
+ osd_op.op.flags = 0;
+ osd_op.op.watch.op = CEPH_OSD_WATCH_OP_UNWATCH;
+ osd_op.op.watch.cookie = watch->winfo.cookie;
+ return std::vector{std::move(osd_op)};
+}
+
Watch::~Watch()
{
logger().debug("{} gid={} cookie={}", __func__, get_watcher_gid(), get_cookie());
{
if (this->conn == conn) {
logger().debug("conn={} already connected", conn);
+ timeout_timer.cancel();
}
+ timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
this->conn = std::move(conn);
return seastar::now();
}
{
ceph_assert(obc);
in_progress_notifies.clear();
+ timeout_timer.cancel();
+}
+
+void Watch::got_ping(utime_t)
+{
+ if (is_connected()) {
+ // using cancel() + arm() as rearm() has no overload for time delta.
+ timeout_timer.cancel();
+ timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
+ }
}
seastar::future<> Watch::remove(const bool send_disconnect)
in_progress_notifies.erase(it);
}
+void Watch::do_watch_timeout(Ref<PG> pg)
+{
+ pg->get_shard_services().start_operation<WatchTimeoutRequest>(
+ shared_from_this(), pg);
+}
+
bool notify_reply_t::operator<(const notify_reply_t& rhs) const
{
// comparing std::pairs to emphasize our legacy. ceph-osd stores
#include "crimson/net/Connection.h"
#include "crimson/osd/object_context.h"
+#include "crimson/osd/pg.h"
#include "include/denc.h"
namespace crimson::osd {
watch_info_t winfo;
entity_name_t entity_name;
+ seastar::timer<seastar::lowres_clock> timeout_timer;
+
seastar::future<> start_notify(NotifyRef);
seastar::future<> send_notify_msg(NotifyRef);
seastar::future<> send_disconnect_msg();
void discard_state();
+ void do_watch_timeout(Ref<PG> pg);
friend Notify;
+ friend class WatchTimeoutRequest;
public:
Watch(private_ctag_t,
crimson::osd::ObjectContextRef obc,
const watch_info_t& winfo,
- const entity_name_t& entity_name)
+ const entity_name_t& entity_name,
+ Ref<PG> pg)
: obc(std::move(obc)),
winfo(winfo),
- entity_name(entity_name) {
+ entity_name(entity_name),
+ timeout_timer([this, pg=std::move(pg)] {
+ return do_watch_timeout(pg);
+ }) {
}
~Watch();
bool is_connected() const {
return static_cast<bool>(conn);
}
- void got_ping(utime_t) {
- // NOP
- }
+ void got_ping(utime_t);
seastar::future<> remove(bool send_disconnect);