vector<OSDOp> opv;
Context *oncommit = NULL;
info->watch_lock.get_read(); // just to read registered status
+ bufferlist *poutbl = NULL;
if (info->registered && info->is_watch) {
ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" << dendl;
opv.push_back(OSDOp());
} else {
ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl;
opv = info->ops;
- oncommit = new C_Linger_Commit(this, info);
+ C_Linger_Commit *c = new C_Linger_Commit(this, info);
+ if (!info->is_watch) {
+ info->notify_id = 0;
+ poutbl = &c->outbl;
+ }
+ oncommit = c;
}
info->watch_lock.put_read();
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
NULL, NULL,
info->pobjver);
o->oncommit_sync = oncommit;
+ o->outbl = poutbl;
o->snapid = info->snap;
o->snapc = info->snapc;
o->mtime = info->mtime;
logger->inc(l_osdc_linger_send);
}
-void Objecter::_linger_commit(LingerOp *info, int r)
+void Objecter::_linger_commit(LingerOp *info, int r, bufferlist& outbl)
{
RWLock::WLocker wl(info->watch_lock);
ldout(cct, 10) << "_linger_commit " << info->linger_id << dendl;
// only tell the user the first time we do this
info->registered = true;
info->pobjver = NULL;
+
+ if (!info->is_watch) {
+ // make note of the notify_id
+ bufferlist::iterator p = outbl.begin();
+ try {
+ ::decode(info->notify_id, p);
+ ldout(cct, 10) << "_linger_commit notify_id=" << info->notify_id << dendl;
+ }
+ catch (buffer::error& e) {
+ }
+ }
}
struct C_DoWatchError : public Context {
} else if (!info->is_watch) {
// we have CEPH_WATCH_EVENT_NOTIFY_COMPLETE; we can do this inline since
// we know the only user (librados) is safe to call in fast-dispatch context
- assert(info->on_notify_finish);
- info->notify_result_bl->claim(m->get_data());
- info->on_notify_finish->complete(m->return_code);
+ if (info->notify_id &&
+ info->notify_id != m->notify_id) {
+ ldout(cct, 10) << __func__ << " reply notify " << m->notify_id
+ << " != " << info->notify_id << ", ignoring" << dendl;
+ } else {
+ assert(info->on_notify_finish);
+ info->notify_result_bl->claim(m->get_data());
+ info->on_notify_finish->complete(m->return_code);
+ }
} else {
finisher->queue(new C_DoWatchNotify(this, info, m));
_linger_callback_queue();
// we trigger these from an async finisher
Context *on_notify_finish;
bufferlist *notify_result_bl;
+ uint64_t notify_id;
WatchContext *watch_context;
on_reg_commit(NULL),
on_notify_finish(NULL),
notify_result_bl(NULL),
+ notify_id(0),
watch_context(NULL),
session(NULL),
register_tid(0),
struct C_Linger_Commit : public Context {
Objecter *objecter;
LingerOp *info;
+ bufferlist outbl; // used for notify only
C_Linger_Commit(Objecter *o, LingerOp *l) : objecter(o), info(l) {
info->get();
}
info->put();
}
void finish(int r) {
- objecter->_linger_commit(info, r);
+ objecter->_linger_commit(info, r, outbl);
}
};
void _linger_submit(LingerOp *info);
void _send_linger(LingerOp *info);
- void _linger_commit(LingerOp *info, int r);
+ void _linger_commit(LingerOp *info, int r, bufferlist& outbl);
void _linger_reconnect(LingerOp *info, int r);
void _send_linger_ping(LingerOp *info);
void _linger_ping(LingerOp *info, int r, utime_t sent, uint32_t register_gen);