From 5dd68b95b1d2ac0e4972609ca497d4cff28ef351 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 18 Jul 2012 12:55:35 -0700 Subject: [PATCH] objecter: always resend linger registrations If a linger op (watch) is sent to the OSD and updates the object, and then the client loses the reply, it will resend the request. The OSD will see that it is a dup, however, and not set up the in-memory session state for the watch. This in turn will break the watch (i.e., notifies won't get delivered). Instead, always resend linger registration ops, so that we always have a unique reqid and do the correct session registeration for each session. * track the tid of the registation op for each LingerOp * mark registrations ops as should_resend=false; cancel as needed * when we send a new registration op, cancel the old one to ensure we ignore the reply. This is needed becuase we resend linger ops on any pg change, not just a primary change. * drop the first_send arg to send_linger(), as we can now infer that from register_tid == 0. The bug was easily reproduced with ms inject socket failures = 500 and the test_stress_watch utility. Fixes: #2796 Signed-off-by: Sage Weil Reviewed-by: Josh Durgin --- src/osdc/Objecter.cc | 135 ++++++++++++++++++++++++++++--------------- src/osdc/Objecter.h | 15 ++++- 2 files changed, 99 insertions(+), 51 deletions(-) diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 15614c4510cc2..829f20326dfc3 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -246,44 +246,49 @@ void Objecter::shutdown() } } -void Objecter::send_linger(LingerOp *info, bool first_send) +void Objecter::send_linger(LingerOp *info) { - if (!info->registering) { - ldout(cct, 15) << "send_linger " << info->linger_id << dendl; - vector ops = info->ops; // need to pass a copy to ops - Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL; - Context *oncommit = new C_Linger_Commit(this, info); - Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ, - onack, oncommit, - info->pobjver); - o->snapid = info->snap; - - if (info->session) { - int r = recalc_op_target(o); - if (r == RECALC_OP_TARGET_POOL_DNE) { - linger_check_for_latest_map(info); - } - } - - if (first_send) { - op_submit(o); - } else { - _op_submit(o); + ldout(cct, 15) << "send_linger " << info->linger_id << dendl; + vector opv = info->ops; // need to pass a copy to ops + Context *onack = (!info->registered && info->on_reg_ack) ? new C_Linger_Ack(this, info) : NULL; + Context *oncommit = new C_Linger_Commit(this, info); + Op *o = new Op(info->oid, info->oloc, opv, info->flags | CEPH_OSD_FLAG_READ, + onack, oncommit, + info->pobjver); + o->snapid = info->snap; + + // do not resend this; we will send a new op to reregister + o->should_resend = false; + + if (info->session) { + int r = recalc_op_target(o); + if (r == RECALC_OP_TARGET_POOL_DNE) { + linger_check_for_latest_map(info); } + } - OSDSession *s = o->session; - if (info->session != s) { - info->session_item.remove_myself(); - info->session = s; - if (info->session) - s->linger_ops.push_back(&info->session_item); + if (info->register_tid) { + // repeat send. cancel old registeration op, if any. + if (ops.count(info->register_tid)) { + Op *o = ops[info->register_tid]; + cancel_op(o); } - info->registering = true; - - logger->inc(l_osdc_linger_send); + info->register_tid = _op_submit(o); } else { - ldout(cct, 15) << "send_linger " << info->linger_id << " already (re)registering" << dendl; + // first send + info->register_tid = op_submit(o); + } + + OSDSession *s = o->session; + if (info->session != s) { + info->session_item.remove_myself(); + info->session = s; + if (info->session) + s->linger_ops.push_back(&info->session_item); } + info->registering = true; + + logger->inc(l_osdc_linger_send); } void Objecter::_linger_ack(LingerOp *info, int r) @@ -348,7 +353,7 @@ tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc, logger->set(l_osdc_linger_active, linger_ops.size()); - send_linger(info, true); + send_linger(info); return info->linger_id; } @@ -450,6 +455,7 @@ void Objecter::handle_osd_map(MOSDMap *m) p != linger_ops.end(); p++) { LingerOp *op = p->second; + ldout(cct, 10) << " checking linger op " << op->linger_id << dendl; int r = recalc_linger_op_target(op); if (skipped_map) r = RECALC_OP_TARGET_NEED_RESEND; @@ -472,6 +478,7 @@ void Objecter::handle_osd_map(MOSDMap *m) p != ops.end(); ++p) { Op *op = p->second; + ldout(cct, 10) << " checking op " << op->tid << dendl; int r = recalc_op_target(op); if (skipped_map) r = RECALC_OP_TARGET_NEED_RESEND; @@ -541,16 +548,20 @@ void Objecter::handle_osd_map(MOSDMap *m) // resend requests for (map::iterator p = need_resend.begin(); p != need_resend.end(); p++) { Op *op = p->second; - if (op->session) { - logger->inc(l_osdc_op_resend); - send_op(op); + if (op->should_resend) { + if (op->session) { + logger->inc(l_osdc_op_resend); + send_op(op); + } + } else { + cancel_op(op); } } for (list::iterator p = need_resend_linger.begin(); p != need_resend_linger.end(); p++) { LingerOp *op = *p; if (op->session) { logger->inc(l_osdc_linger_resend); - send_linger(op, false); + send_linger(op); } } @@ -747,15 +758,21 @@ void Objecter::kick_requests(OSDSession *session) ldout(cct, 10) << "kick_requests for osd." << session->osd << dendl; // resend ops - for (xlist::iterator p = session->ops.begin(); !p.end(); ++p) { + for (xlist::iterator p = session->ops.begin(); !p.end();) { + Op *op = *p; + ++p; logger->inc(l_osdc_op_resend); - send_op(*p); + if (op->should_resend) { + send_op(op); + } else { + cancel_op(op); + } } // resend lingers for (xlist::iterator j = session->linger_ops.begin(); !j.end(); ++j) { logger->inc(l_osdc_linger_resend); - send_linger(*j, false); + send_linger(*j); } } @@ -1086,6 +1103,35 @@ bool Objecter::recalc_linger_op_target(LingerOp *linger_op) return RECALC_OP_TARGET_NO_ACTION; } +void Objecter::cancel_op(Op *op) +{ + ldout(cct, 15) << "cancel_op " << op->tid << dendl; + + // currently this only works for linger registrations, since we just + // throw out the callbacks. + assert(!op->should_resend); + delete op->onack; + delete op->oncommit; + + finish_op(op); +} + +void Objecter::finish_op(Op *op) +{ + ldout(cct, 15) << "finish_op " << op->tid << dendl; + + op->session_item.remove_myself(); + if (op->budgeted) + put_op_budget(op); + if (op->con) + op->con->put(); + + ops.erase(op->tid); + logger->set(l_osdc_op_active, ops.size()); + + delete op; +} + void Objecter::send_op(Op *op) { ldout(cct, 15) << "send_op " << op->tid << " to osd." << op->session->osd << dendl; @@ -1292,15 +1338,8 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // done with this tid? if (!op->onack && !op->oncommit) { - op->session_item.remove_myself(); ldout(cct, 15) << "handle_osd_op_reply completed tid " << tid << dendl; - if (op->budgeted) - put_op_budget(op); - ops.erase(tid); - logger->set(l_osdc_op_active, ops.size()); - if (op->con) - op->con->put(); - delete op; + finish_op(op); } ldout(cct, 5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl; diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index 2d405f1963b02..3fa765c605729 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -602,6 +602,9 @@ public: bool budgeted; + /// true if we should resend this message on failure + bool should_resend; + Op(const object_t& o, const object_locator_t& ol, vector& op, int f, Context *ac, Context *co, eversion_t *ov) : session(NULL), session_item(this), incarnation(0), @@ -612,7 +615,8 @@ public: flags(f), priority(0), onack(ac), oncommit(co), tid(0), attempts(0), paused(false), objver(ov), reply_epoch(NULL), precalc_pgid(false), - budgeted(false) { + budgeted(false), + should_resend(true) { ops.swap(op); /* initialize out_* to match op vector */ @@ -779,10 +783,13 @@ public: OSDSession *session; xlist::item session_item; + tid_t register_tid; + LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL), registering(false), registered(false), on_reg_ack(NULL), on_reg_commit(NULL), - session(NULL), session_item(this) {} + session(NULL), session_item(this), + register_tid(0) {} // no copy! const LingerOp &operator=(const LingerOp& r); @@ -857,6 +864,8 @@ public: map > > waiting_for_map; void send_op(Op *op); + void cancel_op(Op *op); + void finish_op(Op *op); bool is_pg_changed(vector& a, vector& b, bool any_change=false); enum recalc_op_target_result { RECALC_OP_TARGET_NO_ACTION = 0, @@ -866,7 +875,7 @@ public: int recalc_op_target(Op *op); bool recalc_linger_op_target(LingerOp *op); - void send_linger(LingerOp *info, bool first_send); + void send_linger(LingerOp *info); void _linger_ack(LingerOp *info, int r); void _linger_commit(LingerOp *info, int r); -- 2.39.5