From: Sage Weil Date: Sun, 16 Nov 2014 21:44:08 +0000 (-0800) Subject: librados: move watch/notify into the Objecter X-Git-Tag: v0.91~95 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e72035f3f5bc2eb2b6d16a5cad4b691ec638fa5e;p=ceph.git librados: move watch/notify into the Objecter Several things here: - we move all of the junk from librados' RadosClient into Objecter - we use fast-dispatch to schedule the watch-notify events (this will keep them ordered wrt ping) - we use the LingerOp * as the librados-exposed handle so that we can avoid any lookups for watch-check. Signed-off-by: Sage Weil --- diff --git a/src/librados/IoCtxImpl.cc b/src/librados/IoCtxImpl.cc index 08d71d049cd7..9ebd545c6532 100644 --- a/src/librados/IoCtxImpl.cc +++ b/src/librados/IoCtxImpl.cc @@ -1032,46 +1032,89 @@ void librados::IoCtxImpl::set_sync_op_version(version_t ver) last_objver = ver; } +struct WatchInfo : public Objecter::WatchContext { + librados::IoCtxImpl *ioctx; + uint64_t user_cookie; + object_t oid; + librados::WatchCtx *ctx; + librados::WatchCtx2 *ctx2; + + WatchInfo(librados::IoCtxImpl *io, uint64_t uc, object_t o, + librados::WatchCtx *c, librados::WatchCtx2 *c2) + : ioctx(io), user_cookie(uc), oid(o), ctx(c), ctx2(c2) {} + + void handle_notify(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) { + ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id + << " cookie " << user_cookie + << " notifier_id " << notifier_id + << " len " << bl.length() + << dendl; + + if (ctx2) + ctx2->handle_notify(notify_id, user_cookie, notifier_id, bl); + if (ctx) { + ctx->notify(0, 0, bl); + + // send ACK back to OSD if using legacy protocol + bufferlist empty; + ioctx->notify_ack(oid, notify_id, user_cookie, empty); + } + } + void handle_failed_notify(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id) { + ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id + << " cookie " << user_cookie + << " 
notifier_id " << notifier_id + << dendl; + if (ctx2) + ctx2->handle_failed_notify(notify_id, user_cookie, notifier_id); + } + void handle_error(uint64_t cookie, int err) { + ldout(ioctx->client->cct, 10) << __func__ << " cookie " << user_cookie + << " err " << err + << dendl; + if (ctx2) + ctx2->handle_error(user_cookie, err); + } +}; + int librados::IoCtxImpl::watch(const object_t& oid, - uint64_t *cookie, + uint64_t *handle, librados::WatchCtx *ctx, librados::WatchCtx2 *ctx2) { ::ObjectOperation wr; - Mutex mylock("IoCtxImpl::watch::mylock"); - Cond cond; - bool done; - int r; - Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r); version_t objver; + C_SaferCond onfinish; lock->Lock(); - WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid); - wc->watch_ctx = ctx; - wc->watch_ctx2 = ctx2; - wc->linger_op = objecter->linger_register(oid, oloc, 0, cookie); - client->register_watch_notify_callback(wc, *cookie); + Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); + *handle = reinterpret_cast(linger_op); + linger_op->watch_context = new WatchInfo(this, + reinterpret_cast(linger_op), + oid, ctx, ctx2); + prepare_assert_ops(&wr); - wr.watch(*cookie, CEPH_OSD_WATCH_OP_WATCH); + wr.watch(linger_op->linger_id, CEPH_OSD_WATCH_OP_WATCH); bufferlist bl; - objecter->linger_watch(wc->linger_op, wr, + objecter->linger_watch(linger_op, wr, snapc, ceph_clock_now(NULL), bl, - *cookie, - NULL, onfinish, &wc->on_error, + NULL, &onfinish, &objver); lock->Unlock(); - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); + int r = onfinish.wait(); set_sync_op_version(objver); if (r < 0) { lock->Lock(); - client->unregister_watch_notify_callback(*cookie, NULL); // destroys wc + objecter->linger_cancel(linger_op); lock->Unlock(); } @@ -1087,8 +1130,9 @@ int librados::IoCtxImpl::notify_ack( { Mutex::Locker l(*lock); ::ObjectOperation rd; + Objecter::LingerOp *linger_op = reinterpret_cast(cookie); prepare_assert_ops(&rd); - 
rd.notify_ack(notify_id, cookie, bl); + rd.notify_ack(notify_id, linger_op->linger_id, bl); objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0); return 0; } @@ -1096,41 +1140,28 @@ int librados::IoCtxImpl::notify_ack( int librados::IoCtxImpl::watch_check(uint64_t cookie) { Mutex::Locker l(*lock); - return client->watch_check(cookie); + Objecter::LingerOp *linger_op = reinterpret_cast(cookie); + return objecter->linger_check(linger_op); } int librados::IoCtxImpl::unwatch(uint64_t cookie) { + Objecter::LingerOp *linger_op = reinterpret_cast(cookie); bufferlist inbl, outbl; + C_SaferCond onfinish; + version_t ver = 0; - Mutex mylock("IoCtxImpl::unwatch::mylock"); - Cond cond; - bool done; - int r; - Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r); - version_t ver; lock->Lock(); - - object_t oid; - r = client->unregister_watch_notify_callback(cookie, &oid); - if (r < 0) { - lock->Unlock(); - return r; - } - ::ObjectOperation wr; prepare_assert_ops(&wr); - wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH); - objecter->mutate(oid, oloc, wr, snapc, ceph_clock_now(client->cct), 0, NULL, oncommit, &ver); + wr.watch(linger_op->linger_id, CEPH_OSD_WATCH_OP_UNWATCH); + objecter->mutate(linger_op->target.base_oid, oloc, wr, + snapc, ceph_clock_now(client->cct), 0, NULL, &onfinish, &ver); + objecter->linger_cancel(linger_op); lock->Unlock(); - mylock.Lock(); - while (!done) - cond.Wait(mylock); - mylock.Unlock(); - + int r = onfinish.wait(); set_sync_op_version(ver); - return r; } @@ -1139,28 +1170,42 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, bufferlist *preply_bl, char **preply_buf, size_t *preply_buf_len) { - bufferlist inbl, outbl; - - // Construct WatchNotifyInfo - Cond cond_all; - Mutex mylock_all("IoCtxImpl::notify::mylock_all"); - bool done_all = false; - int r_notify = 0; - WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid); - wc->notify_done = &done_all; - wc->notify_lock = &mylock_all; - wc->notify_cond = 
&cond_all; - wc->notify_rval = &r_notify; - wc->notify_reply_bl = preply_bl; - wc->notify_reply_buf = preply_buf; - wc->notify_reply_buf_len = preply_buf_len; + bufferlist inbl; + + struct C_NotifyFinish : public Context { + Cond cond; + Mutex lock; + bool done; + int result; + bufferlist reply_bl; + + C_NotifyFinish() + : lock("IoCtxImpl::notify::C_NotifyFinish::lock"), + done(false), + result(0) { } + + void finish(int r) {} + void complete(int r) { + lock.Lock(); + done = true; + result = r; + cond.Signal(); + lock.Unlock(); + } + void wait() { + lock.Lock(); + while (!done) + cond.Wait(lock); + lock.Unlock(); + } + } notify_private; lock->Lock(); - // Acquire cookie - uint64_t cookie; - wc->linger_op = objecter->linger_register(oid, oloc, 0, &cookie); - client->register_watch_notify_callback(wc, cookie); + Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0); + linger_op->on_notify_finish = &notify_private; + linger_op->notify_result_bl = &notify_private.reply_bl; + uint32_t prot_ver = 1; uint32_t timeout = notify_timeout; if (timeout_ms) @@ -1172,36 +1217,49 @@ int librados::IoCtxImpl::notify(const object_t& oid, bufferlist& bl, // Construct RADOS op ::ObjectOperation rd; prepare_assert_ops(&rd); - rd.notify(cookie, inbl); + rd.notify(linger_op->linger_id, inbl); // Issue RADOS op C_SaferCond onack; version_t objver; - objecter->linger_notify(wc->linger_op, + objecter->linger_notify(linger_op, rd, snap_seq, inbl, NULL, &onack, &objver); lock->Unlock(); - ldout(client->cct, 10) << __func__ << " issued linger op " << wc->linger_op << dendl; + ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl; int r_issue = onack.wait(); - ldout(client->cct, 10) << __func__ << " linger op " << wc->linger_op << " acked (" << r_issue << ")" << dendl; + ldout(client->cct, 10) << __func__ << " linger op " << linger_op + << " acked (" << r_issue << ")" << dendl; if (r_issue == 0) { - ldout(client->cct, 10) << __func__ << "waiting for 
watch_notify message for linger op " << wc->linger_op << dendl; - mylock_all.Lock(); - while (!done_all) - cond_all.Wait(mylock_all); - mylock_all.Unlock(); + ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish " + << linger_op << dendl; + notify_private.wait(); + + // pass result back to user + if (notify_private.result >= 0) { + if (preply_buf) { + *preply_buf = (char*)malloc(notify_private.reply_bl.length()); + memcpy(*preply_buf, notify_private.reply_bl.c_str(), + notify_private.reply_bl.length()); + } + if (preply_buf_len) + *preply_buf_len = notify_private.reply_bl.length(); + if (preply_bl) + preply_bl->claim(notify_private.reply_bl); + } } - ldout(client->cct, 10) << __func__ << " completed notify (linger op " << wc->linger_op << "), unregistering" << dendl; + ldout(client->cct, 10) << __func__ << " completed notify (linger op " + << linger_op << "), unregistering" << dendl; lock->Lock(); - client->unregister_watch_notify_callback(cookie, NULL); // destroys wc + objecter->linger_cancel(linger_op); lock->Unlock(); set_sync_op_version(objver); - return r_issue == 0 ? r_notify : r_issue; + return r_issue ? r_issue : notify_private.result; } int librados::IoCtxImpl::set_alloc_hint(const object_t& oid, diff --git a/src/librados/IoCtxImpl.h b/src/librados/IoCtxImpl.h index 3ac48d217e25..a5c0874e2af4 100644 --- a/src/librados/IoCtxImpl.h +++ b/src/librados/IoCtxImpl.h @@ -217,62 +217,4 @@ struct librados::IoCtxImpl { }; -namespace librados { - -/** - * watch/notify info - * - * Capture state about a watch or an in-progress notify - */ -struct WatchNotifyInfo : public RefCountedWaitObject { - IoCtxImpl *io_ctx_impl; // parent - const object_t oid; // the object - class Objecter::LingerOp *linger_op; // we use this to unlinger when we are done - uint64_t cookie; // callback cookie - int err; - - // watcher. only one of these will be defined. 
- librados::WatchCtx *watch_ctx; - librados::WatchCtx2 *watch_ctx2; - - // notify that we initiated - Mutex *notify_lock; - Cond *notify_cond; - bool *notify_done; - bufferlist *notify_reply_bl; - char **notify_reply_buf; - size_t *notify_reply_buf_len; - int *notify_rval; - - struct OnError : public Context { - WatchNotifyInfo *info; - void finish(int r) { assert(0); } - void complete(int r); - } on_error; - - WatchNotifyInfo(IoCtxImpl *io_ctx_impl_, - const object_t& _oc) - : io_ctx_impl(io_ctx_impl_), - oid(_oc), - linger_op(NULL), - cookie(0), - err(0), - watch_ctx(NULL), - watch_ctx2(NULL), - notify_lock(NULL), - notify_cond(NULL), - notify_done(NULL), - notify_reply_bl(NULL), - notify_reply_buf(NULL), - notify_reply_buf_len(NULL), - notify_rval(NULL) { - io_ctx_impl->get(); - on_error.info = this; - } - - ~WatchNotifyInfo() { - io_ctx_impl->put(); - } -}; -} #endif diff --git a/src/librados/RadosClient.cc b/src/librados/RadosClient.cc index a6b292e516ae..ae06368b1fad 100644 --- a/src/librados/RadosClient.cc +++ b/src/librados/RadosClient.cc @@ -28,7 +28,6 @@ #include "include/buffer.h" #include "include/stringify.h" -#include "messages/MWatchNotify.h" #include "messages/MLog.h" #include "msg/Messenger.h" @@ -386,10 +385,6 @@ bool librados::RadosClient::_dispatch(Message *m) case CEPH_MSG_MDS_MAP: break; - case CEPH_MSG_WATCH_NOTIFY: - handle_watch_notify(static_cast(m)); - break; - case MSG_LOG: handle_log(static_cast(m)); break; @@ -639,219 +634,6 @@ void librados::RadosClient::blacklist_self(bool set) { objecter->blacklist_self(set); } - -// ----------- -// watch/notify - -void librados::RadosClient::register_watch_notify_callback( - WatchNotifyInfo *wc, - uint64_t cookie) -{ - assert(lock.is_locked_by_me()); - wc->cookie = cookie; - ldout(cct,10) << __func__ << " cookie " << wc->cookie << dendl; - watch_notify_info[wc->cookie] = wc; -} - -int librados::RadosClient::unregister_watch_notify_callback(uint64_t cookie, - object_t *poid) -{ - ldout(cct,10) << 
__func__ << " cookie " << cookie << dendl; - assert(lock.is_locked_by_me()); - map::iterator iter = watch_notify_info.find(cookie); - if (iter == watch_notify_info.end()) - return -EBADF; - - WatchNotifyInfo *ctx = iter->second; - if (poid) - *poid = ctx->oid; - if (ctx->linger_op) { - objecter->linger_cancel(ctx->linger_op); - ctx->linger_op = NULL; - } - - watch_notify_info.erase(iter); - lock.Unlock(); - ldout(cct, 10) << __func__ << " dropping reference, waiting ctx=" - << (void *)ctx << dendl; - ctx->put_wait(); - ldout(cct, 10) << __func__ << " done ctx=" << (void *)ctx << dendl; - lock.Lock(); - return 0; -} - -int librados::RadosClient::watch_check(uint64_t cookie) -{ - ldout(cct,10) << __func__ << " cookie " << cookie << dendl; - map::iterator iter = watch_notify_info.find(cookie); - if (iter == watch_notify_info.end()) - return -EBADF; - WatchNotifyInfo *ctx = iter->second; - if (ctx->err) - return ctx->err; - return objecter->linger_check(ctx->linger_op); -} - -struct C_DoWatchNotify : public Context { - librados::RadosClient *rados; - MWatchNotify *m; - C_DoWatchNotify(librados::RadosClient *r, MWatchNotify *m) : rados(r), m(m) {} - void finish(int r) { - rados->do_watch_notify(m); - } -}; - -struct C_DoWatchError : public Context { - librados::RadosClient *rados; - uint64_t cookie; - int err; - C_DoWatchError(librados::RadosClient *r, uint64_t cookie, int err) - : rados(r), cookie(cookie), err(err) {} - void finish(int r) { - rados->do_watch_error(cookie, err); - } -}; - -void librados::WatchNotifyInfo::OnError::complete(int r) -{ - RadosClient *client = info->io_ctx_impl->client; - client->finisher.queue(new C_DoWatchError(client, info->cookie, r)); -} - -void librados::RadosClient::do_watch_error(uint64_t cookie, int err) -{ - Mutex::Locker l(lock); - map::iterator iter = - watch_notify_info.find(cookie); - if (iter != watch_notify_info.end()) { - WatchNotifyInfo *wc = iter->second; - assert(wc); - wc->err = err; - if (wc->watch_ctx2) { - wc->get(); 
- ldout(cct,10) << __func__ << " cookie " << cookie - << " handle_error " << err << dendl; - lock.Unlock(); - wc->watch_ctx2->handle_error(cookie, err); - lock.Lock(); - ldout(cct,10) << __func__ << " cookie " << cookie - << " handle_error " << err << " done" << dendl; - wc->put(); - } - } else { - ldout(cct,10) << __func__ << " cookie " << cookie << " not found" << dendl; - } -} - -void librados::RadosClient::handle_watch_notify(MWatchNotify *m) -{ - Mutex::Locker l(lock); - - if (watch_notify_info.count(m->cookie)) { - ldout(cct,10) << __func__ << " queueing async " << *m << dendl; - // deliver this async via a finisher thread - finisher.queue(new C_DoWatchNotify(this, m)); - } else { - // drop it on the floor - ldout(cct,10) << __func__ << " cookie " << m->cookie << " unknown" << dendl; - m->put(); - } -} - -void librados::RadosClient::do_watch_notify(MWatchNotify *m) -{ - Mutex::Locker l(lock); - map::iterator iter = - watch_notify_info.find(m->cookie); - if (iter != watch_notify_info.end()) { - WatchNotifyInfo *wc = iter->second; - assert(wc); - if (wc->notify_lock) { - // we sent a notify and it completed (or failed) - // NOTE: opcode may be either NOTIFY (older OSDs) or NOTIFY_COMPLETE - // (newer OSDs). In practice it doesn't matter because completion is the - // only kind of event we get on notify cookies. 
- ldout(cct,10) << __func__ << " completed notify " << *m << dendl; - wc->notify_lock->Lock(); - *wc->notify_done = true; - *wc->notify_rval = m->return_code; - if (wc->notify_reply_bl) { - wc->notify_reply_bl->claim(m->get_data()); - } - if (wc->notify_reply_buf) { - *wc->notify_reply_buf = (char*)malloc(m->get_data().length()); - memcpy(*wc->notify_reply_buf, m->get_data().c_str(), - m->get_data().length()); - } - if (wc->notify_reply_buf_len) { - *wc->notify_reply_buf_len = m->get_data().length(); - } - wc->notify_cond->Signal(); - wc->notify_lock->Unlock(); - } else if (m->opcode == CEPH_WATCH_EVENT_NOTIFY) { - // we are watcher and got a notify - ldout(cct,10) << __func__ << " got notify " << *m << dendl; - wc->get(); - - // trigger the callback - assert(!!wc->watch_ctx ^ !!wc->watch_ctx2); // only one is defined - lock.Unlock(); - if (wc->watch_ctx) { - wc->watch_ctx->notify(CEPH_WATCH_EVENT_NOTIFY, m->ver, m->bl); - // send ACK back to the OSD - bufferlist empty; - wc->io_ctx_impl->notify_ack(wc->oid, m->notify_id, m->cookie, empty); - } else if (wc->watch_ctx2) { - wc->watch_ctx2->handle_notify(m->notify_id, m->cookie, - m->notifier_gid, m->bl); - // user needs to explicitly ack (and may have already!) - } - lock.Lock(); - ldout(cct,10) << __func__ << " notify done" << dendl; - wc->put(); - } else if (m->opcode == CEPH_WATCH_EVENT_FAILED_NOTIFY) { - // we are watcher and failed to ack a notify in time, causing it to time - // out. - ldout(cct,10) << __func__ << " failed notify " << *m << dendl; - wc->get(); - // trigger the callback - assert(!!wc->watch_ctx ^ !!wc->watch_ctx2); // only one is defined - lock.Unlock(); - if (wc->watch_ctx2) { - wc->watch_ctx2->handle_failed_notify(m->notify_id, m->cookie, - m->notifier_gid); - } - lock.Lock(); - ldout(cct,10) << __func__ << " failed notify done" << dendl; - wc->put(); - } else if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) { - // we failed to ping or reconnect and our watch was canceled. 
- ldout(cct,10) << __func__ << " disconnect " << *m << dendl; - wc->err = -ENOTCONN; - if (wc->watch_ctx2) { - wc->get(); - // trigger the callback - lock.Unlock(); - wc->watch_ctx2->handle_error(m->cookie, -ENOTCONN); - lock.Lock(); - ldout(cct,10) << __func__ << " disconnect done" << dendl; - wc->put(); - } else { - lderr(cct) << __func__ << " watch disconnect on " - << wc->oid << " on old API user, silently ignoring" - << dendl; - } - } else { - lderr(cct) << __func__ << " got unknown event " << m->opcode - << " " << ceph_watch_event_name(m->opcode) << dendl; - } - } else { - ldout(cct, 4) << __func__ << " unknown cookie " << m->cookie << dendl; - } - m->put(); -} - - int librados::RadosClient::mon_command(const vector& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs) diff --git a/src/librados/RadosClient.h b/src/librados/RadosClient.h index 299ffa288ed8..b3aa1e168d80 100644 --- a/src/librados/RadosClient.h +++ b/src/librados/RadosClient.h @@ -30,10 +30,8 @@ class CephContext; struct Connection; struct md_config_t; class Message; -class MWatchNotify; class MLog; class Messenger; -struct WatchNotifyInfo; class librados::RadosClient : public Dispatcher { @@ -108,17 +106,6 @@ public: int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c); - // watch/notify - map watch_notify_info; - - void register_watch_notify_callback(librados::WatchNotifyInfo *wc, - uint64_t cookie); - int unregister_watch_notify_callback(uint64_t cookie, object_t *poid); - int watch_check(uint64_t cookie); - void handle_watch_notify(MWatchNotify *m); - void do_watch_notify(MWatchNotify *m); - void do_watch_error(uint64_t cookie, int err); - int mon_command(const vector& cmd, const bufferlist &inbl, bufferlist *outbl, string *outs); int mon_command(int rank, diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 6833dc51c4ab..a27147f4b3d2 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -40,6 +40,8 @@ #include "messages/MCommand.h" #include 
"messages/MCommandReply.h" +#include "messages/MWatchNotify.h" + #include #include "common/config.h" @@ -421,7 +423,7 @@ void Objecter::_send_linger(LingerOp *info) onack = new C_Linger_Reconnect(this, info); opv.push_back(OSDOp()); opv.back().op.op = CEPH_OSD_OP_WATCH; - opv.back().op.watch.cookie = info->cookie; + opv.back().op.watch.cookie = info->linger_id; opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT; } else { ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl; @@ -484,6 +486,18 @@ void Objecter::_linger_commit(LingerOp *info, int r) info->pobjver = NULL; } +struct C_DoWatchError : public Context { + Objecter::LingerOp *info; + int err; + C_DoWatchError(Objecter::LingerOp *i, int r) : info(i), err(r) { + info->get(); + } + void finish(int r) { + info->watch_context->handle_error(info->linger_id, err); + info->put(); + } +}; + void Objecter::_linger_reconnect(LingerOp *info, int r) { ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r @@ -492,8 +506,8 @@ void Objecter::_linger_reconnect(LingerOp *info, int r) info->watch_lock.Lock(); info->last_error = r; info->watch_cond.Signal(); - if (info->on_error) - info->on_error->complete(r); + if (info->watch_context) + finisher->queue(new C_DoWatchError(info, r)); info->watch_lock.Unlock(); } } @@ -519,7 +533,7 @@ void Objecter::_send_linger_ping(LingerOp *info) vector opv(1); opv[0].op.op = CEPH_OSD_OP_WATCH; - opv[0].op.watch.cookie = info->cookie; + opv[0].op.watch.cookie = info->linger_id; opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING; C_Linger_Ping *onack = new C_Linger_Ping(this, info); Op *o = new Op(info->target.base_oid, info->target.base_oloc, @@ -548,8 +562,8 @@ void Objecter::_linger_ping(LingerOp *info, int r, utime_t sent) info->watch_valid_thru = sent; } else if (r < 0) { info->last_error = r; - if (info->on_error) - info->on_error->complete(r); + if (info->watch_context) + finisher->queue(new C_DoWatchError(info, r)); } info->watch_cond.SignalAll(); 
info->watch_lock.Unlock(); @@ -596,8 +610,7 @@ void Objecter::_linger_cancel(LingerOp *info) Objecter::LingerOp *Objecter::linger_register(const object_t& oid, const object_locator_t& oloc, - int flags, - uint64_t *cookie) + int flags) { LingerOp *info = new LingerOp; info->target.base_oid = oid; @@ -611,7 +624,6 @@ Objecter::LingerOp *Objecter::linger_register(const object_t& oid, // Acquire linger ID info->linger_id = ++max_linger_id; - *cookie = info->linger_id; ldout(cct, 10) << __func__ << " info " << info << " linger_id " << info->linger_id << dendl; linger_ops[info->linger_id] = info; @@ -623,22 +635,20 @@ Objecter::LingerOp *Objecter::linger_register(const object_t& oid, ceph_tid_t Objecter::linger_watch(LingerOp *info, ObjectOperation& op, const SnapContext& snapc, utime_t mtime, - bufferlist& inbl, uint64_t cookie, + bufferlist& inbl, Context *onack, Context *oncommit, - Context *onerror, version_t *objver) { + info->is_watch = true; info->snapc = snapc; info->mtime = mtime; info->target.flags |= CEPH_OSD_FLAG_WRITE; info->ops = op.ops; - info->cookie = cookie; info->inbl = inbl; info->poutbl = NULL; info->pobjver = objver; info->on_reg_ack = onack; info->on_reg_commit = oncommit; - info->on_error = onerror; RWLock::WLocker wl(rwlock); _linger_submit(info); @@ -691,6 +701,93 @@ void Objecter::_linger_submit(LingerOp *info) _send_linger(info); } +struct C_DoWatchNotify : public Context { + Objecter *objecter; + Objecter::LingerOp *info; + MWatchNotify *msg; + C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m) + : objecter(o), info(i), msg(m) { + info->get(); + msg->get(); + } + void finish(int r) { + objecter->_do_watch_notify(info, msg); + } +}; + +void Objecter::handle_watch_notify(MWatchNotify *m) +{ + RWLock::RLocker l(rwlock); + if (!initialized.read()) { + return; + } + + map::iterator p = linger_ops.find(m->cookie); + if (p == linger_ops.end()) { + ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl; + return; + 
} + + LingerOp *info = p->second; + finisher->queue(new C_DoWatchNotify(this, info, m)); +} + +void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m) +{ + ldout(cct, 10) << __func__ << " " << *m << dendl; + + rwlock.get_read(); + if (!initialized.read()) { + rwlock.put_read(); + goto out; + } + + if (info->canceled) { + rwlock.put_read(); + goto out; + } + + // notify completion? + if (!info->is_watch) { + assert(info->on_notify_finish); + info->notify_result_bl->claim(m->get_data()); + rwlock.put_read(); + info->on_notify_finish->complete(m->return_code); + goto out; + } + + assert(info->is_watch); + assert(info->watch_context); + + switch (m->opcode) { + case CEPH_WATCH_EVENT_NOTIFY: + rwlock.put_read(); + info->watch_context->handle_notify(m->notify_id, m->cookie, + m->notifier_gid, m->bl); + break; + + case CEPH_WATCH_EVENT_FAILED_NOTIFY: + rwlock.put_read(); + info->watch_context->handle_failed_notify(m->notify_id, m->cookie, + m->notifier_gid); + break; + + case CEPH_WATCH_EVENT_DISCONNECT: + info->last_error = -ENOTCONN; + rwlock.put_read(); + info->watch_context->handle_error(m->cookie, -ENOTCONN); + break; + + default: + rwlock.put_read(); + break; + } + + out: + info->put(); + m->put(); +} + bool Objecter::ms_dispatch(Message *m) { ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl; @@ -703,6 +800,11 @@ bool Objecter::ms_dispatch(Message *m) handle_osd_op_reply(static_cast(m)); return true; + case CEPH_MSG_WATCH_NOTIFY: + handle_watch_notify(static_cast(m)); + m->put(); + return true; + case MSG_COMMAND_REPLY: if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) { handle_command_reply(static_cast(m)); @@ -1715,7 +1817,7 @@ void Objecter::tick() assert(op->session); ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." 
<< op->session->osd << ")" << dendl; toping.insert(op->session); - if (op->cookie && !op->last_error) + if (op->is_watch && !op->last_error) _send_linger_ping(op); } for (map::iterator p = s->command_ops.begin(); @@ -2651,7 +2753,6 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) m->put(); return; } - RWLock::Context lc(rwlock, RWLock::Context::TakenForRead); map::iterator siter = osd_sessions.find(osd_num); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 8e207ab8e3e7..d07103b60454 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -44,6 +44,7 @@ class MPoolOpReply; class MGetPoolStatsReply; class MStatfsReply; class MCommandReply; +class MWatchNotify; class PerfCounters; @@ -1446,6 +1447,19 @@ public: // -- lingering ops -- + struct WatchContext { + // this simply mirrors librados WatchCtx2 + virtual void handle_notify(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id, + bufferlist& bl) = 0; + virtual void handle_failed_notify(uint64_t notify_id, + uint64_t cookie, + uint64_t notifier_id) = 0; + virtual void handle_error(uint64_t cookie, int err) = 0; + virtual ~WatchContext() {} + }; + struct LingerOp : public RefCountedObject { uint64_t linger_id; @@ -1460,7 +1474,7 @@ public: bufferlist *poutbl; version_t *pobjver; - uint64_t cookie; ///< non-zero if this is a watch + bool is_watch; utime_t watch_valid_thru; ///< send time for last acked ping int last_error; ///< error from last failed ping|reconnect, if any Mutex watch_lock; @@ -1468,7 +1482,13 @@ public: bool registered; bool canceled; - Context *on_reg_ack, *on_reg_commit, *on_error; + Context *on_reg_ack, *on_reg_commit; + + // we trigger these from an async finisher + Context *on_notify_finish; + bufferlist *notify_result_bl; + + WatchContext *watch_context; OSDSession *session; @@ -1480,13 +1500,15 @@ public: target(object_t(), object_locator_t(), 0), snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL), - cookie(0), + is_watch(false), last_error(0), 
watch_lock("Objecter::LingerOp::watch_lock"), registered(false), canceled(false), on_reg_ack(NULL), on_reg_commit(NULL), - on_error(NULL), + on_notify_finish(NULL), + notify_result_bl(NULL), + watch_context(NULL), session(NULL), register_tid(0), ping_tid(0), @@ -1803,6 +1825,7 @@ public: bool ms_can_fast_dispatch(Message *m) const { switch (m->get_type()) { case CEPH_MSG_OSD_OPREPLY: + case CEPH_MSG_WATCH_NOTIFY: return true; default: return false; @@ -1813,6 +1836,7 @@ public: } void handle_osd_op_reply(class MOSDOpReply *m); + void handle_watch_notify(class MWatchNotify *m); void handle_osd_map(class MOSDMap *m); void wait_for_osd_map(); @@ -1968,12 +1992,12 @@ public: // caller owns a ref LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc, - int flags, uint64_t *pcookie); + int flags); ceph_tid_t linger_watch(LingerOp *info, ObjectOperation& op, const SnapContext& snapc, utime_t mtime, - bufferlist& inbl, uint64_t cookie, - Context *onack, Context *onfinish, Context *onerror, + bufferlist& inbl, + Context *onack, Context *onfinish, version_t *objver); ceph_tid_t linger_notify(LingerOp *info, ObjectOperation& op, @@ -1985,6 +2009,8 @@ public: void linger_cancel(LingerOp *info); // releases a reference void _linger_cancel(LingerOp *info); + void _do_watch_notify(LingerOp *info, MWatchNotify *m); + /** * set up initial ops in the op vector, and allocate a final op slot. *