l_osdc_op_send,
l_osdc_op_send_bytes,
l_osdc_op_resend,
- l_osdc_op_ack,
- l_osdc_op_commit,
+ l_osdc_op_reply,
l_osdc_op,
l_osdc_op_r,
pcb.add_u64_counter(l_osdc_op_send, "op_send", "Sent operations");
pcb.add_u64_counter(l_osdc_op_send_bytes, "op_send_bytes", "Sent data");
pcb.add_u64_counter(l_osdc_op_resend, "op_resend", "Resent operations");
- pcb.add_u64_counter(l_osdc_op_ack, "op_ack", "Commit callbacks");
- pcb.add_u64_counter(l_osdc_op_commit, "op_commit", "Operation commits");
+ pcb.add_u64_counter(l_osdc_op_reply, "op_reply", "Operation reply");
pcb.add_u64_counter(l_osdc_op, "op", "Operations");
pcb.add_u64_counter(l_osdc_op_r, "op_r",
watchl.unlock();
Op *o = new Op(info->target.base_oid, info->target.base_oloc,
opv, info->target.flags | CEPH_OSD_FLAG_READ,
- NULL, NULL,
+ NULL,
info->pobjver);
o->oncommit_sync = oncommit;
o->outbl = poutbl;
ldout(cct, 10) << "check_op_pool_dne tid " << op->tid
<< " concluding pool " << op->target.base_pgid.pool()
<< " dne" << dendl;
- if (op->onack) {
- op->onack->complete(-ENOENT);
- }
- if (op->oncommit) {
- op->oncommit->complete(-ENOENT);
+ if (op->onfinish) {
+ op->onfinish->complete(-ENOENT);
}
if (op->oncommit_sync) {
op->oncommit_sync->complete(-ENOENT);
inflight_ops.inc();
// add to gather set(s)
- if (op->onack) {
- num_unacked.inc();
- } else {
- ldout(cct, 20) << " note: not requesting ack" << dendl;
- }
- if (op->oncommit || op->oncommit_sync) {
- num_uncommitted.inc();
+ if (op->onfinish || op->oncommit_sync) {
+ num_in_flight.inc();
} else {
- ldout(cct, 20) << " note: not requesting commit" << dendl;
+ ldout(cct, 20) << " note: not requesting reply" << dendl;
}
logger->inc(l_osdc_op_active);
sl.unlock();
put_session(s);
- ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read()
- << " uncommitted" << dendl;
+ ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
}
int Objecter::op_cancel(OSDSession *s, ceph_tid_t tid, int r)
ldout(cct, 10) << __func__ << " tid " << tid << " in session " << s->osd
<< dendl;
Op *op = p->second;
- if (op->onack) {
- op->onack->complete(r);
- op->onack = NULL;
- num_unacked.dec();
- }
- if (op->oncommit || op->oncommit_sync)
- num_uncommitted.dec();
- if (op->oncommit) {
- op->oncommit->complete(r);
- op->oncommit = NULL;
+ if (op->onfinish || op->oncommit_sync)
+ num_in_flight.dec();
+ if (op->onfinish) {
+ op->onfinish->complete(r);
+ op->onfinish = NULL;
}
if (op->oncommit_sync) {
op->oncommit_sync->complete(r);
ldout(cct, 15) << "cancel_op " << op->tid << dendl;
assert(!op->should_resend);
- if (op->onack) {
- delete op->onack;
- num_unacked.dec();
- }
- if (op->oncommit || op->oncommit_sync) {
- delete op->oncommit;
+ if (op->onfinish || op->oncommit_sync) {
+ delete op->onfinish;
delete op->oncommit_sync;
- num_uncommitted.dec();
+ num_in_flight.dec();
}
_finish_op(op, 0);
int flags = op->target.flags;
flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
- if (op->oncommit || op->oncommit_sync)
+ if (op->onfinish || op->oncommit_sync)
flags |= CEPH_OSD_FLAG_ONDISK;
- if (op->onack)
- flags |= CEPH_OSD_FLAG_ACK;
if (!honor_osdmap_full)
flags |= CEPH_OSD_FLAG_FULL_FORCE;
if (retry_writes_after_first_reply && op->attempts == 1 &&
(op->target.flags & CEPH_OSD_FLAG_WRITE)) {
ldout(cct, 7) << "retrying write after first reply: " << tid << dendl;
- if (op->onack) {
- num_unacked.dec();
- }
- if (op->oncommit || op->oncommit_sync) {
- num_uncommitted.dec();
+ if (op->onfinish || op->oncommit_sync) {
+ num_in_flight.dec();
}
_session_op_remove(s, op);
sl.unlock();
// have, but that is better than doing callbacks out of order.
}
- Context *onack = 0;
- Context *oncommit = 0;
+ Context *onfinish = 0;
int rc = m->get_result();
if (m->is_redirect_reply()) {
ldout(cct, 5) << " got redirect reply; redirecting" << dendl;
- if (op->onack)
- num_unacked.dec();
- if (op->oncommit || op->oncommit_sync)
- num_uncommitted.dec();
+ if (op->onfinish || op->oncommit_sync)
+ num_in_flight.dec();
_session_op_remove(s, op);
sl.unlock();
put_session(s);
}
}
- // ack|commit -> ack
- if (op->onack) {
- ldout(cct, 15) << "handle_osd_op_reply ack" << dendl;
- op->replay_version = m->get_replay_version();
- onack = op->onack;
- op->onack = 0; // only do callback once
- num_unacked.dec();
- logger->inc(l_osdc_op_ack);
- }
- if (m->is_ondisk() || rc) {
- if (op->oncommit) {
- ldout(cct, 15) << "handle_osd_op_reply safe" << dendl;
- oncommit = op->oncommit;
- op->oncommit = NULL;
- num_uncommitted.dec();
- logger->inc(l_osdc_op_commit);
- }
- if (op->oncommit_sync) {
- ldout(cct, 15) << "handle_osd_op_reply safe (sync)" << dendl;
- op->oncommit_sync->complete(rc);
- op->oncommit_sync = NULL;
- num_uncommitted.dec();
- logger->inc(l_osdc_op_commit);
- }
+ // NOTE: we assume that since we only request ONDISK ever we will
+ // only ever get back one (type of) ack ever.
+
+ if (op->onfinish || op->oncommit_sync) {
+ num_in_flight.dec();
}
+ if (op->onfinish) {
+ ldout(cct, 15) << "handle_osd_op_reply finish" << dendl;
+ onfinish = op->onfinish;
+ op->onfinish = NULL;
+ }
+ if (op->oncommit_sync) {
+ ldout(cct, 15) << "handle_osd_op_reply finish (sync)" << dendl;
+ op->oncommit_sync->complete(rc);
+ op->oncommit_sync = NULL;
+ }
+ logger->inc(l_osdc_op_reply);
/* get it before we call _finish_op() */
auto completion_lock = s->get_lock(op->target.base_oid);
// done with this tid?
- if (!op->onack && !op->oncommit && !op->oncommit_sync) {
+ if (!op->onfinish && !op->oncommit_sync) {
ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
_finish_op(op, 0);
}
- ldout(cct, 5) << num_unacked.read() << " unacked, " << num_uncommitted.read()
- << " uncommitted" << dendl;
+ ldout(cct, 5) << num_in_flight.read() << " in flight" << dendl;
// serialize completions
if (completion_lock.mutex()) {
sl.unlock();
// do callbacks
- if (onack) {
- onack->complete(rc);
- }
- if (oncommit) {
- oncommit->complete(rc);
+ if (onfinish) {
+ onfinish->complete(rc);
}
if (completion_lock.mutex()) {
completion_lock.unlock();
atomic_t inflight_ops;
atomic_t client_inc;
uint64_t max_linger_id;
- atomic_t num_unacked;
- atomic_t num_uncommitted;
+ atomic_t num_in_flight;
atomic_t global_op_flags; // flags which are applied to each IO op
bool keep_balanced_budget;
bool honor_osdmap_full;
vector<int*> out_rval;
int priority;
- Context *onack, *oncommit;
+ Context *onfinish;
uint64_t ontimeout;
Context *oncommit_sync; // used internally by watch/notify
osd_reqid_t reqid; // explicitly setting reqid
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
- int f, Context *ac, Context *co, version_t *ov, int *offset = NULL) :
+ int f, Context *fin, version_t *ov, int *offset = NULL) :
session(NULL), incarnation(0),
target(o, ol, f),
con(NULL),
snapid(CEPH_NOSNAP),
outbl(NULL),
priority(0),
- onack(ac),
- oncommit(co),
+ onfinish(fin),
ontimeout(0),
oncommit_sync(NULL),
tid(0),
double osd_timeout) :
Dispatcher(cct_), messenger(m), monc(mc), finisher(fin),
osdmap(new OSDMap), initialized(0), last_tid(0), client_inc(-1),
- max_linger_id(0), num_unacked(0), num_uncommitted(0), global_op_flags(0),
+ max_linger_id(0), num_in_flight(0), global_op_flags(0),
keep_balanced_budget(false), honor_osdmap_full(true),
last_seen_osdmap_version(0), last_seen_pgmap_version(0),
logger(NULL), tick_event(0), m_request_state_hook(NULL),
Context *oncommit, version_t *objver = NULL,
osd_reqid_t reqid = osd_reqid_t()) {
Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->priority = op.priority;
o->mtime = mtime;
o->snapc = snapc;
int *data_offset = NULL,
uint64_t features = 0) {
Op *o = new Op(oid, oloc, op.ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_READ, onack, NULL, objver, data_offset);
+ CEPH_OSD_FLAG_READ, onack, objver, data_offset);
o->priority = op.priority;
o->snapid = snapid;
o->outbl = pbl;
int *ctx_budget) {
Op *o = new Op(object_t(), oloc,
op.ops, flags | global_op_flags.read() | CEPH_OSD_FLAG_READ,
- onack, NULL, NULL);
+ onack, NULL);
o->target.precalc_pgid = true;
o->target.base_pgid = pg_t(hash, oloc.pool);
o->priority = op.priority;
ops[i].op.op = CEPH_OSD_OP_STAT;
C_Stat *fin = new C_Stat(psize, pmtime, onfinish);
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_READ, fin, 0, objver);
+ CEPH_OSD_FLAG_READ, fin, objver);
o->snapid = snap;
o->outbl = &fin->bl;
return o;
ops[i].op.extent.truncate_seq = 0;
ops[i].op.flags = op_flags;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+ CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
o->outbl = pbl;
return o;
ops[i].op.extent.truncate_seq = trunc_seq;
ops[i].op.flags = op_flags;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+ CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
o->outbl = pbl;
ceph_tid_t tid;
ops[i].op.extent.truncate_size = 0;
ops[i].op.extent.truncate_seq = 0;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+ CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
o->outbl = pbl;
ceph_tid_t tid;
if (name)
ops[i].indata.append(name);
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_READ, onfinish, 0, objver);
+ CEPH_OSD_FLAG_READ, onfinish, objver);
o->snapid = snap;
o->outbl = pbl;
ceph_tid_t tid;
ops[i].op.op = CEPH_OSD_OP_GETXATTRS;
C_GetAttrs *fin = new C_GetAttrs(attrset, onfinish);
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_READ, fin, 0, objver);
+ CEPH_OSD_FLAG_READ, fin, objver);
o->snapid = snap;
o->outbl = &fin->bl;
ceph_tid_t tid;
Context *oncommit,
version_t *objver = NULL) {
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ceph_tid_t tid;
ops[i].indata = bl;
ops[i].op.flags = op_flags;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return o;
ops[i].op.extent.truncate_seq = 0;
ops[i].indata = bl;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return o;
ops[i].indata = bl;
ops[i].op.flags = op_flags;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ceph_tid_t tid;
ops[i].indata = bl;
ops[i].op.flags = op_flags;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return o;
ops[i].indata = bl;
ops[i].op.flags = op_flags;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return o;
ops[i].op.extent.truncate_size = trunc_size;
ops[i].op.extent.truncate_seq = trunc_seq;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ceph_tid_t tid;
ops[i].op.extent.offset = off;
ops[i].op.extent.length = len;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ceph_tid_t tid;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_ROLLBACK;
ops[i].op.snap.snapid = snapid;
- Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, NULL, oncommit,
- objver);
+ Op *o = new Op(oid, oloc, ops, CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ceph_tid_t tid;
ops[i].op.op = CEPH_OSD_OP_CREATE;
ops[i].op.flags = create_flags;
Op *o = new Op(oid, oloc, ops, global_flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ceph_tid_t tid;
int i = init_ops(ops, 1, extra_ops);
ops[i].op.op = CEPH_OSD_OP_DELETE;
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return o;
ops[i].indata.append(name);
ops[i].indata.append(bl);
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ceph_tid_t tid;
if (name)
ops[i].indata.append(name);
Op *o = new Op(oid, oloc, ops, flags | global_op_flags.read() |
- CEPH_OSD_FLAG_WRITE, NULL, oncommit, objver);
+ CEPH_OSD_FLAG_WRITE, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
ceph_tid_t tid;