lock->Lock();
WatchContext *wc = new WatchContext(this, oid, ctx);
- client->register_watcher(wc, oid, ctx, cookie);
+ client->register_watcher(wc, cookie);
prepare_assert_ops(&rd);
rd.watch(*cookie, ver, 1);
bufferlist bl;
/* this is called with IoCtxImpl::lock held */
-int librados::IoCtxImpl::_notify_ack(const object_t& oid,
- uint64_t notify_id, uint64_t ver)
+int librados::IoCtxImpl::_notify_ack(
+ const object_t& oid,
+ uint64_t notify_id, uint64_t ver,
+ uint64_t cookie)
{
::ObjectOperation rd;
prepare_assert_ops(&rd);
- rd.notify_ack(notify_id, ver);
+ rd.notify_ack(notify_id, ver, cookie);
objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
return 0;
lock->Lock();
WatchContext *wc = new WatchContext(this, oid, ctx);
- client->register_watcher(wc, oid, ctx, &cookie);
+ client->register_watcher(wc, &cookie);
uint32_t prot_ver = 1;
uint32_t timeout = notify_timeout;
::encode(prot_ver, inbl);
librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_,
const object_t& _oc,
librados::WatchCtx *_ctx)
- : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0)
+ : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0), cookie(0)
{
io_ctx_impl->get();
}
ctx->notify(opcode, ver, payload);
if (opcode != WATCH_NOTIFY_COMPLETE) {
client_lock->Lock();
- io_ctx_impl->_notify_ack(oid, notify_id, ver);
+ io_ctx_impl->_notify_ack(oid, notify_id, ver, cookie);
client_lock->Unlock();
}
}
int watch(const object_t& oid, uint64_t ver, uint64_t *cookie, librados::WatchCtx *ctx);
int unwatch(const object_t& oid, uint64_t cookie);
int notify(const object_t& oid, uint64_t ver, bufferlist& bl);
- int _notify_ack(const object_t& oid, uint64_t notify_id, uint64_t ver);
+ int _notify_ack(
+ const object_t& oid, uint64_t notify_id, uint64_t ver,
+ uint64_t cookie);
eversion_t last_version();
void set_assert_version(uint64_t ver);
const object_t oid;
librados::WatchCtx *ctx;
uint64_t linger_id;
+ uint64_t cookie;
WatchContext(IoCtxImpl *io_ctx_impl_,
const object_t& _oc,
return r;
}
-void librados::RadosClient::register_watcher(WatchContext *wc,
- const object_t& oid,
- librados::WatchCtx *ctx,
- uint64_t *cookie)
+void librados::RadosClient::register_watcher(WatchContext *wc, uint64_t *cookie)
{
assert(lock.is_locked());
- *cookie = ++max_watch_cookie;
- watchers[*cookie] = wc;
+ wc->cookie = *cookie = ++max_watch_cookie;
+ watchers[wc->cookie] = wc;
}
void librados::RadosClient::unregister_watcher(uint64_t cookie)
uint64_t max_watch_cookie;
map<uint64_t, librados::WatchContext *> watchers;
- void register_watcher(librados::WatchContext *wc, const object_t& oid,
- librados::WatchCtx *ctx, uint64_t *cookie);
+ void register_watcher(librados::WatchContext *wc, uint64_t *cookie);
void unregister_watcher(uint64_t cookie);
void watch_notify(MWatchNotify *m);
void get();
add_watch(CEPH_OSD_OP_NOTIFY, cookie, ver, 1, inbl);
}
- void notify_ack(uint64_t notify_id, uint64_t ver) {
+ void notify_ack(uint64_t notify_id, uint64_t ver, uint64_t cookie) {
bufferlist bl;
+ ::encode(notify_id, bl);
+ ::encode(cookie, bl);
add_watch(CEPH_OSD_OP_NOTIFY_ACK, notify_id, ver, 0, bl);
}