}
}
-void Objecter::send_linger(LingerOp *info, bool first_send)
+void Objecter::send_linger(LingerOp *info)
{
- if (!info->registering) {
- ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
- vector<OSDOp> ops = info->ops; // need to pass a copy to ops
- Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL;
- Context *oncommit = new C_Linger_Commit(this, info);
- Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ,
- onack, oncommit,
- info->pobjver);
- o->snapid = info->snap;
-
- if (info->session) {
- int r = recalc_op_target(o);
- if (r == RECALC_OP_TARGET_POOL_DNE) {
- linger_check_for_latest_map(info);
- }
- }
-
- if (first_send) {
- op_submit(o);
- } else {
- _op_submit(o);
+ ldout(cct, 15) << "send_linger " << info->linger_id << dendl;
+ vector<OSDOp> opv = info->ops; // need to pass a copy to ops
+ Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL;
+ Context *oncommit = new C_Linger_Commit(this, info);
+ Op *o = new Op(info->oid, info->oloc, opv, info->flags | CEPH_OSD_FLAG_READ,
+ onack, oncommit,
+ info->pobjver);
+ o->snapid = info->snap;
+
+ // do not resend this; we will send a new op to reregister
+ o->should_resend = false;
+
+ if (info->session) {
+ int r = recalc_op_target(o);
+ if (r == RECALC_OP_TARGET_POOL_DNE) {
+ linger_check_for_latest_map(info);
}
+ }
- OSDSession *s = o->session;
- if (info->session != s) {
- info->session_item.remove_myself();
- info->session = s;
- if (info->session)
- s->linger_ops.push_back(&info->session_item);
+ if (info->register_tid) {
+ // repeat send. cancel old registeration op, if any.
+ if (ops.count(info->register_tid)) {
+ Op *o = ops[info->register_tid];
+ cancel_op(o);
}
- info->registering = true;
-
- logger->inc(l_osdc_linger_send);
+ info->register_tid = _op_submit(o);
} else {
- ldout(cct, 15) << "send_linger " << info->linger_id << " already (re)registering" << dendl;
+ // first send
+ info->register_tid = op_submit(o);
+ }
+
+ OSDSession *s = o->session;
+ if (info->session != s) {
+ info->session_item.remove_myself();
+ info->session = s;
+ if (info->session)
+ s->linger_ops.push_back(&info->session_item);
}
+ info->registering = true;
+
+ logger->inc(l_osdc_linger_send);
}
void Objecter::_linger_ack(LingerOp *info, int r)
logger->set(l_osdc_linger_active, linger_ops.size());
- send_linger(info, true);
+ send_linger(info);
return info->linger_id;
}
p != linger_ops.end();
p++) {
LingerOp *op = p->second;
+ ldout(cct, 10) << " checking linger op " << op->linger_id << dendl;
int r = recalc_linger_op_target(op);
if (skipped_map)
r = RECALC_OP_TARGET_NEED_RESEND;
p != ops.end();
++p) {
Op *op = p->second;
+ ldout(cct, 10) << " checking op " << op->tid << dendl;
int r = recalc_op_target(op);
if (skipped_map)
r = RECALC_OP_TARGET_NEED_RESEND;
// resend requests
for (map<tid_t, Op*>::iterator p = need_resend.begin(); p != need_resend.end(); p++) {
Op *op = p->second;
- if (op->session) {
- logger->inc(l_osdc_op_resend);
- send_op(op);
+ if (op->should_resend) {
+ if (op->session) {
+ logger->inc(l_osdc_op_resend);
+ send_op(op);
+ }
+ } else {
+ cancel_op(op);
}
}
for (list<LingerOp*>::iterator p = need_resend_linger.begin(); p != need_resend_linger.end(); p++) {
LingerOp *op = *p;
if (op->session) {
logger->inc(l_osdc_linger_resend);
- send_linger(op, false);
+ send_linger(op);
}
}
ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl;
// resend ops
- for (xlist<Op*>::iterator p = session->ops.begin(); !p.end(); ++p) {
+ for (xlist<Op*>::iterator p = session->ops.begin(); !p.end();) {
+ Op *op = *p;
+ ++p;
logger->inc(l_osdc_op_resend);
- send_op(*p);
+ if (op->should_resend) {
+ send_op(op);
+ } else {
+ cancel_op(op);
+ }
}
// resend lingers
for (xlist<LingerOp*>::iterator j = session->linger_ops.begin(); !j.end(); ++j) {
logger->inc(l_osdc_linger_resend);
- send_linger(*j, false);
+ send_linger(*j);
}
}
return RECALC_OP_TARGET_NO_ACTION;
}
+void Objecter::cancel_op(Op *op)
+{
+ ldout(cct, 15) << "cancel_op " << op->tid << dendl;
+
+ // currently this only works for linger registrations, since we just
+ // throw out the callbacks.
+ assert(!op->should_resend);
+ delete op->onack;
+ delete op->oncommit;
+
+ finish_op(op);
+}
+
+void Objecter::finish_op(Op *op)
+{
+ ldout(cct, 15) << "finish_op " << op->tid << dendl;
+
+ op->session_item.remove_myself();
+ if (op->budgeted)
+ put_op_budget(op);
+ if (op->con)
+ op->con->put();
+
+ ops.erase(op->tid);
+ logger->set(l_osdc_op_active, ops.size());
+
+ delete op;
+}
+
void Objecter::send_op(Op *op)
{
ldout(cct, 15) << "send_op " << op->tid << " to osd." << op->session->osd << dendl;
// done with this tid?
if (!op->onack && !op->oncommit) {
- op->session_item.remove_myself();
ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl;
- if (op->budgeted)
- put_op_budget(op);
- ops.erase(tid);
- logger->set(l_osdc_op_active, ops.size());
- if (op->con)
- op->con->put();
- delete op;
+ finish_op(op);
}
ldout(cct, 5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
bool budgeted;
+ /// true if we should resend this message on failure
+ bool should_resend;
+
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
int f, Context *ac, Context *co, eversion_t *ov) :
session(NULL), session_item(this), incarnation(0),
flags(f), priority(0), onack(ac), oncommit(co),
tid(0), attempts(0),
paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false),
- budgeted(false) {
+ budgeted(false),
+ should_resend(true) {
ops.swap(op);
/* initialize out_* to match op vector */
OSDSession *session;
xlist<LingerOp*>::item session_item;
+ tid_t register_tid;
+
LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL),
registering(false), registered(false),
on_reg_ack(NULL), on_reg_commit(NULL),
- session(NULL), session_item(this) {}
+ session(NULL), session_item(this),
+ register_tid(0) {}
// no copy!
const LingerOp &operator=(const LingerOp& r);
map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
void send_op(Op *op);
+ void cancel_op(Op *op);
+ void finish_op(Op *op);
bool is_pg_changed(vector<int>& a, vector<int>& b, bool any_change=false);
enum recalc_op_target_result {
RECALC_OP_TARGET_NO_ACTION = 0,
int recalc_op_target(Op *op);
bool recalc_linger_op_target(LingerOp *op);
- void send_linger(LingerOp *info, bool first_send);
+ void send_linger(LingerOp *info);
void _linger_ack(LingerOp *info, int r);
void _linger_commit(LingerOp *info, int r);