]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objecter: always resend linger registrations
authorSage Weil <sage@inktank.com>
Wed, 18 Jul 2012 19:55:35 +0000 (12:55 -0700)
committerSage Weil <sage@inktank.com>
Wed, 18 Jul 2012 19:55:35 +0000 (12:55 -0700)
If a linger op (watch) is sent to the OSD and updates the object, and then
the client loses the reply, it will resend the request.  The OSD will see
that it is a dup, however, and not set up the in-memory session state for
the watch.  This in turn will break the watch (i.e., notifies won't
get delivered).

Instead, always resend linger registration ops, so that we always have a
unique reqid and do the correct session registeration for each session.

 * track the tid of the registation op for each LingerOp
 * mark registrations ops as should_resend=false; cancel as needed
 * when we send a new registration op, cancel the old one to ensure we
   ignore the reply.  This is needed becuase we resend linger ops on any
   pg change, not just a primary change.
 * drop the first_send arg to send_linger(), as we can now infer that
   from register_tid == 0.

The bug was easily reproduced with ms inject socket failures = 500 and the
test_stress_watch utility.

Fixes: #2796
Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 15614c4510cc2358dd30cc476b3a33f529988675..829f20326dfc32aa1d48842fd08f825cc4f02630 100644 (file)
@@ -246,44 +246,49 @@ void Objecter::shutdown()
   }
 }
 
-void Objecter::send_linger(LingerOp *info, bool first_send)
+void Objecter::send_linger(LingerOp *info)
 {
-  if (!info->registering) {
-    ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
-    vector<OSDOp> ops = info->ops; // need to pass a copy to ops
-    Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL;
-    Context *oncommit = new C_Linger_Commit(this, info);
-    Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ,
-                  onack, oncommit,
-                  info->pobjver);
-    o->snapid = info->snap;
-
-    if (info->session) {
-      int r = recalc_op_target(o);
-      if (r == RECALC_OP_TARGET_POOL_DNE) {
-       linger_check_for_latest_map(info);
-      }
-    }
-
-    if (first_send) {
-      op_submit(o);
-    } else {
-      _op_submit(o);
+  ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
+  vector<OSDOp> opv = info->ops; // need to pass a copy to ops
+  Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL;
+  Context *oncommit = new C_Linger_Commit(this, info);
+  Op *o = new Op(info->oid, info->oloc, opv, info->flags | CEPH_OSD_FLAG_READ,
+                onack, oncommit,
+                info->pobjver);
+  o->snapid = info->snap;
+
+  // do not resend this; we will send a new op to reregister
+  o->should_resend = false;
+
+  if (info->session) {
+    int r = recalc_op_target(o);
+    if (r == RECALC_OP_TARGET_POOL_DNE) {
+      linger_check_for_latest_map(info);
     }
+  }
 
-    OSDSession *s = o->session;
-    if (info->session != s) {
-      info->session_item.remove_myself();
-      info->session = s;
-      if (info->session)
-       s->linger_ops.push_back(&info->session_item);
+  if (info->register_tid) {
+    // repeat send.  cancel old registeration op, if any.
+    if (ops.count(info->register_tid)) {
+      Op *o = ops[info->register_tid];
+      cancel_op(o);
     }
-    info->registering = true;
-
-    logger->inc(l_osdc_linger_send);
+    info->register_tid = _op_submit(o);
   } else {
-    ldout(cct, 15) << "send_linger " << info->linger_id << " already (re)registering" << dendl;
+    // first send
+    info->register_tid = op_submit(o);
+  }
+
+  OSDSession *s = o->session;
+  if (info->session != s) {
+    info->session_item.remove_myself();
+    info->session = s;
+    if (info->session)
+      s->linger_ops.push_back(&info->session_item);
   }
+  info->registering = true;
+
+  logger->inc(l_osdc_linger_send);
 }
 
 void Objecter::_linger_ack(LingerOp *info, int r) 
@@ -348,7 +353,7 @@ tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc,
 
   logger->set(l_osdc_linger_active, linger_ops.size());
 
-  send_linger(info, true);
+  send_linger(info);
 
   return info->linger_id;
 }
@@ -450,6 +455,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
             p != linger_ops.end();
             p++) {
          LingerOp *op = p->second;
+         ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
          int r = recalc_linger_op_target(op);
          if (skipped_map)
            r = RECALC_OP_TARGET_NEED_RESEND;
@@ -472,6 +478,7 @@ void Objecter::handle_osd_map(MOSDMap *m)
             p != ops.end();
             ++p) {
          Op *op = p->second;
+         ldout(cct, 10) << " checking op " << op->tid << dendl;
          int r = recalc_op_target(op);
          if (skipped_map)
            r = RECALC_OP_TARGET_NEED_RESEND;
@@ -541,16 +548,20 @@ void Objecter::handle_osd_map(MOSDMap *m)
   // resend requests
   for (map<tid_t, Op*>::iterator p = need_resend.begin(); p != need_resend.end(); p++) {
     Op *op = p->second;
-    if (op->session) {
-      logger->inc(l_osdc_op_resend);
-      send_op(op);
+    if (op->should_resend) {
+      if (op->session) {
+       logger->inc(l_osdc_op_resend);
+       send_op(op);
+      }
+    } else {
+      cancel_op(op);
     }
   }
   for (list<LingerOp*>::iterator p = need_resend_linger.begin(); p != need_resend_linger.end(); p++) {
     LingerOp *op = *p;
     if (op->session) {
       logger->inc(l_osdc_linger_resend);
-      send_linger(op, false);
+      send_linger(op);
     }
   }
 
@@ -747,15 +758,21 @@ void Objecter::kick_requests(OSDSession *session)
   ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
 
   // resend ops
-  for (xlist<Op*>::iterator p = session->ops.begin(); !p.end(); ++p) {
+  for (xlist<Op*>::iterator p = session->ops.begin(); !p.end();) {
+    Op *op = *p;
+    ++p;
     logger->inc(l_osdc_op_resend);
-    send_op(*p);
+    if (op->should_resend) {
+      send_op(op);
+    } else {
+      cancel_op(op);
+    }
   }
 
   // resend lingers
   for (xlist<LingerOp*>::iterator j = session->linger_ops.begin(); !j.end(); ++j) {
     logger->inc(l_osdc_linger_resend);
-    send_linger(*j, false);
+    send_linger(*j);
   }
 }
 
@@ -1086,6 +1103,35 @@ bool Objecter::recalc_linger_op_target(LingerOp *linger_op)
   return RECALC_OP_TARGET_NO_ACTION;
 }
 
+void Objecter::cancel_op(Op *op)
+{
+  ldout(cct, 15) << "cancel_op " << op->tid << dendl;
+
+  // currently this only works for linger registrations, since we just
+  // throw out the callbacks.
+  assert(!op->should_resend);
+  delete op->onack;
+  delete op->oncommit;
+
+  finish_op(op);
+}
+
+void Objecter::finish_op(Op *op)
+{
+  ldout(cct, 15) << "finish_op " << op->tid << dendl;
+
+  op->session_item.remove_myself();
+  if (op->budgeted)
+    put_op_budget(op);
+  if (op->con)
+    op->con->put();
+
+  ops.erase(op->tid);
+  logger->set(l_osdc_op_active, ops.size());
+
+  delete op;
+}
+
 void Objecter::send_op(Op *op)
 {
   ldout(cct, 15) << "send_op " << op->tid << " to osd." << op->session->osd << dendl;
@@ -1292,15 +1338,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
   // done with this tid?
   if (!op->onack && !op->oncommit) {
-    op->session_item.remove_myself();
     ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
-    if (op->budgeted)
-      put_op_budget(op);
-    ops.erase(tid);
-    logger->set(l_osdc_op_active, ops.size());
-    if (op->con)
-      op->con->put();
-    delete op;
+    finish_op(op);
   }
   
   ldout(cct, 5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
index 2d405f1963b02cb39e8edeb834e806276e5de524..3fa765c605729442b7b492de1b36bccf0cc0d642 100644 (file)
@@ -602,6 +602,9 @@ public:
 
     bool budgeted;
 
+    /// true if we should resend this message on failure
+    bool should_resend;
+
     Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
        int f, Context *ac, Context *co, eversion_t *ov) :
       session(NULL), session_item(this), incarnation(0),
@@ -612,7 +615,8 @@ public:
       flags(f), priority(0), onack(ac), oncommit(co),
       tid(0), attempts(0),
       paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false),
-      budgeted(false) {
+      budgeted(false),
+      should_resend(true) {
       ops.swap(op);
       
       /* initialize out_* to match op vector */
@@ -779,10 +783,13 @@ public:
     OSDSession *session;
     xlist<LingerOp*>::item session_item;
 
+    tid_t register_tid;
+
     LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL),
                 registering(false), registered(false),
                 on_reg_ack(NULL), on_reg_commit(NULL),
-                session(NULL), session_item(this) {}
+                session(NULL), session_item(this),
+                register_tid(0) {}
 
     // no copy!
     const LingerOp &operator=(const LingerOp& r);
@@ -857,6 +864,8 @@ public:
   map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
 
   void send_op(Op *op);
+  void cancel_op(Op *op);
+  void finish_op(Op *op);
   bool is_pg_changed(vector<int>& a, vector<int>& b, bool any_change=false);
   enum recalc_op_target_result {
     RECALC_OP_TARGET_NO_ACTION = 0,
@@ -866,7 +875,7 @@ public:
   int recalc_op_target(Op *op);
   bool recalc_linger_op_target(LingerOp *op);
 
-  void send_linger(LingerOp *info, bool first_send);
+  void send_linger(LingerOp *info);
   void _linger_ack(LingerOp *info, int r);
   void _linger_commit(LingerOp *info, int r);