lock->Lock();
- WatchContext *wc = new WatchContext(this, oid, ctx);
+ WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
+ wc->watch_ctx = ctx;
client->register_watch_notify_callback(wc, cookie);
prepare_assert_ops(&wr);
wr.watch(*cookie, ver, 1);
if (r < 0) {
lock->Lock();
- client->unregister_watch_notify_callback(*cookie);
+ client->unregister_watch_notify_callback(*cookie); // destroys wc
lock->Unlock();
}
prepare_assert_ops(&rd);
rd.notify_ack(notify_id, ver, cookie);
objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
-
return 0;
}
Context *onack = new C_SafeCond(&mylock, &cond, &done, &r);
version_t objver;
uint64_t cookie;
- C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all);
::ObjectOperation rd;
prepare_assert_ops(&rd);
lock->Lock();
- WatchContext *wc = new WatchContext(this, oid, ctx);
+ WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
+ wc->notify_done = &done_all;
+ wc->notify_lock = &mylock_all;
+ wc->notify_cond = &cond_all;
+ wc->notify_rval = &r;
client->register_watch_notify_callback(wc, &cookie);
uint32_t prot_ver = 1;
uint32_t timeout = notify_timeout;
cond.Wait(mylock);
mylock.Unlock();
- mylock_all.Lock();
if (r == 0) {
+ mylock_all.Lock();
while (!done_all)
cond_all.Wait(mylock_all);
+ mylock_all.Unlock();
}
- mylock_all.Unlock();
lock->Lock();
- client->unregister_watch_notify_callback(cookie);
+ client->unregister_watch_notify_callback(cookie); // destroys wc
lock->Unlock();
set_sync_op_version(objver);
- delete ctx;
return r;
}
c->put_unlock();
}
-///////////////////////// C_NotifyComplete /////////////////////////////
-
-librados::IoCtxImpl::C_NotifyComplete::C_NotifyComplete(Mutex *_l,
- Cond *_c,
- bool *_d)
- : lock(_l), cond(_c), done(_d)
-{
- *done = false;
-}
-
-void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode,
- uint64_t ver,
- bufferlist& bl)
-{
- lock->Lock();
- *done = true;
- cond->Signal();
- lock->Unlock();
-}
-
-/////////////////////////// WatchContext ///////////////////////////////
-
-librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_,
- const object_t& _oc,
- librados::WatchCtx *_ctx)
- : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0), cookie(0)
-{
- io_ctx_impl->get();
-}
-
-librados::WatchContext::~WatchContext()
-{
- io_ctx_impl->put();
-}
-
-void librados::WatchContext::notify(Mutex *client_lock,
- uint8_t opcode,
- uint64_t ver,
- uint64_t notify_id,
- bufferlist& payload)
-{
- ctx->notify(opcode, ver, payload);
- if (opcode != WATCH_NOTIFY_COMPLETE) {
- client_lock->Lock();
- io_ctx_impl->_notify_ack(oid, notify_id, ver, cookie);
- client_lock->Unlock();
- }
-}
void set_assert_src_version(const object_t& oid, uint64_t ver);
void set_notify_timeout(uint32_t timeout);
- struct C_NotifyComplete : public librados::WatchCtx {
- Mutex *lock;
- Cond *cond;
- bool *done;
-
- C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d);
- void notify(uint8_t opcode, uint64_t ver, bufferlist& bl);
- };
};
namespace librados {
-struct WatchContext : public RefCountedWaitObject {
- IoCtxImpl *io_ctx_impl;
- const object_t oid;
- librados::WatchCtx *ctx;
- uint64_t linger_id;
- uint64_t cookie;
-
- WatchContext(IoCtxImpl *io_ctx_impl_,
- const object_t& _oc,
- librados::WatchCtx *_ctx);
- ~WatchContext();
+
+ /**
+ * watch/notify info
+ *
+ * Capture state about a watch or an in-progress notify
+ */
+struct WatchNotifyInfo : public RefCountedWaitObject {
+ IoCtxImpl *io_ctx_impl; // parent
+ const object_t oid; // the object
+ uint64_t linger_id; // we use this to unlinger when we are done
+ uint64_t cookie; // callback cookie
+
+ // watcher
+ librados::WatchCtx *watch_ctx;
+
+ // notify that we initiated
+ Mutex *notify_lock;
+ Cond *notify_cond;
+ bool *notify_done;
+ int *notify_rval;
+
+ WatchNotifyInfo(IoCtxImpl *io_ctx_impl_,
+ const object_t& _oc)
+ : io_ctx_impl(io_ctx_impl_),
+ oid(_oc),
+ linger_id(0),
+ cookie(0),
+ watch_ctx(NULL),
+ notify_lock(NULL),
+ notify_cond(NULL),
+ notify_done(NULL),
+ notify_rval(NULL) {
+ io_ctx_impl->get();
+ }
+
+ ~WatchNotifyInfo() {
+ io_ctx_impl->put();
+ }
+
void notify(Mutex *lock, uint8_t opcode, uint64_t ver, uint64_t notify_id,
- bufferlist& payload);
+ bufferlist& payload,
+ int return_code);
};
}
#endif
refcnt(1),
log_last_version(0), log_cb(NULL), log_cb_arg(NULL),
finisher(cct),
- max_watch_cookie(0)
+ max_watch_notify_cookie(0)
{
}
return r;
}
+void librados::RadosClient::blacklist_self(bool set) {
+ Mutex::Locker l(lock);
+ objecter->blacklist_self(set);
+}
+
+
+// -----------
+// watch/notify
+
void librados::RadosClient::register_watch_notify_callback(
- WatchContext *wc,
+ WatchNotifyInfo *wc,
uint64_t *cookie)
{
assert(lock.is_locked_by_me());
- wc->cookie = *cookie = ++max_watch_cookie;
- watchers[wc->cookie] = wc;
+ wc->cookie = *cookie = ++max_watch_notify_cookie;
+ ldout(cct,10) << __func__ << " cookie " << wc->cookie << dendl;
+ watch_notify_info[wc->cookie] = wc;
}
void librados::RadosClient::unregister_watch_notify_callback(uint64_t cookie)
{
+ ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
assert(lock.is_locked_by_me());
- map<uint64_t, WatchContext *>::iterator iter = watchers.find(cookie);
- if (iter != watchers.end()) {
- WatchContext *ctx = iter->second;
+ map<uint64_t, WatchNotifyInfo *>::iterator iter =
+ watch_notify_info.find(cookie);
+ if (iter != watch_notify_info.end()) {
+ WatchNotifyInfo *ctx = iter->second;
if (ctx->linger_id)
objecter->unregister_linger(ctx->linger_id);
- watchers.erase(iter);
+ watch_notify_info.erase(iter);
lock.Unlock();
- ldout(cct, 10) << __func__ << " dropping reference, waiting ctx=" << (void *)ctx << dendl;
+ ldout(cct, 10) << __func__ << " dropping reference, waiting ctx="
+ << (void *)ctx << dendl;
ctx->put_wait();
ldout(cct, 10) << __func__ << " done ctx=" << (void *)ctx << dendl;
lock.Lock();
}
}
-void librados::RadosClient::blacklist_self(bool set) {
- Mutex::Locker l(lock);
- objecter->blacklist_self(set);
-}
-
-class C_WatchNotify : public Context {
- librados::WatchContext *ctx;
- Mutex *client_lock;
- uint8_t opcode;
- uint64_t ver;
- uint64_t notify_id;
- bufferlist bl;
-
-public:
- C_WatchNotify(librados::WatchContext *_ctx, Mutex *_client_lock,
- uint8_t _o, uint64_t _v, uint64_t _n, bufferlist& _bl) :
- ctx(_ctx), client_lock(_client_lock), opcode(_o), ver(_v), notify_id(_n), bl(_bl) {}
-
+struct C_DoWatchNotify : public Context {
+ librados::RadosClient *rados;
+ MWatchNotify *m;
+ C_DoWatchNotify(librados::RadosClient *r, MWatchNotify *m) : rados(r), m(m) {}
void finish(int r) {
- ctx->notify(client_lock, opcode, ver, notify_id, bl);
- ctx->put();
+ rados->do_watch_notify(m);
}
};
void librados::RadosClient::handle_watch_notify(MWatchNotify *m)
{
Mutex::Locker l(lock);
- map<uint64_t, WatchContext *>::iterator iter = watchers.find(m->cookie);
- if (iter != watchers.end()) {
- WatchContext *wc = iter->second;
+
+ if (watch_notify_info.count(m->cookie)) {
+ ldout(cct,10) << __func__ << " queueing async " << *m << dendl;
+ // deliver this async via a finisher thread
+ finisher.queue(new C_DoWatchNotify(this, m));
+ } else {
+ // drop it on the floor
+ ldout(cct,10) << __func__ << " cookie " << m->cookie << " unknown" << dendl;
+ m->put();
+ }
+}
+
+void librados::RadosClient::do_watch_notify(MWatchNotify *m)
+{
+ Mutex::Locker l(lock);
+ map<uint64_t, WatchNotifyInfo *>::iterator iter =
+ watch_notify_info.find(m->cookie);
+ if (iter != watch_notify_info.end()) {
+ WatchNotifyInfo *wc = iter->second;
assert(wc);
- wc->get();
- finisher.queue(new C_WatchNotify(wc, &lock, m->opcode, m->ver, m->notify_id, m->bl));
+ if (wc->notify_lock) {
+ // we sent a notify and it completed (or failed)
+ ldout(cct,10) << __func__ << " completed notify " << *m << dendl;
+ wc->notify_lock->Lock();
+ *wc->notify_done = true;
+ *wc->notify_rval = m->return_code;
+ wc->notify_cond->Signal();
+ wc->notify_lock->Unlock();
+ } else {
+ // we are watcher and got a notify
+ ldout(cct,10) << __func__ << " got notify " << *m << dendl;
+ wc->get();
+
+ // trigger the callback
+ lock.Unlock();
+ wc->watch_ctx->notify(m->opcode, m->ver, m->bl);
+ lock.Lock();
+
+ // send ACK back to the OSD
+ wc->io_ctx_impl->_notify_ack(wc->oid, m->notify_id, m->ver, m->cookie);
+
+ ldout(cct,10) << __func__ << " notify done" << dendl;
+ wc->put();
+ }
}
m->put();
}
+
int librados::RadosClient::mon_command(const vector<string>& cmd,
const bufferlist &inbl,
bufferlist *outbl, string *outs)
int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);
// watch/notify
- uint64_t max_watch_cookie;
- map<uint64_t, librados::WatchContext *> watchers;
+ uint64_t max_watch_notify_cookie;
+ map<uint64_t, librados::WatchNotifyInfo *> watch_notify_info;
- void register_watch_notify_callback(librados::WatchContext *wc,
+ void register_watch_notify_callback(librados::WatchNotifyInfo *wc,
uint64_t *cookie);
void unregister_watch_notify_callback(uint64_t cookie);
void handle_watch_notify(MWatchNotify *m);
+ void do_watch_notify(MWatchNotify *m);
+
int mon_command(const vector<string>& cmd, const bufferlist &inbl,
bufferlist *outbl, string *outs);
int mon_command(int rank,
static void watch_notify_test_cb(uint8_t opcode, uint64_t ver, void *arg)
{
+ std::cout << __func__ << std::endl;
sem_post(&sem);
}
public:
void notify(uint8_t opcode, uint64_t ver, bufferlist& bl)
{
+ std::cout << __func__ << std::endl;
sem_post(&sem);
}
};