From: Sage Weil Date: Fri, 17 Oct 2014 03:16:07 +0000 (-0700) Subject: osdc/Objecter: separate WATCH from RECONNECT X-Git-Tag: v0.91~139 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e424103301904dc0d97de92438554a7ae7b755b3;p=ceph.git osdc/Objecter: separate WATCH from RECONNECT Use WATCH op for the initial registration. This is idempotent in that it will succeed whether the watch information has been persisted or not. It is used by the client if it does not know that it is registered. The RECONNECT op is used for any subsequent session reconnect. It will fail if the watch state isn't already persisted on the OSD. Signed-off-by: Sage Weil --- diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 2e832ff5178f..a0c892838176 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -411,10 +411,20 @@ void Objecter::_send_linger(LingerOp *info) RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite); - ldout(cct, 15) << "send_linger " << info->linger_id << dendl; - vector opv = info->ops; // need to pass a copy to ops - Context *onack = (!info->registered && info->on_reg_ack) ? - new C_Linger_Ack(this, info) : NULL; + vector opv; + Context *onack = NULL; + if (info->registered) { + ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" << dendl; + onack = new C_Linger_Reconnect(this, info); + opv.push_back(OSDOp()); + opv.back().op.op = CEPH_OSD_OP_WATCH; + opv.back().op.watch.cookie = info->cookie; + opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT; + } else { + ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl; + onack = new C_Linger_Register(this, info); + opv = info->ops; + } Context *oncommit = new C_Linger_Commit(this, info); Op *o = new Op(info->target.base_oid, info->target.base_oloc, opv, info->target.flags | CEPH_OSD_FLAG_READ, @@ -449,9 +459,9 @@ void Objecter::_send_linger(LingerOp *info) logger->inc(l_osdc_linger_send); } -void Objecter::_linger_ack(LingerOp *info, int r) +void Objecter::_linger_register(LingerOp *info, int r) { - ldout(cct, 10) << "_linger_ack " << info->linger_id << dendl; + ldout(cct, 10) << "_linger_register " << info->linger_id << dendl; if (info->on_reg_ack) { info->on_reg_ack->complete(r); info->on_reg_ack = NULL; @@ -471,6 +481,12 @@ void Objecter::_linger_commit(LingerOp *info, int r) info->pobjver = NULL; } +void Objecter::_linger_reconnect(LingerOp *info, int r) +{ + ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r + << " (last_error " << info->last_error << ")" << dendl; +} + void Objecter::unregister_linger(uint64_t linger_id) { RWLock::WLocker wl(rwlock); @@ -514,6 +530,7 @@ ceph_tid_t Objecter::linger_mutate(const object_t& oid, const object_locator_t& info->mtime = mtime; info->target.flags = flags | CEPH_OSD_FLAG_WRITE; info->ops = op.ops; + info->cookie = cookie; info->inbl = inbl; info->poutbl = NULL; info->pobjver = objver; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 64cf6bcb6c04..be20ddbb0320 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1458,6 +1458,8 @@ public: bufferlist *poutbl; version_t *pobjver; + uint64_t cookie; ///< non-zero if this is a watch + int last_error; ///< error from last failed ping|reconnect, if any bool registered; bool canceled; Context *on_reg_ack, *on_reg_commit; @@ -1471,6 +1473,8 @@ public: target(object_t(), object_locator_t(), 0), snap(CEPH_NOSNAP), poutbl(NULL), pobjver(NULL), + cookie(0), + last_error(0), registered(false), canceled(false), on_reg_ack(NULL), on_reg_commit(NULL), @@ -1485,17 +1489,17 @@ public: ~LingerOp() {} }; - struct C_Linger_Ack : public Context { + struct C_Linger_Register : public Context { Objecter *objecter; LingerOp *info; - C_Linger_Ack(Objecter *o, LingerOp *l) : objecter(o), info(l) { + C_Linger_Register(Objecter *o, LingerOp *l) : objecter(o), info(l) { info->get(); } - ~C_Linger_Ack() { + ~C_Linger_Register() { info->put(); } void finish(int r) { - objecter->_linger_ack(info, r); + objecter->_linger_register(info, r); } }; @@ -1513,6 +1517,20 @@ public: } }; + struct C_Linger_Reconnect : public Context { + Objecter *objecter; + LingerOp *info; + C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) { + info->get(); + } + ~C_Linger_Reconnect() { + info->put(); + } + void finish(int r) { + objecter->_linger_reconnect(info, r); + } + }; + struct C_Linger_Map_Latest : public Context { Objecter *objecter; uint64_t linger_id; @@ -1618,8 +1636,9 @@ public: void _linger_submit(LingerOp *info); void _send_linger(LingerOp *info); - void _linger_ack(LingerOp *info, int r); + void _linger_register(LingerOp *info, int r); void _linger_commit(LingerOp *info, int r); + void _linger_reconnect(LingerOp *info, int r); void _check_op_pool_dne(Op *op, bool session_locked); void _send_op_map_check(Op *op);