From: Sage Weil Date: Wed, 13 Aug 2014 00:40:36 +0000 (-0700) Subject: librados: refactor watch/notify; return notify error code X-Git-Tag: v0.86~213^2~18 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=69612e7519f9fa73d6ac1c6358bfeac33e998774;p=ceph.git librados: refactor watch/notify; return notify error code Get rid of a level of intermediate classes with confusing names and put the notify and notify finish logic in a single place so that it is easier to follow and understand. Pass the return value from the notify completion message to the caller. Signed-off-by: Sage Weil --- diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index d766c40dac2..d02a7de7c24 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1013,7 +1013,8 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, lock->Lock(); - WatchContext *wc = new WatchContext(this, oid, ctx); + WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid); + wc->watch_ctx = ctx; client->register_watch_notify_callback(wc, cookie); prepare_assert_ops(&wr); wr.watch(*cookie, ver, 1); @@ -1033,7 +1034,7 @@ int librados::IoCtxImpl::watch(const object_t& oid, uint64_t ver, if (r < 0) { lock->Lock(); - client->unregister_watch_notify_callback(*cookie); + client->unregister_watch_notify_callback(*cookie); // destroys wc lock->Unlock(); } @@ -1051,7 +1052,6 @@ int librados::IoCtxImpl::_notify_ack( prepare_assert_ops(&rd); rd.notify_ack(notify_id, ver, cookie); objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0); - return 0; } @@ -1097,13 +1097,16 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b Context *onack = new C_SafeCond(&mylock, &cond, &done, &r); version_t objver; uint64_t cookie; - C_NotifyComplete *ctx = new C_NotifyComplete(&mylock_all, &cond_all, &done_all); ::ObjectOperation rd; prepare_assert_ops(&rd); lock->Lock(); - WatchContext *wc = new WatchContext(this, oid, ctx); + WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid); + wc->notify_done = &done_all; + wc->notify_lock = &mylock_all; + wc->notify_cond = &cond_all; + wc->notify_rval = &r; client->register_watch_notify_callback(wc, &cookie); uint32_t prot_ver = 1; uint32_t timeout = notify_timeout; @@ -1120,19 +1123,18 @@ int librados::IoCtxImpl::notify(const object_t& oid, uint64_t ver, bufferlist& b cond.Wait(mylock); mylock.Unlock(); - mylock_all.Lock(); if (r == 0) { + mylock_all.Lock(); while (!done_all) cond_all.Wait(mylock_all); + mylock_all.Unlock(); } - mylock_all.Unlock(); lock->Lock(); - client->unregister_watch_notify_callback(cookie); + client->unregister_watch_notify_callback(cookie); // destroys wc lock->Unlock(); set_sync_op_version(objver); - delete ctx; return r; } @@ -1250,51 +1252,3 @@ void librados::IoCtxImpl::C_aio_Safe::finish(int r) c->put_unlock(); } -///////////////////////// C_NotifyComplete ///////////////////////////// - -librados::IoCtxImpl::C_NotifyComplete::C_NotifyComplete(Mutex *_l, - Cond *_c, - bool *_d) - : lock(_l), cond(_c), done(_d) -{ - *done = false; -} - -void librados::IoCtxImpl::C_NotifyComplete::notify(uint8_t opcode, - uint64_t ver, - bufferlist& bl) -{ - lock->Lock(); - *done = true; - cond->Signal(); - lock->Unlock(); -} - -/////////////////////////// WatchContext /////////////////////////////// - -librados::WatchContext::WatchContext(IoCtxImpl *io_ctx_impl_, - const object_t& _oc, - librados::WatchCtx *_ctx) - : io_ctx_impl(io_ctx_impl_), oid(_oc), ctx(_ctx), linger_id(0), cookie(0) -{ - io_ctx_impl->get(); -} - -librados::WatchContext::~WatchContext() -{ - io_ctx_impl->put(); -} - -void librados::WatchContext::notify(Mutex *client_lock, - uint8_t opcode, - uint64_t ver, - uint64_t notify_id, - bufferlist& payload) -{ - ctx->notify(opcode, ver, payload); - if (opcode != WATCH_NOTIFY_COMPLETE) { - client_lock->Lock(); - io_ctx_impl->_notify_ack(oid, notify_id, ver, cookie); - client_lock->Unlock(); - } -} diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 21e64441a4d..4d79e7b16f9 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -212,30 +212,51 @@ struct librados::IoCtxImpl { void set_assert_src_version(const object_t& oid, uint64_t ver); void set_notify_timeout(uint32_t timeout); - struct C_NotifyComplete : public librados::WatchCtx { - Mutex *lock; - Cond *cond; - bool *done; - - C_NotifyComplete(Mutex *_l, Cond *_c, bool *_d); - void notify(uint8_t opcode, uint64_t ver, bufferlist& bl); - }; }; namespace librados { -struct WatchContext : public RefCountedWaitObject { - IoCtxImpl *io_ctx_impl; - const object_t oid; - librados::WatchCtx *ctx; - uint64_t linger_id; - uint64_t cookie; - - WatchContext(IoCtxImpl *io_ctx_impl_, - const object_t& _oc, - librados::WatchCtx *_ctx); - ~WatchContext(); + + /** + * watch/notify info + * + * Capture state about a watch or an in-progress notify + */ +struct WatchNotifyInfo : public RefCountedWaitObject { + IoCtxImpl *io_ctx_impl; // parent + const object_t oid; // the object + uint64_t linger_id; // we use this to unlinger when we are done + uint64_t cookie; // callback cookie + + // watcher + librados::WatchCtx *watch_ctx; + + // notify that we initiated + Mutex *notify_lock; + Cond *notify_cond; + bool *notify_done; + int *notify_rval; + + WatchNotifyInfo(IoCtxImpl *io_ctx_impl_, + const object_t& _oc) + : io_ctx_impl(io_ctx_impl_), + oid(_oc), + linger_id(0), + cookie(0), + watch_ctx(NULL), + notify_lock(NULL), + notify_cond(NULL), + notify_done(NULL), + notify_rval(NULL) { + io_ctx_impl->get(); + } + + ~WatchNotifyInfo() { + io_ctx_impl->put(); + } + void notify(Mutex *lock, uint8_t opcode, uint64_t ver, uint64_t notify_id, - bufferlist& payload); + bufferlist& payload, + int return_code); }; } #endif diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 956cc2c1428..faf8ea7a9c4 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -79,7 +79,7 @@ librados::RadosClient::RadosClient(CephContext *cct_) refcnt(1), log_last_version(0), log_cb(NULL), log_cb_arg(NULL), finisher(cct), - max_watch_cookie(0) + max_watch_notify_cookie(0) { } @@ -607,70 +607,107 @@ int librados::RadosClient::pool_delete_async(const char *name, PoolAsyncCompleti return r; } +void librados::RadosClient::blacklist_self(bool set) { + Mutex::Locker l(lock); + objecter->blacklist_self(set); +} + + +// ----------- +// watch/notify + void librados::RadosClient::register_watch_notify_callback( - WatchContext *wc, + WatchNotifyInfo *wc, uint64_t *cookie) { assert(lock.is_locked_by_me()); - wc->cookie = *cookie = ++max_watch_cookie; - watchers[wc->cookie] = wc; + wc->cookie = *cookie = ++max_watch_notify_cookie; + ldout(cct,10) << __func__ << " cookie " << wc->cookie << dendl; + watch_notify_info[wc->cookie] = wc; } void librados::RadosClient::unregister_watch_notify_callback(uint64_t cookie) { + ldout(cct,10) << __func__ << " cookie " << cookie << dendl; assert(lock.is_locked_by_me()); - map::iterator iter = watchers.find(cookie); - if (iter != watchers.end()) { - WatchContext *ctx = iter->second; + map::iterator iter = + watch_notify_info.find(cookie); + if (iter != watch_notify_info.end()) { + WatchNotifyInfo *ctx = iter->second; if (ctx->linger_id) objecter->unregister_linger(ctx->linger_id); - watchers.erase(iter); + watch_notify_info.erase(iter); lock.Unlock(); - ldout(cct, 10) << __func__ << " dropping reference, waiting ctx=" << (void *)ctx << dendl; + ldout(cct, 10) << __func__ << " dropping reference, waiting ctx=" + << (void *)ctx << dendl; ctx->put_wait(); ldout(cct, 10) << __func__ << " done ctx=" << (void *)ctx << dendl; lock.Lock(); } } -void librados::RadosClient::blacklist_self(bool set) { - Mutex::Locker l(lock); - objecter->blacklist_self(set); -} - -class C_WatchNotify : public Context { - librados::WatchContext *ctx; - Mutex *client_lock; - uint8_t opcode; - uint64_t ver; - uint64_t notify_id; - bufferlist bl; - -public: - C_WatchNotify(librados::WatchContext *_ctx, Mutex *_client_lock, - uint8_t _o, uint64_t _v, uint64_t _n, bufferlist& _bl) : - ctx(_ctx), client_lock(_client_lock), opcode(_o), ver(_v), notify_id(_n), bl(_bl) {} - +struct C_DoWatchNotify : public Context { + librados::RadosClient *rados; + MWatchNotify *m; + C_DoWatchNotify(librados::RadosClient *r, MWatchNotify *m) : rados(r), m(m) {} void finish(int r) { - ctx->notify(client_lock, opcode, ver, notify_id, bl); - ctx->put(); + rados->do_watch_notify(m); } }; void librados::RadosClient::handle_watch_notify(MWatchNotify *m) { Mutex::Locker l(lock); - map::iterator iter = watchers.find(m->cookie); - if (iter != watchers.end()) { - WatchContext *wc = iter->second; + + if (watch_notify_info.count(m->cookie)) { + ldout(cct,10) << __func__ << " queueing async " << *m << dendl; + // deliver this async via a finisher thread + finisher.queue(new C_DoWatchNotify(this, m)); + } else { + // drop it on the floor + ldout(cct,10) << __func__ << " cookie " << m->cookie << " unknown" << dendl; + m->put(); + } +} + +void librados::RadosClient::do_watch_notify(MWatchNotify *m) +{ + Mutex::Locker l(lock); + map::iterator iter = + watch_notify_info.find(m->cookie); + if (iter != watch_notify_info.end()) { + WatchNotifyInfo *wc = iter->second; assert(wc); - wc->get(); - finisher.queue(new C_WatchNotify(wc, &lock, m->opcode, m->ver, m->notify_id, m->bl)); + if (wc->notify_lock) { + // we sent a notify and it completed (or failed) + ldout(cct,10) << __func__ << " completed notify " << *m << dendl; + wc->notify_lock->Lock(); + *wc->notify_done = true; + *wc->notify_rval = m->return_code; + wc->notify_cond->Signal(); + wc->notify_lock->Unlock(); + } else { + // we are watcher and got a notify + ldout(cct,10) << __func__ << " got notify " << *m << dendl; + wc->get(); + + // trigger the callback + lock.Unlock(); + wc->watch_ctx->notify(m->opcode, m->ver, m->bl); + lock.Lock(); + + // send ACK back to the OSD + wc->io_ctx_impl->_notify_ack(wc->oid, m->notify_id, m->ver, m->cookie); + + ldout(cct,10) << __func__ << " notify done" << dendl; + wc->put(); + } } m->put(); } + int librados::RadosClient::mon_command(const vector& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs) diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h index b2e44cadaa1..9a394b3d5b7 100755 --- a/src/librados/RadosClient.h +++ b/src/librados/RadosClient.h @@ -107,13 +107,15 @@ public: int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c); // watch/notify - uint64_t max_watch_cookie; - map watchers; + uint64_t max_watch_notify_cookie; + map watch_notify_info; - void register_watch_notify_callback(librados::WatchContext *wc, + void register_watch_notify_callback(librados::WatchNotifyInfo *wc, uint64_t *cookie); void unregister_watch_notify_callback(uint64_t cookie); void handle_watch_notify(MWatchNotify *m); + void do_watch_notify(MWatchNotify *m); + int mon_command(const vector& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs); int mon_command(int rank, diff --git a/src/test/librados/watch_notify.cc b/src/test/librados/watch_notify.cc index d92c1b8ebe6..41ca711e6d2 100644 --- a/src/test/librados/watch_notify.cc +++ b/src/test/librados/watch_notify.cc @@ -19,6 +19,7 @@ static sem_t sem; static void watch_notify_test_cb(uint8_t opcode, uint64_t ver, void *arg) { + std::cout << __func__ << std::endl; sem_post(&sem); } @@ -27,6 +28,7 @@ class WatchNotifyTestCtx : public WatchCtx public: void notify(uint8_t opcode, uint64_t ver, bufferlist& bl) { + std::cout << __func__ << std::endl; sem_post(&sem); } };