);
}
+static watch_info_t create_watch_info(const auto& osd_op,
+ const auto& msg)
+{
+ using crimson::common::local_conf;
+ const uint32_t timeout =
+ osd_op.op.watch.timeout == 0 ? local_conf()->osd_client_watch_timeout
+ : osd_op.op.watch.timeout;
+ return {
+ osd_op.op.watch.cookie,
+ timeout,
+ msg.get_connection()->get_peer_addr()
+ };
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_watch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ struct connect_ctx_t {
+ ObjectContext::watch_key_t key;
+ watch_info_t info;
+
+ connect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg)
+ : key(osd_op.op.watch.cookie, msg.get_reqid().name),
+ info(create_watch_info(osd_op, msg)) {
+ }
+ };
+ return with_effect_on_obc(connect_ctx_t{ osd_op, get_message() },
+ [&] (auto& ctx) {
+ const auto& entity = ctx.key.second;
+ auto [it, emplaced] =
+ os.oi.watchers.try_emplace(ctx.key, std::move(ctx.info));
+ if (emplaced) {
+ logger().info("registered new watch {} by {}", it->second, entity);
+ txn.nop();
+ } else {
+ logger().info("found existing watch {} by {}", it->second, entity);
+ }
+ return seastar::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc) {
+ auto [it, emplaced] = obc->watchers.try_emplace(ctx.key, nullptr);
+ if (emplaced) {
+ it->second = crimson::osd::Watch::create();
+ logger().info("op_effect: added new watcher: {}", ctx.key);
+ } else {
+ logger().info("op_effect: found existing watcher: {}", ctx.key);
+ }
+ return it->second->connect(nullptr /* conn */, true /* will_ping */);
+ });
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch_subop_unwatch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ struct disconnect_ctx_t {
+ ObjectContext::watch_key_t key;
+ bool send_disconnect{ false };
+
+ disconnect_ctx_t(const OSDOp& osd_op, const MOSDOp& msg)
+ : key(osd_op.op.watch.cookie, msg.get_reqid().name) {
+ }
+ };
+ return with_effect_on_obc(disconnect_ctx_t{ osd_op, get_message() },
+ [&] (auto& ctx) {
+ const auto& entity = ctx.key.second;
+ if (auto nh = os.oi.watchers.extract(ctx.key); !nh.empty()) {
+ logger().info("removed watch {} by {}", nh.mapped(), entity);
+ txn.nop();
+ } else {
+ logger().info("can't remove: no watch by {}", entity);
+ }
+ return seastar::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc) {
+ if (auto nh = obc->watchers.extract(ctx.key); !nh.empty()) {
+ logger().info("op_effect: disconnect watcher {}", ctx.key);
+ return nh.mapped()->remove(ctx.send_disconnect);
+ } else {
+ logger().info("op_effect: disconnect failed to find watcher {}",
+ ctx.key);
+ return seastar::now();
+ }
+ });
+}
+
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_watch(
+ OSDOp& osd_op,
+ ObjectState& os,
+ ceph::os::Transaction& txn)
+{
+ if (!os.exists) {
+ return crimson::ct_error::enoent::make();
+ }
+ switch (osd_op.op.watch.op) {
+ case CEPH_OSD_WATCH_OP_WATCH:
+ return do_op_watch_subop_watch(osd_op, os, txn);
+ case CEPH_OSD_WATCH_OP_RECONNECT:
+ // TODO: implement reconnect
+ break;
+ case CEPH_OSD_WATCH_OP_PING:
+ // TODO: implement ping
+ break;
+ case CEPH_OSD_WATCH_OP_UNWATCH:
+ return do_op_watch_subop_unwatch(osd_op, os, txn);
+ case CEPH_OSD_WATCH_OP_LEGACY_WATCH:
+ logger().warn("ignoring CEPH_OSD_WATCH_OP_LEGACY_WATCH");
+ return crimson::ct_error::invarg::make();
+ }
+ logger().warn("unrecognized WATCH subop: {}", osd_op.op.watch.op);
+ return crimson::ct_error::invarg::make();
+}
+
static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
const std::string& type,
bufferlist::const_iterator& iter)
// watch/notify
case CEPH_OSD_OP_WATCH:
- return do_write_op([&osd_op] (auto& backend, auto& os, auto& txn) {
- logger().warn("CEPH_OSD_OP_WATCH is not implemented yet; ignoring");
- return osd_op_errorator::now();
+ return do_write_op([this, &osd_op] (auto& backend, auto& os, auto& txn) {
+ return do_op_watch(osd_op, os, txn);
});
default:
crimson::ct_error::input_output_error>;
using read_errorator = PGBackend::read_errorator;
using get_attr_errorator = PGBackend::get_attr_errorator;
+ using watch_errorator = crimson::errorator<
+ crimson::ct_error::enoent,
+ crimson::ct_error::invarg>;
public:
// because OpsExecuter is pretty heavy-weight object we want to ensure
call_errorator,
read_errorator,
get_attr_errorator,
+ watch_errorator,
PGBackend::stat_errorator>;
private:
EffectFunc&& effect_func);
call_errorator::future<> do_op_call(class OSDOp& osd_op);
+ watch_errorator::future<> do_op_watch(
+ class OSDOp& osd_op,
+ class ObjectState& os,
+ ceph::os::Transaction& txn);
+ watch_errorator::future<> do_op_watch_subop_watch(
+ class OSDOp& osd_op,
+ class ObjectState& os,
+ ceph::os::Transaction& txn);
+ watch_errorator::future<> do_op_watch_subop_unwatch(
+ class OSDOp& osd_op,
+ class ObjectState& os,
+ ceph::os::Transaction& txn);
hobject_t &get_target() const {
return obc->obs.oi.soid;