#include "crimson/osd/osd_operations/peering_event.h"
#include "crimson/osd/pg_recovery.h"
#include "crimson/osd/replicated_recovery_backend.h"
+#include "crimson/osd/watch.h"
using std::ostream;
using std::set;
}
}
+void PG::check_blocklisted_obc_watchers(
+ ObjectContextRef &obc)
+{
+ if (obc->watchers.empty()) {
+ for (auto &[src, winfo] : obc->obs.oi.watchers) {
+ auto watch = crimson::osd::Watch::create(
+ obc, winfo, src.second, this);
+ watch->disconnect();
+ auto [it, emplaced] = obc->watchers.emplace(src, std::move(watch));
+ assert(emplaced);
+ logger().debug("added watch for obj {}, client {}",
+ obc->get_oid(), src.second);
+ }
+ }
+}
+
PG::load_obc_iertr::future<>
PG::with_locked_obc(const hobject_t &hobj,
const OpInfo &op_info,
throw crimson::common::system_shutdown_exception();
}
const hobject_t oid = get_oid(hobj);
+ auto wrapper = [f=std::move(f), this](auto obc) {
+ check_blocklisted_obc_watchers(obc);
+ return f(obc);
+ };
switch (get_lock_type(op_info)) {
case RWState::RWREAD:
- return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(f));
+ return obc_loader.with_obc<RWState::RWREAD>(oid, std::move(wrapper));
case RWState::RWWRITE:
- return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(f));
+ return obc_loader.with_obc<RWState::RWWRITE>(oid, std::move(wrapper));
case RWState::RWEXCL:
- return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(f));
+ return obc_loader.with_obc<RWState::RWEXCL>(oid, std::move(wrapper));
default:
ceph_abort();
};
return seastar::now();
}
+void Watch::disconnect()
+{
+ ceph_assert(!conn);
+ timeout_timer.cancel();
+ timeout_timer.arm(std::chrono::seconds{winfo.timeout_seconds});
+}
+
seastar::future<> Watch::send_notify_msg(NotifyRef notify)
{
logger().info("{} for notify(id={})", __func__, notify->ninfo.notify_id);