});
}
+OpsExecuter::watch_errorator::future<> OpsExecuter::do_op_notify_ack(
+ OSDOp& osd_op,
+ const ObjectState& os)
+{
+ struct notifyack_ctx_t {
+ const entity_name_t entity;
+ std::optional<uint64_t> watch_cookie;
+ uint64_t notify_id;
+ ceph::bufferlist reply_bl;
+
+ notifyack_ctx_t(const MOSDOp& msg) : entity(msg.get_reqid().name) {
+ }
+ };
+ return with_effect_on_obc(notifyack_ctx_t{ get_message() },
+ [&] (auto& ctx) {
+ try {
+ auto bp = osd_op.indata.cbegin();
+ uint64_t wc = 0;
+ ceph::decode(ctx.notify_id, bp);
+ ceph::decode(wc, bp);
+ ctx.watch_cookie = wc;
+ if (!bp.end()) {
+ ceph::decode(ctx.reply_bl, bp);
+ }
+ } catch (const buffer::error&) {
+ // op.watch.cookie is actually the notify_id for historical reasons
+ ctx.notify_id = osd_op.op.watch.cookie;
+ }
+ return seastar::now();
+ },
+ [] (auto&& ctx, ObjectContextRef obc) {
+ if (ctx.watch_cookie) {
+ logger().info("notify_ack watch_cookie={}, notify_id={}",
+ *ctx.watch_cookie, ctx.notify_id);
+ } else {
+ logger().info("notify_ack notify_id={}", ctx.notify_id);
+ }
+ return seastar::do_for_each(obc->watchers,
+ [ctx=std::move(ctx)] (auto& kv) {
+ const auto& [key, watchp] = kv;
+ auto& [cookie, entity] = key;
+ if (ctx.entity != entity) {
+ return seastar::now();
+ }
+ if (ctx.watch_cookie && *ctx.watch_cookie != cookie) {
+ return seastar::now();
+ }
+ logger().info("acking notify on watch {}", key);
+ return watchp->notify_ack(ctx.notify_id, ctx.reply_bl);
+ });
+ });
+}
+
static inline std::unique_ptr<const PGLSFilter> get_pgls_filter(
const std::string& type,
bufferlist::const_iterator& iter)
return do_read_op([this, &osd_op] (auto&, const auto& os) {
return do_op_notify(osd_op, os);
});
+ case CEPH_OSD_OP_NOTIFY_ACK:
+ return do_read_op([this, &osd_op] (auto&, const auto& os) {
+ return do_op_notify_ack(osd_op, os);
+ });
default:
logger().warn("unknown op {}", ceph_osd_op_name(op.op));