io_ctx_impl->put();
}
-void librados::WatchContext::notify(uint8_t opcode,
+void librados::WatchContext::notify(Mutex *client_lock,
+ uint8_t opcode,
uint64_t ver,
uint64_t notify_id,
bufferlist& payload)
{
ctx->notify(opcode, ver, payload);
if (opcode != WATCH_NOTIFY_COMPLETE) {
+ client_lock->Lock();
io_ctx_impl->_notify_ack(oid, notify_id, ver);
+ client_lock->Unlock();
}
}
};
namespace librados {
-struct WatchContext {
+struct WatchContext : public RefCountedWaitObject {
IoCtxImpl *io_ctx_impl;
const object_t oid;
uint64_t cookie;
const object_t& _oc,
librados::WatchCtx *_ctx);
~WatchContext();
- void notify(uint8_t opcode, uint64_t ver, uint64_t notify_id,
+ void notify(Mutex *lock, uint8_t opcode, uint64_t ver, uint64_t notify_id,
bufferlist& payload);
};
}
WatchContext *ctx = iter->second;
if (ctx->linger_id)
objecter->unregister_linger(ctx->linger_id);
- delete ctx;
+
watchers.erase(iter);
+ lock.Unlock();
+ ldout(cct, 10) << "unregister_watcher, dropping reference, waiting ctx=" << (void *)ctx << dendl;
+ ctx->put_wait();
+ ldout(cct, 10) << "unregister_watcher, done ctx=" << (void *)ctx << dendl;
+ lock.Lock();
}
}
+
+class C_WatchNotify : public Context {
+ librados::WatchContext *ctx;
+ Mutex *client_lock;
+ uint8_t opcode;
+ uint64_t ver;
+ uint64_t notify_id;
+ bufferlist bl;
+
+public:
+ C_WatchNotify(librados::WatchContext *_ctx, Mutex *_client_lock,
+ uint8_t _o, uint64_t _v, uint64_t _n, bufferlist& _bl) :
+ ctx(_ctx), client_lock(_client_lock), opcode(_o), ver(_v), notify_id(_n), bl(_bl) {}
+
+ void finish(int r) {
+ ctx->notify(client_lock, opcode, ver, notify_id, bl);
+ ctx->put();
+ }
+};
+
void librados::RadosClient::watch_notify(MWatchNotify *m)
{
assert(lock.is_locked());
if (!wc)
return;
- wc->notify(m->opcode, m->ver, m->notify_id, m->bl);
-
+ wc->get();
+ finisher.queue(new C_WatchNotify(wc, &lock, m->opcode, m->ver, m->notify_id, m->bl));
m->put();
}
}
};
+struct RefCountedCond : public RefCountedObject {
+ Mutex lock;
+ Cond cond;
+ atomic_t complete;
+
+ RefCountedCond() : lock("RefCountedCond") {}
+
+ void wait() {
+ Mutex::Locker l(lock);
+ while (!complete.read())
+ cond.Wait(lock);
+ }
+
+ void done() {
+ Mutex::Locker l(lock);
+ cond.SignalAll();
+ complete.set(1);
+ }
+};
+
+struct RefCountedWaitObject {
+ atomic_t nref;
+ RefCountedCond *c;
+
+ RefCountedWaitObject() : nref(1) {
+ c = new RefCountedCond;
+ }
+ virtual ~RefCountedWaitObject() {
+ c->put();
+ }
+
+ RefCountedWaitObject *get() {
+ nref.inc();
+ return this;
+ }
+
+ bool put() {
+ bool ret = false;
+ RefCountedCond *cond = c;
+ cond->get();
+ if (nref.dec() == 0) {
+ cond->done();
+ delete this;
+ ret = true;
+ }
+ cond->put();
+ return ret;
+ }
+
+ void put_wait() {
+ RefCountedCond *cond = c;
+
+ cond->get();
+ if (nref.dec() == 0) {
+ cond->done();
+ delete this;
+ } else {
+ cond->wait();
+ }
+ cond->put();
+ }
+};
+
struct Connection : public RefCountedObject {
Mutex lock;
RefCountedObject *priv;