last_objver = ver;
}
+struct WatchInfo : public Objecter::WatchContext {
+ librados::IoCtxImpl *ioctx;
+ uint64_t user_cookie;
+ object_t oid;
+ librados::WatchCtx *ctx;
+ librados::WatchCtx2 *ctx2;
+
+ WatchInfo(librados::IoCtxImpl *io, uint64_t uc, object_t o,
+ librados::WatchCtx *c, librados::WatchCtx2 *c2)
+ : ioctx(io), user_cookie(uc), oid(o), ctx(c), ctx2(c2) {}
+
+ void handle_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl) {
+ ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
+ << " cookie " << user_cookie
+ << " notifier_id " << notifier_id
+ << " len " << bl.length()
+ << dendl;
+
+ if (ctx2)
+ ctx2->handle_notify(notify_id, user_cookie, notifier_id, bl);
+ if (ctx) {
+ ctx->notify(0, 0, bl);
+
+ // send ACK back to OSD if using legacy protocol
+ bufferlist empty;
+ ioctx->notify_ack(oid, notify_id, user_cookie, empty);
+ }
+ }
+ void handle_failed_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id) {
+ ldout(ioctx->client->cct, 10) << __func__ << " " << notify_id
+ << " cookie " << user_cookie
+ << " notifier_id " << notifier_id
+ << dendl;
+ if (ctx2)
+ ctx2->handle_failed_notify(notify_id, user_cookie, notifier_id);
+ }
+ void handle_error(uint64_t cookie, int err) {
+ ldout(ioctx->client->cct, 10) << __func__ << " cookie " << user_cookie
+ << " err " << err
+ << dendl;
+ if (ctx2)
+ ctx2->handle_error(user_cookie, err);
+ }
+};
+
int librados::IoCtxImpl::watch(const object_t& oid,
- uint64_t *cookie,
+ uint64_t *handle,
librados::WatchCtx *ctx,
librados::WatchCtx2 *ctx2)
{
::ObjectOperation wr;
- Mutex mylock("IoCtxImpl::watch::mylock");
- Cond cond;
- bool done;
- int r;
- Context *onfinish = new C_SafeCond(&mylock, &cond, &done, &r);
version_t objver;
+ C_SaferCond onfinish;
lock->Lock();
- WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
- wc->watch_ctx = ctx;
- wc->watch_ctx2 = ctx2;
- wc->linger_op = objecter->linger_register(oid, oloc, 0, cookie);
- client->register_watch_notify_callback(wc, *cookie);
+ Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+ *handle = reinterpret_cast<uint64_t>(linger_op);
+ linger_op->watch_context = new WatchInfo(this,
+ reinterpret_cast<uint64_t>(linger_op),
+ oid, ctx, ctx2);
+
prepare_assert_ops(&wr);
- wr.watch(*cookie, CEPH_OSD_WATCH_OP_WATCH);
+ wr.watch(linger_op->linger_id, CEPH_OSD_WATCH_OP_WATCH);
bufferlist bl;
- objecter->linger_watch(wc->linger_op, wr,
+ objecter->linger_watch(linger_op, wr,
snapc, ceph_clock_now(NULL), bl,
- *cookie,
- NULL, onfinish, &wc->on_error,
+ NULL, &onfinish,
&objver);
lock->Unlock();
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
+ int r = onfinish.wait();
set_sync_op_version(objver);
if (r < 0) {
lock->Lock();
- client->unregister_watch_notify_callback(*cookie, NULL); // destroys wc
+ objecter->linger_cancel(linger_op);
lock->Unlock();
}
{
Mutex::Locker l(*lock);
::ObjectOperation rd;
+ Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
prepare_assert_ops(&rd);
- rd.notify_ack(notify_id, cookie, bl);
+ rd.notify_ack(notify_id, linger_op->linger_id, bl);
objecter->read(oid, oloc, rd, snap_seq, (bufferlist*)NULL, 0, 0, 0);
return 0;
}
int librados::IoCtxImpl::watch_check(uint64_t cookie)
{
Mutex::Locker l(*lock);
- return client->watch_check(cookie);
+ Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
+ return objecter->linger_check(linger_op);
}
int librados::IoCtxImpl::unwatch(uint64_t cookie)
{
+ Objecter::LingerOp *linger_op = reinterpret_cast<Objecter::LingerOp*>(cookie);
bufferlist inbl, outbl;
+ C_SaferCond onfinish;
+ version_t ver = 0;
- Mutex mylock("IoCtxImpl::unwatch::mylock");
- Cond cond;
- bool done;
- int r;
- Context *oncommit = new C_SafeCond(&mylock, &cond, &done, &r);
- version_t ver;
lock->Lock();
-
- object_t oid;
- r = client->unregister_watch_notify_callback(cookie, &oid);
- if (r < 0) {
- lock->Unlock();
- return r;
- }
-
::ObjectOperation wr;
prepare_assert_ops(&wr);
- wr.watch(cookie, CEPH_OSD_WATCH_OP_UNWATCH);
- objecter->mutate(oid, oloc, wr, snapc, ceph_clock_now(client->cct), 0, NULL, oncommit, &ver);
+ wr.watch(linger_op->linger_id, CEPH_OSD_WATCH_OP_UNWATCH);
+ objecter->mutate(linger_op->target.base_oid, oloc, wr,
+ snapc, ceph_clock_now(client->cct), 0, NULL, &onfinish, &ver);
+ objecter->linger_cancel(linger_op);
lock->Unlock();
- mylock.Lock();
- while (!done)
- cond.Wait(mylock);
- mylock.Unlock();
-
+ int r = onfinish.wait();
set_sync_op_version(ver);
-
return r;
}
bufferlist *preply_bl,
char **preply_buf, size_t *preply_buf_len)
{
- bufferlist inbl, outbl;
-
- // Construct WatchNotifyInfo
- Cond cond_all;
- Mutex mylock_all("IoCtxImpl::notify::mylock_all");
- bool done_all = false;
- int r_notify = 0;
- WatchNotifyInfo *wc = new WatchNotifyInfo(this, oid);
- wc->notify_done = &done_all;
- wc->notify_lock = &mylock_all;
- wc->notify_cond = &cond_all;
- wc->notify_rval = &r_notify;
- wc->notify_reply_bl = preply_bl;
- wc->notify_reply_buf = preply_buf;
- wc->notify_reply_buf_len = preply_buf_len;
+ bufferlist inbl;
+
+ struct C_NotifyFinish : public Context {
+ Cond cond;
+ Mutex lock;
+ bool done;
+ int result;
+ bufferlist reply_bl;
+
+ C_NotifyFinish()
+ : lock("IoCtxImpl::notify::C_NotifyFinish::lock"),
+ done(false),
+ result(0) { }
+
+ void finish(int r) {}
+ void complete(int r) {
+ lock.Lock();
+ done = true;
+ result = r;
+ cond.Signal();
+ lock.Unlock();
+ }
+ void wait() {
+ lock.Lock();
+ while (!done)
+ cond.Wait(lock);
+ lock.Unlock();
+ }
+ } notify_private;
lock->Lock();
- // Acquire cookie
- uint64_t cookie;
- wc->linger_op = objecter->linger_register(oid, oloc, 0, &cookie);
- client->register_watch_notify_callback(wc, cookie);
+ Objecter::LingerOp *linger_op = objecter->linger_register(oid, oloc, 0);
+ linger_op->on_notify_finish = ¬ify_private;
+ linger_op->notify_result_bl = ¬ify_private.reply_bl;
+
uint32_t prot_ver = 1;
uint32_t timeout = notify_timeout;
if (timeout_ms)
// Construct RADOS op
::ObjectOperation rd;
prepare_assert_ops(&rd);
- rd.notify(cookie, inbl);
+ rd.notify(linger_op->linger_id, inbl);
// Issue RADOS op
C_SaferCond onack;
version_t objver;
- objecter->linger_notify(wc->linger_op,
+ objecter->linger_notify(linger_op,
rd, snap_seq, inbl, NULL,
&onack, &objver);
lock->Unlock();
- ldout(client->cct, 10) << __func__ << " issued linger op " << wc->linger_op << dendl;
+ ldout(client->cct, 10) << __func__ << " issued linger op " << linger_op << dendl;
int r_issue = onack.wait();
- ldout(client->cct, 10) << __func__ << " linger op " << wc->linger_op << " acked (" << r_issue << ")" << dendl;
+ ldout(client->cct, 10) << __func__ << " linger op " << linger_op
+ << " acked (" << r_issue << ")" << dendl;
if (r_issue == 0) {
- ldout(client->cct, 10) << __func__ << "waiting for watch_notify message for linger op " << wc->linger_op << dendl;
- mylock_all.Lock();
- while (!done_all)
- cond_all.Wait(mylock_all);
- mylock_all.Unlock();
+ ldout(client->cct, 10) << __func__ << " waiting for watch_notify finish "
+ << linger_op << dendl;
+ notify_private.wait();
+
+ // pass result back to user
+ if (notify_private.result >= 0) {
+ if (preply_buf) {
+ *preply_buf = (char*)malloc(notify_private.reply_bl.length());
+ memcpy(*preply_buf, notify_private.reply_bl.c_str(),
+ notify_private.reply_bl.length());
+ }
+ if (preply_buf_len)
+ *preply_buf_len = notify_private.reply_bl.length();
+ if (preply_bl)
+ preply_bl->claim(notify_private.reply_bl);
+ }
}
- ldout(client->cct, 10) << __func__ << " completed notify (linger op " << wc->linger_op << "), unregistering" << dendl;
+ ldout(client->cct, 10) << __func__ << " completed notify (linger op "
+ << linger_op << "), unregistering" << dendl;
lock->Lock();
- client->unregister_watch_notify_callback(cookie, NULL); // destroys wc
+ objecter->linger_cancel(linger_op);
lock->Unlock();
set_sync_op_version(objver);
- return r_issue == 0 ? r_notify : r_issue;
+ return r_issue ? r_issue : notify_private.result;
}
int librados::IoCtxImpl::set_alloc_hint(const object_t& oid,
};
-namespace librados {
-
-/**
- * watch/notify info
- *
- * Capture state about a watch or an in-progress notify
- */
-struct WatchNotifyInfo : public RefCountedWaitObject {
- IoCtxImpl *io_ctx_impl; // parent
- const object_t oid; // the object
- class Objecter::LingerOp *linger_op; // we use this to unlinger when we are done
- uint64_t cookie; // callback cookie
- int err;
-
- // watcher. only one of these will be defined.
- librados::WatchCtx *watch_ctx;
- librados::WatchCtx2 *watch_ctx2;
-
- // notify that we initiated
- Mutex *notify_lock;
- Cond *notify_cond;
- bool *notify_done;
- bufferlist *notify_reply_bl;
- char **notify_reply_buf;
- size_t *notify_reply_buf_len;
- int *notify_rval;
-
- struct OnError : public Context {
- WatchNotifyInfo *info;
- void finish(int r) { assert(0); }
- void complete(int r);
- } on_error;
-
- WatchNotifyInfo(IoCtxImpl *io_ctx_impl_,
- const object_t& _oc)
- : io_ctx_impl(io_ctx_impl_),
- oid(_oc),
- linger_op(NULL),
- cookie(0),
- err(0),
- watch_ctx(NULL),
- watch_ctx2(NULL),
- notify_lock(NULL),
- notify_cond(NULL),
- notify_done(NULL),
- notify_reply_bl(NULL),
- notify_reply_buf(NULL),
- notify_reply_buf_len(NULL),
- notify_rval(NULL) {
- io_ctx_impl->get();
- on_error.info = this;
- }
-
- ~WatchNotifyInfo() {
- io_ctx_impl->put();
- }
-};
-}
#endif
#include "include/buffer.h"
#include "include/stringify.h"
-#include "messages/MWatchNotify.h"
#include "messages/MLog.h"
#include "msg/Messenger.h"
case CEPH_MSG_MDS_MAP:
break;
- case CEPH_MSG_WATCH_NOTIFY:
- handle_watch_notify(static_cast<MWatchNotify *>(m));
- break;
-
case MSG_LOG:
handle_log(static_cast<MLog *>(m));
break;
objecter->blacklist_self(set);
}
-
-// -----------
-// watch/notify
-
-void librados::RadosClient::register_watch_notify_callback(
- WatchNotifyInfo *wc,
- uint64_t cookie)
-{
- assert(lock.is_locked_by_me());
- wc->cookie = cookie;
- ldout(cct,10) << __func__ << " cookie " << wc->cookie << dendl;
- watch_notify_info[wc->cookie] = wc;
-}
-
-int librados::RadosClient::unregister_watch_notify_callback(uint64_t cookie,
- object_t *poid)
-{
- ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
- assert(lock.is_locked_by_me());
- map<uint64_t, WatchNotifyInfo *>::iterator iter = watch_notify_info.find(cookie);
- if (iter == watch_notify_info.end())
- return -EBADF;
-
- WatchNotifyInfo *ctx = iter->second;
- if (poid)
- *poid = ctx->oid;
- if (ctx->linger_op) {
- objecter->linger_cancel(ctx->linger_op);
- ctx->linger_op = NULL;
- }
-
- watch_notify_info.erase(iter);
- lock.Unlock();
- ldout(cct, 10) << __func__ << " dropping reference, waiting ctx="
- << (void *)ctx << dendl;
- ctx->put_wait();
- ldout(cct, 10) << __func__ << " done ctx=" << (void *)ctx << dendl;
- lock.Lock();
- return 0;
-}
-
-int librados::RadosClient::watch_check(uint64_t cookie)
-{
- ldout(cct,10) << __func__ << " cookie " << cookie << dendl;
- map<uint64_t, WatchNotifyInfo *>::iterator iter = watch_notify_info.find(cookie);
- if (iter == watch_notify_info.end())
- return -EBADF;
- WatchNotifyInfo *ctx = iter->second;
- if (ctx->err)
- return ctx->err;
- return objecter->linger_check(ctx->linger_op);
-}
-
-struct C_DoWatchNotify : public Context {
- librados::RadosClient *rados;
- MWatchNotify *m;
- C_DoWatchNotify(librados::RadosClient *r, MWatchNotify *m) : rados(r), m(m) {}
- void finish(int r) {
- rados->do_watch_notify(m);
- }
-};
-
-struct C_DoWatchError : public Context {
- librados::RadosClient *rados;
- uint64_t cookie;
- int err;
- C_DoWatchError(librados::RadosClient *r, uint64_t cookie, int err)
- : rados(r), cookie(cookie), err(err) {}
- void finish(int r) {
- rados->do_watch_error(cookie, err);
- }
-};
-
-void librados::WatchNotifyInfo::OnError::complete(int r)
-{
- RadosClient *client = info->io_ctx_impl->client;
- client->finisher.queue(new C_DoWatchError(client, info->cookie, r));
-}
-
-void librados::RadosClient::do_watch_error(uint64_t cookie, int err)
-{
- Mutex::Locker l(lock);
- map<uint64_t, WatchNotifyInfo *>::iterator iter =
- watch_notify_info.find(cookie);
- if (iter != watch_notify_info.end()) {
- WatchNotifyInfo *wc = iter->second;
- assert(wc);
- wc->err = err;
- if (wc->watch_ctx2) {
- wc->get();
- ldout(cct,10) << __func__ << " cookie " << cookie
- << " handle_error " << err << dendl;
- lock.Unlock();
- wc->watch_ctx2->handle_error(cookie, err);
- lock.Lock();
- ldout(cct,10) << __func__ << " cookie " << cookie
- << " handle_error " << err << " done" << dendl;
- wc->put();
- }
- } else {
- ldout(cct,10) << __func__ << " cookie " << cookie << " not found" << dendl;
- }
-}
-
-void librados::RadosClient::handle_watch_notify(MWatchNotify *m)
-{
- Mutex::Locker l(lock);
-
- if (watch_notify_info.count(m->cookie)) {
- ldout(cct,10) << __func__ << " queueing async " << *m << dendl;
- // deliver this async via a finisher thread
- finisher.queue(new C_DoWatchNotify(this, m));
- } else {
- // drop it on the floor
- ldout(cct,10) << __func__ << " cookie " << m->cookie << " unknown" << dendl;
- m->put();
- }
-}
-
-void librados::RadosClient::do_watch_notify(MWatchNotify *m)
-{
- Mutex::Locker l(lock);
- map<uint64_t, WatchNotifyInfo *>::iterator iter =
- watch_notify_info.find(m->cookie);
- if (iter != watch_notify_info.end()) {
- WatchNotifyInfo *wc = iter->second;
- assert(wc);
- if (wc->notify_lock) {
- // we sent a notify and it completed (or failed)
- // NOTE: opcode may be either NOTIFY (older OSDs) or NOTIFY_COMPLETE
- // (newer OSDs). In practice it doesn't matter because completion is the
- // only kind of event we get on notify cookies.
- ldout(cct,10) << __func__ << " completed notify " << *m << dendl;
- wc->notify_lock->Lock();
- *wc->notify_done = true;
- *wc->notify_rval = m->return_code;
- if (wc->notify_reply_bl) {
- wc->notify_reply_bl->claim(m->get_data());
- }
- if (wc->notify_reply_buf) {
- *wc->notify_reply_buf = (char*)malloc(m->get_data().length());
- memcpy(*wc->notify_reply_buf, m->get_data().c_str(),
- m->get_data().length());
- }
- if (wc->notify_reply_buf_len) {
- *wc->notify_reply_buf_len = m->get_data().length();
- }
- wc->notify_cond->Signal();
- wc->notify_lock->Unlock();
- } else if (m->opcode == CEPH_WATCH_EVENT_NOTIFY) {
- // we are watcher and got a notify
- ldout(cct,10) << __func__ << " got notify " << *m << dendl;
- wc->get();
-
- // trigger the callback
- assert(!!wc->watch_ctx ^ !!wc->watch_ctx2); // only one is defined
- lock.Unlock();
- if (wc->watch_ctx) {
- wc->watch_ctx->notify(CEPH_WATCH_EVENT_NOTIFY, m->ver, m->bl);
- // send ACK back to the OSD
- bufferlist empty;
- wc->io_ctx_impl->notify_ack(wc->oid, m->notify_id, m->cookie, empty);
- } else if (wc->watch_ctx2) {
- wc->watch_ctx2->handle_notify(m->notify_id, m->cookie,
- m->notifier_gid, m->bl);
- // user needs to explicitly ack (and may have already!)
- }
- lock.Lock();
- ldout(cct,10) << __func__ << " notify done" << dendl;
- wc->put();
- } else if (m->opcode == CEPH_WATCH_EVENT_FAILED_NOTIFY) {
- // we are watcher and failed to ack a notify in time, causing it to time
- // out.
- ldout(cct,10) << __func__ << " failed notify " << *m << dendl;
- wc->get();
- // trigger the callback
- assert(!!wc->watch_ctx ^ !!wc->watch_ctx2); // only one is defined
- lock.Unlock();
- if (wc->watch_ctx2) {
- wc->watch_ctx2->handle_failed_notify(m->notify_id, m->cookie,
- m->notifier_gid);
- }
- lock.Lock();
- ldout(cct,10) << __func__ << " failed notify done" << dendl;
- wc->put();
- } else if (m->opcode == CEPH_WATCH_EVENT_DISCONNECT) {
- // we failed to ping or reconnect and our watch was canceled.
- ldout(cct,10) << __func__ << " disconnect " << *m << dendl;
- wc->err = -ENOTCONN;
- if (wc->watch_ctx2) {
- wc->get();
- // trigger the callback
- lock.Unlock();
- wc->watch_ctx2->handle_error(m->cookie, -ENOTCONN);
- lock.Lock();
- ldout(cct,10) << __func__ << " disconnect done" << dendl;
- wc->put();
- } else {
- lderr(cct) << __func__ << " watch disconnect on "
- << wc->oid << " on old API user, silently ignoring"
- << dendl;
- }
- } else {
- lderr(cct) << __func__ << " got unknown event " << m->opcode
- << " " << ceph_watch_event_name(m->opcode) << dendl;
- }
- } else {
- ldout(cct, 4) << __func__ << " unknown cookie " << m->cookie << dendl;
- }
- m->put();
-}
-
-
int librados::RadosClient::mon_command(const vector<string>& cmd,
const bufferlist &inbl,
bufferlist *outbl, string *outs)
struct Connection;
struct md_config_t;
class Message;
-class MWatchNotify;
class MLog;
class Messenger;
-struct WatchNotifyInfo;
class librados::RadosClient : public Dispatcher
{
int pool_delete_async(const char *name, PoolAsyncCompletionImpl *c);
- // watch/notify
- map<uint64_t, librados::WatchNotifyInfo *> watch_notify_info;
-
- void register_watch_notify_callback(librados::WatchNotifyInfo *wc,
- uint64_t cookie);
- int unregister_watch_notify_callback(uint64_t cookie, object_t *poid);
- int watch_check(uint64_t cookie);
- void handle_watch_notify(MWatchNotify *m);
- void do_watch_notify(MWatchNotify *m);
- void do_watch_error(uint64_t cookie, int err);
-
int mon_command(const vector<string>& cmd, const bufferlist &inbl,
bufferlist *outbl, string *outs);
int mon_command(int rank,
#include "messages/MCommand.h"
#include "messages/MCommandReply.h"
+#include "messages/MWatchNotify.h"
+
#include <errno.h>
#include "common/config.h"
onack = new C_Linger_Reconnect(this, info);
opv.push_back(OSDOp());
opv.back().op.op = CEPH_OSD_OP_WATCH;
- opv.back().op.watch.cookie = info->cookie;
+ opv.back().op.watch.cookie = info->linger_id;
opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
} else {
ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl;
info->pobjver = NULL;
}
+struct C_DoWatchError : public Context {
+ Objecter::LingerOp *info;
+ int err;
+ C_DoWatchError(Objecter::LingerOp *i, int r) : info(i), err(r) {
+ info->get();
+ }
+ void finish(int r) {
+ info->watch_context->handle_error(info->linger_id, err);
+ info->put();
+ }
+};
+
void Objecter::_linger_reconnect(LingerOp *info, int r)
{
ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
info->watch_lock.Lock();
info->last_error = r;
info->watch_cond.Signal();
- if (info->on_error)
- info->on_error->complete(r);
+ if (info->watch_context)
+ finisher->queue(new C_DoWatchError(info, r));
info->watch_lock.Unlock();
}
}
vector<OSDOp> opv(1);
opv[0].op.op = CEPH_OSD_OP_WATCH;
- opv[0].op.watch.cookie = info->cookie;
+ opv[0].op.watch.cookie = info->linger_id;
opv[0].op.watch.op = CEPH_OSD_WATCH_OP_PING;
C_Linger_Ping *onack = new C_Linger_Ping(this, info);
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
info->watch_valid_thru = sent;
} else if (r < 0) {
info->last_error = r;
- if (info->on_error)
- info->on_error->complete(r);
+ if (info->watch_context)
+ finisher->queue(new C_DoWatchError(info, r));
}
info->watch_cond.SignalAll();
info->watch_lock.Unlock();
Objecter::LingerOp *Objecter::linger_register(const object_t& oid,
const object_locator_t& oloc,
- int flags,
- uint64_t *cookie)
+ int flags)
{
LingerOp *info = new LingerOp;
info->target.base_oid = oid;
// Acquire linger ID
info->linger_id = ++max_linger_id;
- *cookie = info->linger_id;
ldout(cct, 10) << __func__ << " info " << info
<< " linger_id " << info->linger_id << dendl;
linger_ops[info->linger_id] = info;
ceph_tid_t Objecter::linger_watch(LingerOp *info,
ObjectOperation& op,
const SnapContext& snapc, utime_t mtime,
- bufferlist& inbl, uint64_t cookie,
+ bufferlist& inbl,
Context *onack, Context *oncommit,
- Context *onerror,
version_t *objver)
{
+ info->is_watch = true;
info->snapc = snapc;
info->mtime = mtime;
info->target.flags |= CEPH_OSD_FLAG_WRITE;
info->ops = op.ops;
- info->cookie = cookie;
info->inbl = inbl;
info->poutbl = NULL;
info->pobjver = objver;
info->on_reg_ack = onack;
info->on_reg_commit = oncommit;
- info->on_error = onerror;
RWLock::WLocker wl(rwlock);
_linger_submit(info);
_send_linger(info);
}
+struct C_DoWatchNotify : public Context {
+ Objecter *objecter;
+ Objecter::LingerOp *info;
+ MWatchNotify *msg;
+ C_DoWatchNotify(Objecter *o, Objecter::LingerOp *i, MWatchNotify *m)
+ : objecter(o), info(i), msg(m) {
+ info->get();
+ msg->get();
+ }
+ void finish(int r) {
+ objecter->_do_watch_notify(info, msg);
+ }
+};
+
+void Objecter::handle_watch_notify(MWatchNotify *m)
+{
+ RWLock::RLocker l(rwlock);
+ if (!initialized.read()) {
+ return;
+ }
+
+ map<uint64_t,LingerOp*>::iterator p = linger_ops.find(m->cookie);
+ if (p == linger_ops.end()) {
+ ldout(cct, 7) << __func__ << " cookie " << m->cookie << " dne" << dendl;
+ return;
+ }
+
+ LingerOp *info = p->second;
+ finisher->queue(new C_DoWatchNotify(this, info, m));
+}
+
+void Objecter::_do_watch_notify(LingerOp *info, MWatchNotify *m)
+{
+ ldout(cct, 10) << __func__ << " " << *m << dendl;
+
+ rwlock.get_read();
+ if (!initialized.read()) {
+ rwlock.put_read();
+ goto out;
+ }
+
+ if (info->canceled) {
+ rwlock.put_read();
+ goto out;
+ }
+
+ // notify completion?
+ if (!info->is_watch) {
+ assert(info->on_notify_finish);
+ info->notify_result_bl->claim(m->get_data());
+ rwlock.put_read();
+ info->on_notify_finish->complete(m->return_code);
+ goto out;
+ }
+
+ assert(info->is_watch);
+ assert(info->watch_context);
+
+ switch (m->opcode) {
+ case CEPH_WATCH_EVENT_NOTIFY:
+ rwlock.put_read();
+ info->watch_context->handle_notify(m->notify_id, m->cookie,
+ m->notifier_gid, m->bl);
+ break;
+
+ case CEPH_WATCH_EVENT_FAILED_NOTIFY:
+ rwlock.put_read();
+ info->watch_context->handle_failed_notify(m->notify_id, m->cookie,
+ m->notifier_gid);
+ break;
+
+ case CEPH_WATCH_EVENT_DISCONNECT:
+ info->last_error = -ENOTCONN;
+ rwlock.put_read();
+ info->watch_context->handle_error(m->cookie, -ENOTCONN);
+ break;
+
+ default:
+ rwlock.put_read();
+ break;
+ }
+
+ out:
+ info->put();
+ m->put();
+}
+
bool Objecter::ms_dispatch(Message *m)
{
ldout(cct, 10) << __func__ << " " << cct << " " << *m << dendl;
handle_osd_op_reply(static_cast<MOSDOpReply*>(m));
return true;
+ case CEPH_MSG_WATCH_NOTIFY:
+ handle_watch_notify(static_cast<MWatchNotify*>(m));
+ m->put();
+ return true;
+
case MSG_COMMAND_REPLY:
if (m->get_source().type() == CEPH_ENTITY_TYPE_OSD) {
handle_command_reply(static_cast<MCommandReply*>(m));
assert(op->session);
ldout(cct, 10) << " pinging osd that serves lingering tid " << p->first << " (osd." << op->session->osd << ")" << dendl;
toping.insert(op->session);
- if (op->cookie && !op->last_error)
+ if (op->is_watch && !op->last_error)
_send_linger_ping(op);
}
for (map<uint64_t,CommandOp*>::iterator p = s->command_ops.begin();
m->put();
return;
}
-
RWLock::Context lc(rwlock, RWLock::Context::TakenForRead);
map<int, OSDSession *>::iterator siter = osd_sessions.find(osd_num);
class MGetPoolStatsReply;
class MStatfsReply;
class MCommandReply;
+class MWatchNotify;
class PerfCounters;
// -- lingering ops --
+ struct WatchContext {
+ // this simply mirrors librados WatchCtx2
+ virtual void handle_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id,
+ bufferlist& bl) = 0;
+ virtual void handle_failed_notify(uint64_t notify_id,
+ uint64_t cookie,
+ uint64_t notifier_id) = 0;
+ virtual void handle_error(uint64_t cookie, int err) = 0;
+ virtual ~WatchContext() {}
+ };
+
struct LingerOp : public RefCountedObject {
uint64_t linger_id;
bufferlist *poutbl;
version_t *pobjver;
- uint64_t cookie; ///< non-zero if this is a watch
+ bool is_watch;
utime_t watch_valid_thru; ///< send time for last acked ping
int last_error; ///< error from last failed ping|reconnect, if any
Mutex watch_lock;
bool registered;
bool canceled;
- Context *on_reg_ack, *on_reg_commit, *on_error;
+ Context *on_reg_ack, *on_reg_commit;
+
+ // we trigger these from an async finisher
+ Context *on_notify_finish;
+ bufferlist *notify_result_bl;
+
+ WatchContext *watch_context;
OSDSession *session;
target(object_t(), object_locator_t(), 0),
snap(CEPH_NOSNAP),
poutbl(NULL), pobjver(NULL),
- cookie(0),
+ is_watch(false),
last_error(0),
watch_lock("Objecter::LingerOp::watch_lock"),
registered(false),
canceled(false),
on_reg_ack(NULL), on_reg_commit(NULL),
- on_error(NULL),
+ on_notify_finish(NULL),
+ notify_result_bl(NULL),
+ watch_context(NULL),
session(NULL),
register_tid(0),
ping_tid(0),
bool ms_can_fast_dispatch(Message *m) const {
switch (m->get_type()) {
case CEPH_MSG_OSD_OPREPLY:
+ case CEPH_MSG_WATCH_NOTIFY:
return true;
default:
return false;
}
void handle_osd_op_reply(class MOSDOpReply *m);
+ void handle_watch_notify(class MWatchNotify *m);
void handle_osd_map(class MOSDMap *m);
void wait_for_osd_map();
// caller owns a ref
LingerOp *linger_register(const object_t& oid, const object_locator_t& oloc,
- int flags, uint64_t *pcookie);
+ int flags);
ceph_tid_t linger_watch(LingerOp *info,
ObjectOperation& op,
const SnapContext& snapc, utime_t mtime,
- bufferlist& inbl, uint64_t cookie,
- Context *onack, Context *onfinish, Context *onerror,
+ bufferlist& inbl,
+ Context *onack, Context *onfinish,
version_t *objver);
ceph_tid_t linger_notify(LingerOp *info,
ObjectOperation& op,
void linger_cancel(LingerOp *info); // releases a reference
void _linger_cancel(LingerOp *info);
+ void _do_watch_notify(LingerOp *info, MWatchNotify *m);
+
/**
* set up initial ops in the op vector, and allocate a final op slot.
*