case CEPH_OSD_OP_NOTIFY_ACK:
{
- ctx->notify_acks.push_back(op.watch.cookie);
+ try {
+ uint64_t notify_id = 0;
+ uint64_t watch_cookie = 0;
+ ::decode(notify_id, bp);
+ ::decode(watch_cookie, bp);
+ OpContext::NotifyAck ack(notify_id, watch_cookie);
+ ctx->notify_acks.push_back(ack);
+ } catch (const buffer::error &e) {
+ OpContext::NotifyAck ack(
+ // op.watch.cookie is actually the notify_id for historical reasons
+ op.watch.cookie
+ );
+ ctx->notify_acks.push_back(ack);
+ }
}
break;
notif->init();
}
- for (list<uint64_t>::iterator p = ctx->notify_acks.begin();
+ for (list<OpContext::NotifyAck>::iterator p = ctx->notify_acks.begin();
p != ctx->notify_acks.end();
++p) {
- dout(10) << "notify_ack " << *p << dendl;
+ dout(10) << "notify_ack " << make_pair(p->watch_cookie, p->notify_id) << dendl;
for (map<pair<uint64_t, entity_name_t>, WatchRef>::iterator i =
ctx->obc->watchers.begin();
i != ctx->obc->watchers.end();
++i) {
if (i->first.second != entity) continue;
+ if (p->watch_cookie &&
+ p->watch_cookie.get() != i->first.first) continue;
dout(10) << "acking notify on watch " << i->first << dendl;
- i->second->notify_ack(*p);
+ i->second->notify_ack(p->notify_id);
}
}
}
#ifndef CEPH_REPLICATEDPG_H
#define CEPH_REPLICATEDPG_H
+#include <boost/optional.hpp>
+
+#include "include/assert.h"
#include "PG.h"
#include "OSD.h"
bool watch_connect, watch_disconnect;
watch_info_t watch_info;
list<notify_info_t> notifies;
- list<uint64_t> notify_acks;
+ struct NotifyAck {
+ boost::optional<uint64_t> watch_cookie;
+ uint64_t notify_id;
+ NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
+ NotifyAck(uint64_t notify_id, uint64_t cookie)
+ : watch_cookie(cookie), notify_id(notify_id) {}
+ };
+ list<NotifyAck> notify_acks;
uint64_t bytes_written, bytes_read;