From: Sage Weil Date: Wed, 21 Dec 2016 20:20:47 +0000 (-0500) Subject: osdc/Objecter: onack + oncommit -> onfinish etc X-Git-Tag: v12.0.0~247^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=540601c2b7dbafe91e6ebc9ab7d8cb797e319f12;p=ceph.git osdc/Objecter: onack + oncommit -> onfinish etc And num_unacked + num_uncommitted -> num_in_flight. l_osdc_op_ack + l_osdc_op_commit -> l_osdc_op_reply Signed-off-by: Sage Weil --- diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 410c6646995a..391934d7d067 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -70,8 +70,7 @@ enum { l_osdc_op_send, l_osdc_op_send_bytes, l_osdc_op_resend, - l_osdc_op_ack, - l_osdc_op_commit, + l_osdc_op_reply, l_osdc_op, l_osdc_op_r, @@ -247,8 +246,7 @@ void Objecter::init() pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations"); pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data"); pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations"); - pcb.add_u64_counter(l_osdc_op_ack, "op_ack", "Commit callbacks"); - pcb.add_u64_counter(l_osdc_op_commit, "op_commit", "Operation commits"); + pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply"); pcb.add_u64_counter(l_osdc_op, "op", "Operations"); pcb.add_u64_counter(l_osdc_op_r, "op_r", @@ -555,7 +553,7 @@ void Objecter::_send_linger(LingerOp *info, watchl.unlock(); Op *o = new Op(info->target.base_oid, info->target.base_oloc, opv, info->target.flags | CEPH_OSD_FLAG_READ, - NULL, NULL, + NULL, info->pobjver); o->oncommit_sync = oncommit; o->outbl = poutbl; @@ -1457,11 +1455,8 @@ void Objecter::_check_op_pool_dne(Op *op, unique_lock& sl) ldout(cct, 10) << "check_op_pool_dne tid " << op->tid << " concluding pool " << op->target.base_pgid.pool() << " dne" << dendl; - if (op->onack) { - op->onack->complete(-ENOENT); - } - if (op->oncommit) { - op->oncommit->complete(-ENOENT); + if (op->onfinish) { + op->onfinish->complete(-ENOENT); } if
(op->oncommit_sync) { op->oncommit_sync->complete(-ENOENT); @@ -2211,15 +2206,10 @@ void Objecter::_send_op_account(Op *op) inflight_ops.inc(); // add to gather set(s) - if (op->onack) { - num_unacked.inc(); - } else { - ldout(cct, 20) << " note: not requesting ack" << dendl; - } - if (op->oncommit || op->oncommit_sync) { - num_uncommitted.inc(); + if (op->onfinish || op->oncommit_sync) { + num_in_flight.inc(); } else { - ldout(cct, 20) << " note: not requesting commit" << dendl; + ldout(cct, 20) << " note: not requesting reply" << dendl; } logger->inc(l_osdc_op_active); @@ -2398,8 +2388,7 @@ void Objecter::_op_submit(Op *op, shunique_lock& sul, ceph_tid_t *ptid) sl.unlock(); put_session(s); - ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() - << " uncommitted" << dendl; + ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl; } int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) @@ -2424,16 +2413,11 @@ int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r) ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd << dendl; Op *op = p->second; - if (op->onack) { - op->onack->complete(r); - op->onack = NULL; - num_unacked.dec(); - } - if (op->oncommit || op->oncommit_sync) - num_uncommitted.dec(); - if (op->oncommit) { - op->oncommit->complete(r); - op->oncommit = NULL; + if (op->onfinish || op->oncommit_sync) + num_in_flight.dec(); + if (op->onfinish) { + op->onfinish->complete(r); + op->onfinish = NULL; } if (op->oncommit_sync) { op->oncommit_sync->complete(r); @@ -2986,14 +2970,10 @@ void Objecter::_cancel_linger_op(Op *op) ldout(cct, 15) << "cancel_op " << op->tid << dendl; assert(!op->should_resend); - if (op->onack) { - delete op->onack; - num_unacked.dec(); - } - if (op->oncommit || op->oncommit_sync) { - delete op->oncommit; + if (op->onfinish || op->oncommit_sync) { + delete op->onfinish; delete op->oncommit_sync; - num_uncommitted.dec(); + num_in_flight.dec(); } _finish_op(op, 0); @@ 
-3044,10 +3024,8 @@ MOSDOp *Objecter::_prepare_osd_op(Op *op) int flags = op->target.flags; flags |= CEPH_OSD_FLAG_KNOWN_REDIR; - if (op->oncommit || op->oncommit_sync) + if (op->onfinish || op->oncommit_sync) flags |= CEPH_OSD_FLAG_ONDISK; - if (op->onack) - flags |= CEPH_OSD_FLAG_ACK; if (!honor_osdmap_full) flags |= CEPH_OSD_FLAG_FULL_FORCE; @@ -3239,11 +3217,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) if (retry_writes_after_first_reply && op->attempts == 1 && (op->target.flags & CEPH_OSD_FLAG_WRITE)) { ldout(cct, 7) << "retrying write after first reply: " << tid << dendl; - if (op->onack) { - num_unacked.dec(); - } - if (op->oncommit || op->oncommit_sync) { - num_uncommitted.dec(); + if (op->onfinish || op->oncommit_sync) { + num_in_flight.dec(); } _session_op_remove(s, op); sl.unlock(); @@ -3272,17 +3247,14 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // have, but that is better than doing callbacks out of order. } - Context *onack = 0; - Context *oncommit = 0; + Context *onfinish = 0; int rc = m->get_result(); if (m->is_redirect_reply()) { ldout(cct, 5) << " got redirect reply; redirecting" << dendl; - if (op->onack) - num_unacked.dec(); - if (op->oncommit || op->oncommit_sync) - num_uncommitted.dec(); + if (op->onfinish || op->oncommit_sync) + num_in_flight.dec(); _session_op_remove(s, op); sl.unlock(); put_session(s); @@ -3362,43 +3334,34 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) } } - // ack|commit -> ack - if (op->onack) { - ldout(cct, 15) << "handle_osd_op_reply ack" << dendl; - op->replay_version = m->get_replay_version(); - onack = op->onack; - op->onack = 0; // only do callback once - num_unacked.dec(); - logger->inc(l_osdc_op_ack); - } - if (m->is_ondisk() || rc) { - if (op->oncommit) { - ldout(cct, 15) << "handle_osd_op_reply safe" << dendl; - oncommit = op->oncommit; - op->oncommit = NULL; - num_uncommitted.dec(); - logger->inc(l_osdc_op_commit); - } - if (op->oncommit_sync) { - ldout(cct, 15) << 
"handle_osd_op_reply safe (sync)" << dendl; - op->oncommit_sync->complete(rc); - op->oncommit_sync = NULL; - num_uncommitted.dec(); - logger->inc(l_osdc_op_commit); - } + // NOTE: we assume that since we only request ONDISK ever we will + // only ever get back one (type of) ack ever. + + if (op->onfinish || op->oncommit_sync) { + num_in_flight.dec(); } + if (op->onfinish) { + ldout(cct, 15) << "handle_osd_op_reply finish" << dendl; + onfinish = op->onfinish; + op->onfinish = NULL; + } + if (op->oncommit_sync) { + ldout(cct, 15) << "handle_osd_op_reply finish (sync)" << dendl; + op->oncommit_sync->complete(rc); + op->oncommit_sync = NULL; + } + logger->inc(l_osdc_op_reply); /* get it before we call _finish_op() */ auto completion_lock = s->get_lock(op->target.base_oid); // done with this tid? - if (!op->onack && !op->oncommit && !op->oncommit_sync) { + if (!op->onfinish && !op->oncommit_sync) { ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl; _finish_op(op, 0); } - ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read() - << " uncommitted" << dendl; + ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl; // serialize completions if (completion_lock.mutex()) { @@ -3407,11 +3370,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) sl.unlock(); // do callbacks - if (onack) { - onack->complete(rc); - } - if (oncommit) { - oncommit->complete(rc); + if (onfinish) { + onfinish->complete(rc); } if (completion_lock.mutex()) { completion_lock.unlock(); diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 0cad6b1e4860..375f8d16ccfb 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -1124,8 +1124,7 @@ private: atomic_t inflight_ops; atomic_t client_inc; uint64_t max_linger_id; - atomic_t num_unacked; - atomic_t num_uncommitted; + atomic_t num_in_flight; atomic_t global_op_flags; // flags which are applied to each IO op bool keep_balanced_budget; bool honor_osdmap_full; @@ -1233,7 +1232,7 @@ public: 
vector out_rval; int priority; - Context *onack, *oncommit; + Context *onfinish; uint64_t ontimeout; Context *oncommit_sync; // used internally by watch/notify @@ -1266,7 +1265,7 @@ public: osd_reqid_t reqid; // explicitly setting reqid Op(const object_t& o, const object_locator_t& ol, vector& op, - int f, Context *ac, Context *co, version_t *ov, int *offset = NULL) : + int f, Context *fin, version_t *ov, int *offset = NULL) : session(NULL), incarnation(0), target(o, ol, f), con(NULL), @@ -1274,8 +1273,7 @@ public: snapid(CEPH_NOSNAP), outbl(NULL), priority(0), - onack(ac), - oncommit(co), + onfinish(fin), ontimeout(0), oncommit_sync(NULL), tid(0), @@ -1935,7 +1933,7 @@ private: double osd_timeout) : Dispatcher(cct_), messenger(m), monc(mc), finisher(fin), osdmap(new OSDMap), initialized(0), last_tid(0), client_inc(-1), - max_linger_id(0), num_unacked(0), num_uncommitted(0), global_op_flags(0), + max_linger_id(0), num_in_flight(0), global_op_flags(0), keep_balanced_budget(false), honor_osdmap_full(true), last_seen_osdmap_version(0), last_seen_pgmap_version(0), logger(NULL), tick_event(0), m_request_state_hook(NULL), @@ -2158,7 +2156,7 @@ public: Context *oncommit, version_t *objver = NULL, osd_reqid_t reqid = osd_reqid_t()) { Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->priority = op.priority; o->mtime = mtime; o->snapc = snapc; @@ -2186,7 +2184,7 @@ public: int *data_offset = NULL, uint64_t features = 0) { Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_READ, onack, NULL, objver, data_offset); + CEPH_OSD_FLAG_READ, onack, objver, data_offset); o->priority = op.priority; o->snapid = snapid; o->outbl = pbl; @@ -2219,7 +2217,7 @@ public: int *ctx_budget) { Op *o = new Op(object_t(), oloc, op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ, - onack, NULL, NULL); + onack, NULL); o->target.precalc_pgid = true; 
o->target.base_pgid = pg_t(hash, oloc.pool); o->priority = op.priority; @@ -2306,7 +2304,7 @@ public: ops[i].op.op = CEPH_OSD_OP_STAT; C_Stat *fin = new C_Stat(psize, pmtime, onfinish); Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_READ, fin, 0, objver); + CEPH_OSD_FLAG_READ, fin, objver); o->snapid = snap; o->outbl = &fin->bl; return o; @@ -2337,7 +2335,7 @@ public: ops[i].op.extent.truncate_seq = 0; ops[i].op.flags = op_flags; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_READ, onfinish, 0, objver); + CEPH_OSD_FLAG_READ, onfinish, objver); o->snapid = snap; o->outbl = pbl; return o; @@ -2369,7 +2367,7 @@ public: ops[i].op.extent.truncate_seq = trunc_seq; ops[i].op.flags = op_flags; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_READ, onfinish, 0, objver); + CEPH_OSD_FLAG_READ, onfinish, objver); o->snapid = snap; o->outbl = pbl; ceph_tid_t tid; @@ -2388,7 +2386,7 @@ public: ops[i].op.extent.truncate_size = 0; ops[i].op.extent.truncate_seq = 0; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_READ, onfinish, 0, objver); + CEPH_OSD_FLAG_READ, onfinish, objver); o->snapid = snap; o->outbl = pbl; ceph_tid_t tid; @@ -2407,7 +2405,7 @@ public: if (name) ops[i].indata.append(name); Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_READ, onfinish, 0, objver); + CEPH_OSD_FLAG_READ, onfinish, objver); o->snapid = snap; o->outbl = pbl; ceph_tid_t tid; @@ -2424,7 +2422,7 @@ public: ops[i].op.op = CEPH_OSD_OP_GETXATTRS; C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish); Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_READ, fin, 0, objver); + CEPH_OSD_FLAG_READ, fin, objver); o->snapid = snap; o->outbl = &fin->bl; ceph_tid_t tid; @@ -2448,7 +2446,7 @@ public: Context *oncommit, version_t *objver = NULL) { Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - 
CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; ceph_tid_t tid; @@ -2471,7 +2469,7 @@ public: ops[i].indata = bl; ops[i].op.flags = op_flags; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return o; @@ -2504,7 +2502,7 @@ public: ops[i].op.extent.truncate_seq = 0; ops[i].indata = bl; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return o; @@ -2539,7 +2537,7 @@ public: ops[i].indata = bl; ops[i].op.flags = op_flags; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; ceph_tid_t tid; @@ -2560,7 +2558,7 @@ public: ops[i].indata = bl; ops[i].op.flags = op_flags; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return o; @@ -2594,7 +2592,7 @@ public: ops[i].indata = bl; ops[i].op.flags = op_flags; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return o; @@ -2627,7 +2625,7 @@ public: ops[i].op.extent.truncate_size = trunc_size; ops[i].op.extent.truncate_seq = trunc_seq; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; ceph_tid_t tid; @@ -2644,7 +2642,7 @@ public: ops[i].op.extent.offset = off; ops[i].op.extent.length = len; Op *o = new Op(oid, oloc, ops, flags | 
global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; ceph_tid_t tid; @@ -2660,8 +2658,7 @@ public: int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_ROLLBACK; ops[i].op.snap.snapid = snapid; - Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, NULL, oncommit, - objver); + Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; ceph_tid_t tid; @@ -2678,7 +2675,7 @@ public: ops[i].op.op = CEPH_OSD_OP_CREATE; ops[i].op.flags = create_flags; Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; ceph_tid_t tid; @@ -2694,7 +2691,7 @@ public: int i = init_ops(ops, 1, extra_ops); ops[i].op.op = CEPH_OSD_OP_DELETE; Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return o; @@ -2725,7 +2722,7 @@ public: ops[i].indata.append(name); ops[i].indata.append(bl); Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; ceph_tid_t tid; @@ -2745,7 +2742,7 @@ public: if (name) ops[i].indata.append(name); Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() | - CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver); + CEPH_OSD_FLAG_WRITE, oncommit, objver); o->mtime = mtime; o->snapc = snapc; ceph_tid_t tid;