From 70f70d803ab72ccad22c42e117a65062a43a23e0 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 24 Apr 2012 15:51:14 -0700 Subject: [PATCH] librados: call notification under different thread context This fixes #2342. We shouldn't call notify on the dispatcher context. We should also make sure that we don't hold the client lock while waiting for the responses. Also, pushed the client_lock locking into the ctx->notify(). Signed-off-by: Yehuda Sadeh --- src/librados/IoCtxImpl.cc | 5 ++- src/librados/IoCtxImpl.h | 4 +-- src/librados/RadosClient.cc | 31 ++++++++++++++++-- src/msg/Message.h | 63 +++++++++++++++++++++++++++++++++++++ 4 files changed, 97 insertions(+), 6 deletions(-) diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index 83faef7d322f1..d90d204adfd51 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1594,13 +1594,16 @@ librados::WatchContext::~WatchContext() io_ctx_impl->put(); } -void librados::WatchContext::notify(uint8_t opcode, +void librados::WatchContext::notify(Mutex *client_lock, + uint8_t opcode, uint64_t ver, uint64_t notify_id, bufferlist& payload) { ctx->notify(opcode, ver, payload); if (opcode != WATCH_NOTIFY_COMPLETE) { + client_lock->Lock(); io_ctx_impl->_notify_ack(oid, notify_id, ver); + client_lock->Unlock(); } } diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index d0e99d2a71be4..f2372ff745bd4 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -200,7 +200,7 @@ struct librados::IoCtxImpl { }; namespace librados { -struct WatchContext { +struct WatchContext : public RefCountedWaitObject { IoCtxImpl *io_ctx_impl; const object_t oid; uint64_t cookie; @@ -212,7 +212,7 @@ struct WatchContext { const object_t& _oc, librados::WatchCtx *_ctx); ~WatchContext(); - void notify(uint8_t opcode, uint64_t ver, uint64_t notify_id, + void notify(Mutex *lock, uint8_t opcode, uint64_t ver, uint64_t notify_id, bufferlist& payload); }; } diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index 755a9b7e8245c..8016199dc01c2 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -447,11 +447,36 @@ void librados::RadosClient::unregister_watcher(uint64_t cookie) WatchContext *ctx = iter->second; if (ctx->linger_id) objecter->unregister_linger(ctx->linger_id); - delete ctx; + watchers.erase(iter); + lock.Unlock(); + ldout(cct, 10) << "unregister_watcher, dropping reference, waiting ctx=" << (void *)ctx << dendl; + ctx->put_wait(); + ldout(cct, 10) << "unregister_watcher, done ctx=" << (void *)ctx << dendl; + lock.Lock(); } } + +class C_WatchNotify : public Context { + librados::WatchContext *ctx; + Mutex *client_lock; + uint8_t opcode; + uint64_t ver; + uint64_t notify_id; + bufferlist bl; + +public: + C_WatchNotify(librados::WatchContext *_ctx, Mutex *_client_lock, + uint8_t _o, uint64_t _v, uint64_t _n, bufferlist& _bl) : + ctx(_ctx), client_lock(_client_lock), opcode(_o), ver(_v), notify_id(_n), bl(_bl) {} + + void finish(int r) { + ctx->notify(client_lock, opcode, ver, notify_id, bl); + ctx->put(); + } +}; + void librados::RadosClient::watch_notify(MWatchNotify *m) { assert(lock.is_locked()); @@ -463,7 +488,7 @@ void librados::RadosClient::watch_notify(MWatchNotify *m) if (!wc) return; - wc->notify(m->opcode, m->ver, m->notify_id, m->bl); - + wc->get(); + finisher.queue(new C_WatchNotify(wc, &lock, m->opcode, m->ver, m->notify_id, m->bl)); m->put(); } diff --git a/src/msg/Message.h b/src/msg/Message.h index 476b93f407926..865ea268b917b 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -168,6 +168,69 @@ struct RefCountedObject { } }; +struct RefCountedCond : public RefCountedObject { + Mutex lock; + Cond cond; + atomic_t complete; + + RefCountedCond() : lock("RefCountedCond") {} + + void wait() { + Mutex::Locker l(lock); + while (!complete.read()) + cond.Wait(lock); + } + + void done() { + Mutex::Locker l(lock); + cond.SignalAll(); + complete.set(1); + } +}; + +struct RefCountedWaitObject { + atomic_t nref; + RefCountedCond *c; + + RefCountedWaitObject() : nref(1) { + c = new RefCountedCond; + } + virtual ~RefCountedWaitObject() { + c->put(); + } + + RefCountedWaitObject *get() { + nref.inc(); + return this; + } + + bool put() { + bool ret = false; + RefCountedCond *cond = c; + cond->get(); + if (nref.dec() == 0) { + cond->done(); + delete this; + ret = true; + } + cond->put(); + return ret; + } + + void put_wait() { + RefCountedCond *cond = c; + + cond->get(); + if (nref.dec() == 0) { + cond->done(); + delete this; + } else { + cond->wait(); + } + cond->put(); + } +}; + struct Connection : public RefCountedObject { Mutex lock; RefCountedObject *priv; -- 2.39.5