]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc/Objecter: onack + oncommit -> onfinish etc
authorSage Weil <sage@redhat.com>
Wed, 21 Dec 2016 20:20:47 +0000 (15:20 -0500)
committerSage Weil <sage@redhat.com>
Wed, 28 Dec 2016 22:08:42 +0000 (17:08 -0500)
And num_unack + num_unsafe -> num_in_flight.
l_osdc_op_ack + l_osdc_op_commit -> l_osdc_op_reply

Signed-off-by: Sage Weil <sage@redhat.com>
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 410c6646995af03054039449faa7e27696b178bf..391934d7d0678122dae26f29e436e69515b0fe40 100644 (file)
@@ -70,8 +70,7 @@ enum {
   l_osdc_op_send,
   l_osdc_op_send_bytes,
   l_osdc_op_resend,
-  l_osdc_op_ack,
-  l_osdc_op_commit,
+  l_osdc_op_reply,
 
   l_osdc_op,
   l_osdc_op_r,
@@ -247,8 +246,7 @@ void Objecter::init()
     pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
     pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
     pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
-    pcb.add_u64_counter(l_osdc_op_ack, "op_ack", "Commit callbacks");
-    pcb.add_u64_counter(l_osdc_op_commit, "op_commit", "Operation commits");
+    pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
 
     pcb.add_u64_counter(l_osdc_op, "op", "Operations");
     pcb.add_u64_counter(l_osdc_op_r, "op_r",
@@ -555,7 +553,7 @@ void Objecter::_send_linger(LingerOp *info,
   watchl.unlock();
   Op *o = new Op(info->target.base_oid, info->target.base_oloc,
                 opv, info->target.flags | CEPH_OSD_FLAG_READ,
-                NULL, NULL,
+                NULL,
                 info->pobjver);
   o->oncommit_sync = oncommit;
   o->outbl = poutbl;
@@ -1457,11 +1455,8 @@ void Objecter::_check_op_pool_dne(Op *op, unique_lock& sl)
       ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
                     << " concluding pool " << op->target.base_pgid.pool()
                     << " dne" << dendl;
-      if (op->onack) {
-       op->onack->complete(-ENOENT);
-      }
-      if (op->oncommit) {
-       op->oncommit->complete(-ENOENT);
+      if (op->onfinish) {
+       op->onfinish->complete(-ENOENT);
       }
       if (op->oncommit_sync) {
        op->oncommit_sync->complete(-ENOENT);
@@ -2211,15 +2206,10 @@ void Objecter::_send_op_account(Op *op)
   inflight_ops.inc();
 
   // add to gather set(s)
-  if (op->onack) {
-    num_unacked.inc();
-  } else {
-    ldout(cct, 20) << " note: not requesting ack" << dendl;
-  }
-  if (op->oncommit || op->oncommit_sync) {
-    num_uncommitted.inc();
+  if (op->onfinish || op->oncommit_sync) {
+    num_in_flight.inc();
   } else {
-    ldout(cct, 20) << " note: not requesting commit" << dendl;
+    ldout(cct, 20) << " note: not requesting reply" << dendl;
   }
 
   logger->inc(l_osdc_op_active);
@@ -2398,8 +2388,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid)
   sl.unlock();
   put_session(s);
 
-  ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read()
-               << " uncommitted" << dendl;
+  ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
 }
 
 int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
@@ -2424,16 +2413,11 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
   ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
                 << dendl;
   Op *op = p->second;
-  if (op->onack) {
-    op->onack->complete(r);
-    op->onack = NULL;
-    num_unacked.dec();
-  }
-  if (op->oncommit || op->oncommit_sync)
-    num_uncommitted.dec();
-  if (op->oncommit) {
-    op->oncommit->complete(r);
-    op->oncommit = NULL;
+  if (op->onfinish || op->oncommit_sync)
+    num_in_flight.dec();
+  if (op->onfinish) {
+    op->onfinish->complete(r);
+    op->onfinish = NULL;
   }
   if (op->oncommit_sync) {
     op->oncommit_sync->complete(r);
@@ -2986,14 +2970,10 @@ void Objecter::_cancel_linger_op(Op *op)
   ldout(cct, 15) << "cancel_op " << op->tid << dendl;
 
   assert(!op->should_resend);
-  if (op->onack) {
-    delete op->onack;
-    num_unacked.dec();
-  }
-  if (op->oncommit || op->oncommit_sync) {
-    delete op->oncommit;
+  if (op->onfinish || op->oncommit_sync) {
+    delete op->onfinish;
     delete op->oncommit_sync;
-    num_uncommitted.dec();
+    num_in_flight.dec();
   }
 
   _finish_op(op, 0);
@@ -3044,10 +3024,8 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op)
 
   int flags = op->target.flags;
   flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
-  if (op->oncommit || op->oncommit_sync)
+  if (op->onfinish || op->oncommit_sync)
     flags |= CEPH_OSD_FLAG_ONDISK;
-  if (op->onack)
-    flags |= CEPH_OSD_FLAG_ACK;
 
   if (!honor_osdmap_full)
     flags |= CEPH_OSD_FLAG_FULL_FORCE;
@@ -3239,11 +3217,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   if (retry_writes_after_first_reply && op->attempts == 1 &&
       (op->target.flags & CEPH_OSD_FLAG_WRITE)) {
     ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
-    if (op->onack) {
-      num_unacked.dec();
-    }
-    if (op->oncommit || op->oncommit_sync) {
-      num_uncommitted.dec();
+    if (op->onfinish || op->oncommit_sync) {
+      num_in_flight.dec();
     }
     _session_op_remove(s, op);
     sl.unlock();
@@ -3272,17 +3247,14 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     // have, but that is better than doing callbacks out of order.
   }
 
-  Context *onack = 0;
-  Context *oncommit = 0;
+  Context *onfinish = 0;
 
   int rc = m->get_result();
 
   if (m->is_redirect_reply()) {
     ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
-    if (op->onack)
-      num_unacked.dec();
-    if (op->oncommit || op->oncommit_sync)
-      num_uncommitted.dec();
+    if (op->onfinish || op->oncommit_sync)
+      num_in_flight.dec();
     _session_op_remove(s, op);
     sl.unlock();
     put_session(s);
@@ -3362,43 +3334,34 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
     }
   }
 
-  // ack|commit -> ack
-  if (op->onack) {
-    ldout(cct, 15) << "handle_osd_op_reply ack" << dendl;
-    op->replay_version = m->get_replay_version();
-    onack = op->onack;
-    op->onack = 0;  // only do callback once
-    num_unacked.dec();
-    logger->inc(l_osdc_op_ack);
-  }
-  if (m->is_ondisk() || rc) {
-    if (op->oncommit) {
-      ldout(cct, 15) << "handle_osd_op_reply safe" << dendl;
-      oncommit = op->oncommit;
-      op->oncommit = NULL;
-      num_uncommitted.dec();
-      logger->inc(l_osdc_op_commit);
-    }
-    if (op->oncommit_sync) {
-      ldout(cct, 15) << "handle_osd_op_reply safe (sync)" << dendl;
-      op->oncommit_sync->complete(rc);
-      op->oncommit_sync = NULL;
-      num_uncommitted.dec();
-      logger->inc(l_osdc_op_commit);
-    }
+  // NOTE: we assume that since we only request ONDISK ever we will
+  // only ever get back one (type of) ack ever.
+
+  if (op->onfinish || op->oncommit_sync) {
+    num_in_flight.dec();
   }
+  if (op->onfinish) {
+    ldout(cct, 15) << "handle_osd_op_reply finish" << dendl;
+    onfinish = op->onfinish;
+    op->onfinish = NULL;
+  }
+  if (op->oncommit_sync) {
+    ldout(cct, 15) << "handle_osd_op_reply finish (sync)" << dendl;
+    op->oncommit_sync->complete(rc);
+    op->oncommit_sync = NULL;
+  }
+  logger->inc(l_osdc_op_reply);
 
   /* get it before we call _finish_op() */
   auto completion_lock = s->get_lock(op->target.base_oid);
 
   // done with this tid?
-  if (!op->onack && !op->oncommit && !op->oncommit_sync) {
+  if (!op->onfinish && !op->oncommit_sync) {
     ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
     _finish_op(op, 0);
   }
 
-  ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read()
-               << " uncommitted" << dendl;
+  ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
 
   // serialize completions
   if (completion_lock.mutex()) {
@@ -3407,11 +3370,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
   sl.unlock();
 
   // do callbacks
-  if (onack) {
-    onack->complete(rc);
-  }
-  if (oncommit) {
-    oncommit->complete(rc);
+  if (onfinish) {
+    onfinish->complete(rc);
   }
   if (completion_lock.mutex()) {
     completion_lock.unlock();
index 0cad6b1e486016ce1ab5c605026f36184b6429fd..375f8d16ccfbacdb548730004f9a89a787f3042a 100644 (file)
@@ -1124,8 +1124,7 @@ private:
   atomic_t inflight_ops;
   atomic_t client_inc;
   uint64_t max_linger_id;
-  atomic_t num_unacked;
-  atomic_t num_uncommitted;
+  atomic_t num_in_flight;
   atomic_t global_op_flags; // flags which are applied to each IO op
   bool keep_balanced_budget;
   bool honor_osdmap_full;
@@ -1233,7 +1232,7 @@ public:
     vector<int*> out_rval;
 
     int priority;
-    Context *onack, *oncommit;
+    Context *onfinish;
     uint64_t ontimeout;
     Context *oncommit_sync; // used internally by watch/notify
 
@@ -1266,7 +1265,7 @@ public:
     osd_reqid_t reqid; // explicitly setting reqid
 
     Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
-       int f, Context *ac, Context *co, version_t *ov, int *offset = NULL) :
+       int f, Context *fin, version_t *ov, int *offset = NULL) :
       session(NULL), incarnation(0),
       target(o, ol, f),
       con(NULL),
@@ -1274,8 +1273,7 @@ public:
       snapid(CEPH_NOSNAP),
       outbl(NULL),
       priority(0),
-      onack(ac),
-      oncommit(co),
+      onfinish(fin),
       ontimeout(0),
       oncommit_sync(NULL),
       tid(0),
@@ -1935,7 +1933,7 @@ private:
           double osd_timeout) :
     Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
     osdmap(new OSDMap), initialized(0), last_tid(0), client_inc(-1),
-    max_linger_id(0), num_unacked(0), num_uncommitted(0), global_op_flags(0),
+    max_linger_id(0), num_in_flight(0), global_op_flags(0),
     keep_balanced_budget(false), honor_osdmap_full(true),
     last_seen_osdmap_version(0), last_seen_pgmap_version(0),
     logger(NULL), tick_event(0), m_request_state_hook(NULL),
@@ -2158,7 +2156,7 @@ public:
     Context *oncommit, version_t *objver = NULL,
     osd_reqid_t reqid = osd_reqid_t()) {
     Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->priority = op.priority;
     o->mtime = mtime;
     o->snapc = snapc;
@@ -2186,7 +2184,7 @@ public:
     int *data_offset = NULL,
     uint64_t features = 0) {
     Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_READ, onack, NULL, objver, data_offset);
+                  CEPH_OSD_FLAG_READ, onack, objver, data_offset);
     o->priority = op.priority;
     o->snapid = snapid;
     o->outbl = pbl;
@@ -2219,7 +2217,7 @@ public:
     int *ctx_budget) {
     Op *o = new Op(object_t(), oloc,
                   op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ,
-                  onack, NULL, NULL);
+                  onack, NULL);
     o->target.precalc_pgid = true;
     o->target.base_pgid = pg_t(hash, oloc.pool);
     o->priority = op.priority;
@@ -2306,7 +2304,7 @@ public:
     ops[i].op.op = CEPH_OSD_OP_STAT;
     C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_READ, fin, 0, objver);
+                  CEPH_OSD_FLAG_READ, fin, objver);
     o->snapid = snap;
     o->outbl = &fin->bl;
     return o;
@@ -2337,7 +2335,7 @@ public:
     ops[i].op.extent.truncate_seq = 0;
     ops[i].op.flags = op_flags;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+                  CEPH_OSD_FLAG_READ, onfinish, objver);
     o->snapid = snap;
     o->outbl = pbl;
     return o;
@@ -2369,7 +2367,7 @@ public:
     ops[i].op.extent.truncate_seq = trunc_seq;
     ops[i].op.flags = op_flags;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+                  CEPH_OSD_FLAG_READ, onfinish, objver);
     o->snapid = snap;
     o->outbl = pbl;
     ceph_tid_t tid;
@@ -2388,7 +2386,7 @@ public:
     ops[i].op.extent.truncate_size = 0;
     ops[i].op.extent.truncate_seq = 0;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+                  CEPH_OSD_FLAG_READ, onfinish, objver);
     o->snapid = snap;
     o->outbl = pbl;
     ceph_tid_t tid;
@@ -2407,7 +2405,7 @@ public:
     if (name)
       ops[i].indata.append(name);
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+                  CEPH_OSD_FLAG_READ, onfinish, objver);
     o->snapid = snap;
     o->outbl = pbl;
     ceph_tid_t tid;
@@ -2424,7 +2422,7 @@ public:
     ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
     C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_READ, fin, 0, objver);
+                  CEPH_OSD_FLAG_READ, fin, objver);
     o->snapid = snap;
     o->outbl = &fin->bl;
     ceph_tid_t tid;
@@ -2448,7 +2446,7 @@ public:
                     Context *oncommit,
                     version_t *objver = NULL) {
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     ceph_tid_t tid;
@@ -2471,7 +2469,7 @@ public:
     ops[i].indata = bl;
     ops[i].op.flags = op_flags;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return o;
@@ -2504,7 +2502,7 @@ public:
     ops[i].op.extent.truncate_seq = 0;
     ops[i].indata = bl;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return o;
@@ -2539,7 +2537,7 @@ public:
     ops[i].indata = bl;
     ops[i].op.flags = op_flags;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     ceph_tid_t tid;
@@ -2560,7 +2558,7 @@ public:
     ops[i].indata = bl;
     ops[i].op.flags = op_flags;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return o;
@@ -2594,7 +2592,7 @@ public:
     ops[i].indata = bl;
     ops[i].op.flags = op_flags;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return o;
@@ -2627,7 +2625,7 @@ public:
     ops[i].op.extent.truncate_size = trunc_size;
     ops[i].op.extent.truncate_seq = trunc_seq;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     ceph_tid_t tid;
@@ -2644,7 +2642,7 @@ public:
     ops[i].op.extent.offset = off;
     ops[i].op.extent.length = len;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     ceph_tid_t tid;
@@ -2660,8 +2658,7 @@ public:
     int i = init_ops(ops, 1, extra_ops);
     ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
     ops[i].op.snap.snapid = snapid;
-    Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, NULL, oncommit,
-                  objver);
+    Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     ceph_tid_t tid;
@@ -2678,7 +2675,7 @@ public:
     ops[i].op.op = CEPH_OSD_OP_CREATE;
     ops[i].op.flags = create_flags;
     Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     ceph_tid_t tid;
@@ -2694,7 +2691,7 @@ public:
     int i = init_ops(ops, 1, extra_ops);
     ops[i].op.op = CEPH_OSD_OP_DELETE;
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     return o;
@@ -2725,7 +2722,7 @@ public:
       ops[i].indata.append(name);
     ops[i].indata.append(bl);
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     ceph_tid_t tid;
@@ -2745,7 +2742,7 @@ public:
     if (name)
       ops[i].indata.append(name);
     Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
-                  CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+                  CEPH_OSD_FLAG_WRITE, oncommit, objver);
     o->mtime = mtime;
     o->snapc = snapc;
     ceph_tid_t tid;