If the notified send back reply payloads, pass them back to the notifier.
Note that we have changed the on-wire behavior of the watch completion
message a bit: instead of sending the original notify payload back to the
notifier, we send the map of notified to replies. Note that only users of
the new API will know what to do with the notify acknowledgement
information. At the same time, we stop sending the original payload.
However, the old API users never saw that data; we were uselessly sending
it over the wire.
Signed-off-by: Sage Weil <sage@redhat.com>
uint64_t watch_cookie = 0;
::decode(notify_id, bp);
::decode(watch_cookie, bp);
+ bufferlist reply_bl;
+ if (!bp.end()) {
+ ::decode(reply_bl, bp);
+ }
tracepoint(osd, do_osd_op_pre_notify_ack, soid.oid.name.c_str(), soid.snap.val, notify_id, watch_cookie, "Y");
- OpContext::NotifyAck ack(notify_id, watch_cookie);
+ OpContext::NotifyAck ack(notify_id, watch_cookie, reply_bl);
ctx->notify_acks.push_back(ack);
} catch (const buffer::error &e) {
tracepoint(osd, do_osd_op_pre_notify_ack, soid.oid.name.c_str(), soid.snap.val, op.watch.cookie, 0, "N");
if (p->watch_cookie &&
p->watch_cookie.get() != i->first.first) continue;
dout(10) << "acking notify on watch " << i->first << dendl;
- i->second->notify_ack(p->notify_id);
+ i->second->notify_ack(p->notify_id, p->reply_bl);
}
}
}
struct NotifyAck {
boost::optional<uint64_t> watch_cookie;
uint64_t notify_id;
+ bufferlist reply_bl;
NotifyAck(uint64_t notify_id) : notify_id(notify_id) {}
- NotifyAck(uint64_t notify_id, uint64_t cookie)
- : watch_cookie(cookie), notify_id(notify_id) {}
+ NotifyAck(uint64_t notify_id, uint64_t cookie, bufferlist& rbl)
+ : watch_cookie(cookie), notify_id(notify_id) {
+ reply_bl.claim(rbl);
+ }
};
list<NotifyAck> notify_acks;
watchers.insert(watch);
}
-void Notify::complete_watcher(WatchRef watch)
+void Notify::complete_watcher(WatchRef watch, bufferlist& reply_bl)
{
Mutex::Locker l(lock);
dout(10) << "complete_watcher" << dendl;
assert(in_progress_watchers > 0);
watchers.erase(watch);
--in_progress_watchers;
+ notify_replies[watch->get_watcher_gid()].claim(reply_bl);
+ maybe_complete_notify();
+}
+
+void Notify::complete_watcher_remove(WatchRef watch)
+{
+ Mutex::Locker l(lock);
+ dout(10) << __func__ << dendl;
+ if (is_discarded())
+ return;
+ assert(in_progress_watchers > 0);
+ watchers.erase(watch);
+ --in_progress_watchers;
maybe_complete_notify();
}
<< in_progress_watchers
<< " in progress watchers " << dendl;
if (!in_progress_watchers) {
+ bufferlist bl;
+ ::encode(notify_replies, bl);
+ bufferlist empty;
MWatchNotify *reply(new MWatchNotify(cookie, version, notify_id,
- WATCH_NOTIFY, payload));
+ WATCH_NOTIFY, empty));
+ reply->set_data(bl);
if (timed_out)
reply->return_code = -ETIMEDOUT;
osd->send_message_osd_client(reply, client.get());
for (map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.begin();
i != in_progress_notifies.end();
++i) {
- i->second->complete_watcher(self.lock());
+ i->second->complete_watcher_remove(self.lock());
}
discard_state();
}
osd->send_message_osd_client(notify_msg, conn.get());
}
-void Watch::notify_ack(uint64_t notify_id)
+void Watch::notify_ack(uint64_t notify_id, bufferlist& reply_bl)
{
dout(10) << "notify_ack" << dendl;
map<uint64_t, NotifyRef>::iterator i = in_progress_notifies.find(notify_id);
if (i != in_progress_notifies.end()) {
- i->second->complete_watcher(self.lock());
+ i->second->complete_watcher(self.lock(), reply_bl);
in_progress_notifies.erase(i);
}
}
CancelableContext *cb;
Mutex lock;
+ /// gid -> reply_bl for everyone who acked the notify
+ map<uint64_t,bufferlist> notify_replies;
/// true if this notify is being discarded
bool is_discarded() {
/// Called once per NotifyAck
void complete_watcher(
+ WatchRef watcher, ///< [in] watcher to complete
+ bufferlist& reply_bl ///< [in] reply buffer from the notified watcher
+ );
+ /// Called when a watcher unregisters or times out
+ void complete_watcher_remove(
WatchRef watcher ///< [in] watcher to complete
);
/// NOTE: must be called with pg lock held
~Watch();
+ uint64_t get_watcher_gid() const {
+ return entity.num();
+ }
+
string gen_dbg_prefix();
static WatchRef makeWatchRef(
ReplicatedPG *pg, OSDService *osd,
/// Call when notify_ack received on notify_id
void notify_ack(
- uint64_t notify_id ///< [in] id of acked notify
+ uint64_t notify_id, ///< [in] id of acked notify
+ bufferlist& reply_bl ///< [in] notify reply buffer
);
};