]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Objecter: separate WATCH from RECONNECT
authorSage Weil <sage@redhat.com>
Fri, 17 Oct 2014 03:16:07 +0000 (20:16 -0700)
committerSage Weil <sage@redhat.com>
Thu, 4 Dec 2014 18:32:38 +0000 (10:32 -0800)
Use WATCH op for the initial registration.  This is idempotent in that
it will succeed whether the watch information has been persisted or not.
It is used by the client if it does not know that it is registered.

The RECONNECT op is used for any subsequent session reconnect.  It will
fail if the watch state isn't already persisted on the OSD.

Signed-off-by: Sage Weil <sage@redhat.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 2e832ff5178f57393b515942821fe43b8d264bf0..a0c8928381769c10f12d4ad3479ea9968bd1784e 100644 (file)
@@ -411,10 +411,20 @@ void Objecter::_send_linger(LingerOp *info)
 
   RWLock::Context lc(rwlock, RWLock::Context::TakenForWrite);
 
-  ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
-  vector<OSDOp> opv = info->ops; // need to pass a copy to ops
-  Context *onack = (!info->registered && info->on_reg_ack) ?
-    new C_Linger_Ack(this, info) : NULL;
+  vector<OSDOp> opv;
+  Context *onack = NULL;
+  if (info->registered) {
+    ldout(cct, 15) << "send_linger " << info->linger_id << " reconnect" << dendl;
+    onack = new C_Linger_Reconnect(this, info);
+    opv.push_back(OSDOp());
+    opv.back().op.op = CEPH_OSD_OP_WATCH;
+    opv.back().op.watch.cookie = info->cookie;
+    opv.back().op.watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
+  } else {
+    ldout(cct, 15) << "send_linger " << info->linger_id << " register" << dendl;
+    onack = new C_Linger_Register(this, info);
+    opv = info->ops;
+  }
   Context *oncommit = new C_Linger_Commit(this, info);
   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 opv, info->target.flags | CEPH_OSD_FLAG_READ,
@@ -449,9 +459,9 @@ void Objecter::_send_linger(LingerOp *info)
   logger->inc(l_osdc_linger_send);
 }
 
-void Objecter::_linger_ack(LingerOp *info, int r) 
+void Objecter::_linger_register(LingerOp *info, int r)
 {
-  ldout(cct, 10) << "_linger_ack " << info->linger_id << dendl;
+  ldout(cct, 10) << "_linger_register " << info->linger_id << dendl;
   if (info->on_reg_ack) {
     info->on_reg_ack->complete(r);
     info->on_reg_ack = NULL;
@@ -471,6 +481,12 @@ void Objecter::_linger_commit(LingerOp *info, int r)
   info->pobjver = NULL;
 }
 
+void Objecter::_linger_reconnect(LingerOp *info, int r)
+{
+  ldout(cct, 10) << __func__ << " " << info->linger_id << " = " << r
+                << " (last_error " << info->last_error << ")" << dendl;
+}
+
 void Objecter::unregister_linger(uint64_t linger_id)
 {
   RWLock::WLocker wl(rwlock);
@@ -514,6 +530,7 @@ ceph_tid_t Objecter::linger_mutate(const object_t& oid, const object_locator_t&
   info->mtime = mtime;
   info->target.flags = flags | CEPH_OSD_FLAG_WRITE;
   info->ops = op.ops;
+  info->cookie = cookie;
   info->inbl = inbl;
   info->poutbl = NULL;
   info->pobjver = objver;
index 64cf6bcb6c04775f47e579a99d4f67e372db7020..be20ddbb03205719847da403a4bfda13dd7877ed 100644 (file)
@@ -1458,6 +1458,8 @@ public:
     bufferlist *poutbl;
     version_t *pobjver;
 
+    uint64_t cookie;   ///< non-zero if this is a watch
+    int last_error;  ///< error from last failed ping|reconnect, if any
     bool registered;
     bool canceled;
     Context *on_reg_ack, *on_reg_commit;
@@ -1471,6 +1473,8 @@ public:
                 target(object_t(), object_locator_t(), 0),
                 snap(CEPH_NOSNAP),
                 poutbl(NULL), pobjver(NULL),
+                cookie(0),
+                last_error(0),
                 registered(false),
                 canceled(false),
                 on_reg_ack(NULL), on_reg_commit(NULL),
@@ -1485,17 +1489,17 @@ public:
     ~LingerOp() {}
   };
 
-  struct C_Linger_Ack : public Context {
+  struct C_Linger_Register : public Context {
     Objecter *objecter;
     LingerOp *info;
-    C_Linger_Ack(Objecter *o, LingerOp *l) : objecter(o), info(l) {
+    C_Linger_Register(Objecter *o, LingerOp *l) : objecter(o), info(l) {
       info->get();
     }
-    ~C_Linger_Ack() {
+    ~C_Linger_Register() {
       info->put();
     }
     void finish(int r) {
-      objecter->_linger_ack(info, r);
+      objecter->_linger_register(info, r);
     }
   };
   
@@ -1513,6 +1517,20 @@ public:
     }
   };
 
+  struct C_Linger_Reconnect : public Context {
+    Objecter *objecter;
+    LingerOp *info;
+    C_Linger_Reconnect(Objecter *o, LingerOp *l) : objecter(o), info(l) {
+      info->get();
+    }
+    ~C_Linger_Reconnect() {
+      info->put();
+    }
+    void finish(int r) {
+      objecter->_linger_reconnect(info, r);
+    }
+  };
+
   struct C_Linger_Map_Latest : public Context {
     Objecter *objecter;
     uint64_t linger_id;
@@ -1618,8 +1636,9 @@ public:
 
   void _linger_submit(LingerOp *info);
   void _send_linger(LingerOp *info);
-  void _linger_ack(LingerOp *info, int r);
+  void _linger_register(LingerOp *info, int r);
   void _linger_commit(LingerOp *info, int r);
+  void _linger_reconnect(LingerOp *info, int r);
 
   void _check_op_pool_dne(Op *op, bool session_locked);
   void _send_op_map_check(Op *op);