From: Sage Weil Date: Wed, 15 Dec 2010 00:15:04 +0000 (-0800) Subject: objecter: track pending requests by osd, not pg X-Git-Tag: v0.25~451^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=065cdf523ee9b560da218e8a3918419dbd683669;p=ceph.git objecter: track pending requests by osd, not pg This is a big cleanup. Also - switch to keeping per-osd Connection *'s - make requests time out independently (not very efficiently yet) Signed-off-by: Sage Weil --- diff --git a/src/osdc/Objecter.cc b/src/osdc/Objecter.cc index 349762699b88..5fcf3ba97efc 100644 --- a/src/osdc/Objecter.cc +++ b/src/osdc/Objecter.cc @@ -58,8 +58,10 @@ void Objecter::shutdown() { } -tid_t Objecter::resend_linger(LingerOp *info) +tid_t Objecter::send_linger(LingerOp *info) { + dout(15) << "send_linger " << info->linger_id << dendl; + vector ops = info->ops; // need to pass a copy to ops Context *onack = NULL; Context *oncommit = NULL; @@ -71,7 +73,7 @@ tid_t Objecter::resend_linger(LingerOp *info) } Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ, onack, oncommit, - info->pobjver, true); + info->pobjver); o->snapid = info->snap; return op_submit(o, info); } @@ -106,7 +108,7 @@ void Objecter::unregister_linger(uint64_t linger_id) map::iterator iter = op_linger_info.find(linger_id); if (iter != op_linger_info.end()) { LingerOp *info = iter->second; - info->pg_item.remove_myself(); + info->session_item.remove_myself(); op_linger_info.erase(iter); delete info; } @@ -133,7 +135,7 @@ tid_t Objecter::linger(const object_t& oid, const object_locator_t& oloc, info->linger_id = ++max_linger_id; op_linger_info[info->linger_id] = info; - resend_linger(info); + send_linger(info); return info->linger_id; } @@ -177,6 +179,10 @@ void Objecter::handle_osd_map(MOSDMap *m) return; } + bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); + bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR); + bool was_full = osdmap->test_flag(CEPH_OSDMAP_FULL); + if (m->get_last() <= osdmap->get_epoch()) { dout(3) << "handle_osd_map ignoring epochs [" << m->get_first() << "," << m->get_last() @@ -188,18 +194,12 @@ void Objecter::handle_osd_map(MOSDMap *m) << "] > " << osdmap->get_epoch() << dendl; - set changed_pgs; - if (osdmap->get_epoch()) { // we want incrementals for (epoch_t e = osdmap->get_epoch() + 1; e <= m->get_last(); e++) { - - bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD); - bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR); - bool was_full = osdmap->test_flag(CEPH_OSDMAP_FULL); - + if (m->incremental_maps.count(e)) { dout(3) << "handle_osd_map decoding incremental epoch " << e << dendl; OSDMap::Incremental inc(m->incremental_maps[e]); @@ -222,26 +222,6 @@ void Objecter::handle_osd_map(MOSDMap *m) break; } - if (was_pauserd || was_pausewr || was_full) - maybe_request_map(); - - // scan pgs for changes - scan_pgs(changed_pgs); - - // kick paused - if ((was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) || - (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) || - (was_full && !osdmap->test_flag(CEPH_OSDMAP_FULL))) { - for (hash_map::iterator p = op_osd.begin(); - p != op_osd.end(); - p++) { - if (p->second->paused) { - p->second->paused = false; - op_submit(p->second); - } - } - } - dump_active(); assert(e == osdmap->get_epoch()); } @@ -250,25 +230,103 @@ void Objecter::handle_osd_map(MOSDMap *m) if (m->maps.count(m->get_last())) { dout(3) << "handle_osd_map decoding full epoch " << m->get_last() << dendl; osdmap->decode(m->maps[m->get_last()]); - - scan_pgs(changed_pgs); } else { dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl; monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME); monc->renew_subs(); } } + } - // kick requests who might be timing out on the wrong osds - if (!changed_pgs.empty()) - kick_requests(changed_pgs); + if (was_pauserd || was_pausewr || was_full) + maybe_request_map(); + + bool kick_paused = + (was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) || + (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) || + (was_full && !osdmap->test_flag(CEPH_OSDMAP_FULL)); + + // osd addr changes? + for (map::iterator p = osd_sessions.begin(); + p != osd_sessions.end(); + p++) { + OSDSession *s = p->second; + if (osdmap->is_up(s->osd)) { + entity_inst_t inst = osdmap->get_inst(s->osd); + if (s->con->get_peer_addr() != osdmap->get_inst(s->osd).addr) { + dout(10) << " reopening osd" << s->osd << " session, addr now " << inst << dendl; + messenger->mark_down(s->con); + s->con->put(); + s->con = messenger->get_connection(inst); + s->incarnation++; + } + } + } + + // active requests + for (hash_map::iterator p = op_osd.begin(); + p != op_osd.end(); + p++) { + Op *op = p->second; + + // calc target + vector acting; + if (op->oid.name.length()) + op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc); + osdmap->pg_to_acting_osds(op->pgid, acting); + OSDSession *s = 0; + + if (acting.size()) + s = get_session(acting[0]); + if (op->session != s) { + dout(10) << " redirecting tid " << op->tid << " to osd" << (s ? s->osd : -1) << dendl; + if (!op->session) + num_homeless_ops--; + op->session_item.remove_myself(); + op->session = s; + if (s) { + op->session->ops.push_back(&op->session_item); + send_op(op); + } else { + num_homeless_ops++; + } + } else if (s) { + if (op->incarnation != s->incarnation) { + dout(10) << " resending tid " << op->tid << " to (reopened) osd" << s->osd << dendl; + send_op(op); + } else if (op->paused && kick_paused) { + dout(10) << " kicking paused tid " << op->tid << " on osd" << s->osd << dendl; + send_op(op); + } + } } - //now check if the map is full -- we want to subscribe if it is! + // lingers + for (map::iterator p = op_linger_info.begin(); + p != op_linger_info.end(); + p++) { + LingerOp *op = p->second; + vector acting; + pg_t pgid = osdmap->object_locator_to_pg(op->oid, op->oloc); + osdmap->pg_to_acting_osds(pgid, acting); + OSDSession *s = acting.size() ? get_session(acting[0]) : NULL; + if (op->session != s) { + dout(10) << " redirecting linger id " << op->linger_id << " to osd" << (s ? s->osd : -1) << dendl; + op->session_item.remove_myself(); + op->session = s; + if (s) { + send_linger(op); + } + } + } + + dump_active(); + + // now check if the map is full -- we want to subscribe if it is! if (osdmap->test_flag(CEPH_OSDMAP_FULL) & CEPH_OSDMAP_FULL) maybe_request_map(); - //finish any Contexts that were waiting on a map update + // finish any Contexts that were waiting on a map update map > >::iterator p = waiting_for_map.begin(); while (p != waiting_for_map.end() && @@ -316,123 +374,17 @@ void Objecter::maybe_request_map() } -Objecter::PG &Objecter::get_pg(pg_t pgid) -{ - if (!pg_map.count(pgid)) { - osdmap->pg_to_acting_osds(pgid, pg_map[pgid].acting); - dout(10) << "get_pg " << pgid << " is new, " << pg_map[pgid].acting << dendl; - } else { - dout(10) << "get_pg " << pgid << " is old, " << pg_map[pgid].acting << dendl; - } - return pg_map[pgid]; -} - - -void Objecter::scan_pgs_for(set& pgs, int osd) -{ - dout(10) << "scan_pgs_for osd" << osd << dendl; - - for (hash_map::iterator i = pg_map.begin(); - i != pg_map.end(); - i++) { - pg_t pgid = i->first; - PG& pg = i->second; - if (pg.acting.size() && pg.acting[0] == osd) - pgs.insert(pgid); - } -} - -void Objecter::scan_pgs(set& changed_pgs) +void Objecter::kick_requests(OSDSession *session) { - dout(10) << "scan_pgs" << dendl; - - for (hash_map::iterator i = pg_map.begin(); - i != pg_map.end(); - i++) { - pg_t pgid = i->first; - PG& pg = i->second; - - // calc new. - vector other; - osdmap->pg_to_acting_osds(pgid, other); - - if (other == pg.acting) - continue; // no change. + dout(10) << "kick_requests for osd" << session->osd << dendl; - dout(10) << "scan_pgs " << pgid << " " << pg.acting << " -> " << other << dendl; - - other.swap(pg.acting); - - if (other.size() && pg.acting.size() && - other[0] == pg.acting[0]) - continue; // same primary. - - // changed significantly. - dout(10) << "scan_pgs pg " << pgid - << " (" << pg.active_tids << ")" - << " " << other << " -> " << pg.acting - << dendl; - changed_pgs.insert(pgid); - } -} + // resend ops + for (xlist::iterator p = session->ops.begin(); !p.end(); ++p) + send_op(*p); -void Objecter::kick_requests(set& changed_pgs) -{ - dout(10) << "kick_requests in pgs " << changed_pgs << dendl; - dout(0) << "*** kick_requests in pgs " << changed_pgs << dendl; - - for (set::iterator i = changed_pgs.begin(); - i != changed_pgs.end(); - i++) { - pg_t pgid = *i; - PG& pg = pg_map[pgid]; - - // resubmit ops! - set tids; - tids.swap( pg.active_tids ); - - // resend lingers - for (xlist::iterator j = pg.linger_ops.begin(); !j.end(); ++j) - resend_linger(*j); - - if (pg.linger_ops.empty()) - close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing - - dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl; - for (set::iterator p = tids.begin(); - p != tids.end(); - p++) { - tid_t tid = *p; - - hash_map::iterator p = op_osd.find(tid); - if (p != op_osd.end()) { - Op *op = p->second; - put_op_budget(op); - op_osd.erase(p); - - if (op->onack) - num_unacked--; - if (op->oncommit) - num_uncommitted--; - - dout(0) << "kick_requests tid=" << tid << " linger=" << op->linger << dendl; - // WRITE - if (op->onack) { - dout(3) << "kick_requests missing ack, resub " << tid << dendl; - op_submit(op, false); - } else { - if (!op->linger) { - assert(op->oncommit); - dout(3) << "kick_requests missing commit, resub " << tid << dendl; - dout(0) << "kick_requests missing commit, resub " << tid << dendl; - op_submit(op, false); - } - } - } else - assert(0); - } - } - dout(0) << "*** kick_requests done" << dendl; + // resend lingers + for (xlist::iterator j = session->linger_ops.begin(); !j.end(); ++j) + send_linger(*j); } @@ -440,31 +392,34 @@ void Objecter::tick() { dout(10) << "tick" << dendl; - set ping; + set toping; - // look for laggy pgs + // look for laggy requests utime_t cutoff = g_clock.now(); cutoff -= g_conf.objecter_timeout; // timeout - for (hash_map::iterator i = pg_map.begin(); - i != pg_map.end(); - i++) { - if (!i->second.active_tids.empty() && - i->second.last < cutoff) { - dout(1) << " pg " << i->first << " on " << i->second.acting - << " is laggy: " << i->second.active_tids << dendl; - maybe_request_map(); - //break; - - // send a ping to this osd, to ensure we detect any session resets - // (osd reply message policy is lossy) - if (i->second.acting.size()) - ping.insert(i->second.acting[0]); + + for (hash_map::iterator p = op_osd.begin(); + p != op_osd.end(); + p++) { + Op *op = p->second; + if (op->session && op->stamp < cutoff) { + dout(2) << " tid " << p->first << " on osd" << op->session->osd << " is laggy" << dendl; + toping.insert(op->session); } } - for (set::iterator p = ping.begin(); p != ping.end(); p++) - messenger->send_message(new MPing, osdmap->get_inst(*p)); + if (num_homeless_ops || !toping.empty()) + maybe_request_map(); + if (!toping.empty()) { + // send a ping to these osds, to ensure we detect any session resets + // (osd reply message policy is lossy) + for (set::iterator i = toping.begin(); + i != toping.end(); + i++) + messenger->send_message(new MPing, osdmap->get_inst((*i)->osd)); + } + // reschedule timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this)); } @@ -501,19 +456,26 @@ tid_t Objecter::op_submit(Op *op, LingerOp *linger_op) // take_op_budget() may drop our lock while it blocks. take_op_budget(op); - if (op->oid.name.length()) - op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc); - - // find - PG &pg = get_pg(op->pgid); - - if (linger_op) - pg.linger_ops.push_back(&linger_op->pg_item); - // pick tid op->tid = ++last_tid; assert(client_inc >= 0); + // pick target + vector acting; + if (op->oid.name.length()) + op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc); + osdmap->pg_to_acting_osds(op->pgid, acting); + + if (acting.size()) { + op->session = get_session(acting[0]); + op->session->ops.push_back(&op->session_item); + if (linger_op) + op->session->linger_ops.push_back(&linger_op->session_item); + } else { + op->session = NULL; + num_homeless_ops++; + } + // add to gather set(s) int flags = op->flags; if (op->onack) { @@ -530,19 +492,11 @@ tid_t Objecter::op_submit(Op *op, LingerOp *linger_op) } op_osd[op->tid] = op; - pg.active_tids.insert(op->tid); - pg.last = g_clock.now(); - // send? - dout(0) << "op_submit oid " << op->oid - << " " << op->oloc - << " " << op->ops << " tid " << op->tid - << " osd" << pg.primary() - << dendl; dout(10) << "op_submit oid " << op->oid << " " << op->oloc << " " << op->ops << " tid " << op->tid - << " osd" << pg.primary() + << " osd" << (op->session ? op->session->osd : -1) << dendl; assert(op->flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)); @@ -562,56 +516,69 @@ tid_t Objecter::op_submit(Op *op, LingerOp *linger_op) dout(10) << " FULL, paused modify " << op << " tid " << last_tid << dendl; op->paused = true; maybe_request_map(); - } else if (pg.primary() >= 0) { - int flags = op->flags; - if (op->oncommit) - flags |= CEPH_OSD_FLAG_ONDISK; - if (op->onack) - flags |= CEPH_OSD_FLAG_ACK; + } else if (op->session) { + send_op(op); + } else + maybe_request_map(); + + dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl; + + return op->tid; +} - if (op->con) { - if (op->outbl && op->outbl->length()) { - dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl; - op->con->revoke_rx_buffer(op->tid); - } - op->con->put(); - } - op->con = messenger->get_connection(osdmap->get_inst(pg.primary())); - assert(op->con); - if (op->outbl && op->outbl->length()) { - dout(20) << " posting rx buffer for " << op->tid << " on " << op->con << dendl; - op->con->post_rx_buffer(op->tid, *op->outbl); - } +void Objecter::send_op(Op *op) +{ + dout(15) << "send_op " << op->tid << " to osd" << op->session->osd << dendl; + + int flags = op->flags; + if (op->oncommit) + flags |= CEPH_OSD_FLAG_ONDISK; + if (op->onack) + flags |= CEPH_OSD_FLAG_ACK; + + if (!op->session->con) + op->session->con = messenger->get_connection(osdmap->get_inst(op->session->osd)); + assert(op->session->con); - ceph_object_layout ol; - ol.ol_pgid = op->pgid.v; - ol.ol_stripe_unit = 0; + // preallocated rx buffer? + if (op->con) { + dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl; + op->con->revoke_rx_buffer(op->tid); + op->con->put(); + } + if (op->outbl && op->outbl->length()) { + dout(20) << " posting rx buffer for " << op->tid << " on " << op->session->con << dendl; + op->con = op->session->con->get(); + op->con->post_rx_buffer(op->tid, *op->outbl); + } - MOSDOp *m = new MOSDOp(client_inc, op->tid, - op->oid, ol, osdmap->get_epoch(), - flags); + op->paused = false; + op->incarnation = op->session->incarnation; + op->stamp = g_clock.now(); - m->set_snapid(op->snapid); - m->set_snap_seq(op->snapc.seq); - m->get_snaps() = op->snapc.snaps; + ceph_object_layout ol; + ol.ol_pgid = op->pgid.v; + ol.ol_stripe_unit = 0; - m->ops = op->ops; - m->set_mtime(op->mtime); - m->set_retry_attempt(op->attempts++); + MOSDOp *m = new MOSDOp(client_inc, op->tid, + op->oid, ol, osdmap->get_epoch(), + flags); - if (op->version != eversion_t()) - m->set_version(op->version); // we're replaying this op! + m->set_snapid(op->snapid); + m->set_snap_seq(op->snapc.seq); + m->get_snaps() = op->snapc.snaps; - if (op->priority) - m->set_priority(op->priority); + m->ops = op->ops; + m->set_mtime(op->mtime); + m->set_retry_attempt(op->attempts++); - messenger->send_message(m, op->con); - } else - maybe_request_map(); - - dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl; - - return op->tid; + if (op->version != eversion_t()) + m->set_version(op->version); // we're replaying this op! + + if (op->priority) + m->set_priority(op->priority); + + messenger->send_message(m, op->session->con); } int Objecter::calc_op_budget(Op *op) @@ -664,11 +631,11 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) << (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack")) << " v " << m->get_version() << " in " << m->get_pg() << dendl; - Op *op = op_osd[ tid ]; + Op *op = op_osd[tid]; - if (op->con != m->get_connection()) { + if (op->session->con != m->get_connection()) { dout(7) << " ignoring reply from " << m->get_source_inst() - << ", i last sent to " << op->con->get_peer_addr() << dendl; + << ", i last sent to " << op->session->con->get_peer_addr() << dendl; m->put(); return; } @@ -694,7 +661,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // got data? if (op->outbl) { - if (op->outbl->length()) + if (op->con) op->con->revoke_rx_buffer(op->tid); m->claim_data(*op->outbl); op->outbl = 0; @@ -717,15 +684,10 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // done with this tid? if (!op->onack && !op->oncommit) { - PG &pg = get_pg( m->get_pg() ); - assert(pg.active_tids.count(tid)); - pg.active_tids.erase(tid); - dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg() - << " still has " << pg.active_tids << dendl; - if (pg.active_tids.empty() && pg.linger_ops.empty()) - close_pg( m->get_pg() ); + op->session_item.remove_myself(); + dout(15) << "handle_osd_op_reply completed tid " << tid << dendl; put_op_budget(op); - op_osd.erase( tid ); + op_osd.erase(tid); if (op->con) op->con->put(); delete op; @@ -796,7 +758,7 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) { object_locator_t oloc(list_context->pool_id); // - Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL, false); + Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL); o->priority = op.priority; o->snapid = list_context->pool_snap_seq; o->outbl = bl; @@ -1266,10 +1228,15 @@ void Objecter::ms_handle_reset(Connection *con) int osd = osdmap->identify_osd(con->get_peer_addr()); if (osd >= 0) { dout(1) << "ms_handle_reset on osd" << osd << dendl; - set changed_pgs; - scan_pgs_for(changed_pgs, osd); - kick_requests(changed_pgs); - maybe_request_map(); + map::iterator p = osd_sessions.find(osd); + if (p != osd_sessions.end()) { + OSDSession *session = p->second; + messenger->mark_down(session->con->get_peer_addr()); + session->con->put(); + session->con = NULL; + kick_requests(session); + maybe_request_map(); + } } else { dout(10) << "ms_handle_reset on unknown osd addr " << con->get_peer_addr() << dendl; } diff --git a/src/osdc/Objecter.h b/src/osdc/Objecter.h index fa2da08c22ae..262c93868372 100644 --- a/src/osdc/Objecter.h +++ b/src/osdc/Objecter.h @@ -244,8 +244,12 @@ public: // read public: + class OSDSession; + struct Op { + OSDSession *session; xlist::item session_item; + int incarnation; object_t oid; object_locator_t oloc; @@ -272,16 +276,16 @@ public: eversion_t *objver; - bool linger; + utime_t stamp; Op(const object_t& o, const object_locator_t& ol, vector& op, - int f, Context *ac, Context *co, eversion_t *ov, bool ln = false) : - session_item(this), + int f, Context *ac, Context *co, eversion_t *ov) : + session(NULL), session_item(this), incarnation(0), oid(o), oloc(ol), con(NULL), snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co), tid(0), attempts(0), - paused(false), objver(ov), linger(ln) { + paused(false), objver(ov) { ops.swap(op); } }; @@ -394,55 +398,8 @@ public: auid(0), crush_rule(0), snapid(0), blp(NULL) {} }; - // -- osd sessions -- - struct Session { - xlist ops; - int osd; - int incarnation; - }; - map sessions; - - - private: - // pending ops - hash_map op_osd; - map op_poolstat; - map op_statfs; - map op_pool; - - map > > waiting_for_map; - - /** - * track pending ops by pg - * ...so we can cope with failures, map changes - */ - class LingerOp; - - class PG { - public: - vector acting; - set active_tids; // active ops - utime_t last; - xlist linger_ops; - - PG() {} - - // primary - where i write - int primary() { - if (acting.empty()) return -1; - return acting[0]; - } - // acker - where i read, and receive acks from - int acker() { - if (acting.empty()) return -1; - return acting[0]; - } - }; - - hash_map pg_map; - // --- - // lingering ops + // -- lingering ops -- struct LingerOp { uint64_t linger_id; @@ -458,20 +415,18 @@ public: bool registered; Context *on_reg_ack, *on_reg_commit; - PG *pg; - xlist::item pg_item; + OSDSession *session; + xlist::item session_item; LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL), registered(false), on_reg_ack(NULL), on_reg_commit(NULL), - pg(NULL), pg_item(this) {} + session(NULL), session_item(this) {} // no copy! const LingerOp &operator=(const LingerOp& r); LingerOp(const LingerOp& o); }; - map op_linger_info; - struct C_Linger_Ack : public Context { Objecter *objecter; LingerOp *info; @@ -490,16 +445,41 @@ public: } }; - - PG &get_pg(pg_t pgid); - void close_pg(pg_t pgid) { - assert(pg_map.count(pgid)); - assert(pg_map[pgid].active_tids.empty()); - pg_map.erase(pgid); + + // -- osd sessions -- + struct OSDSession { + xlist ops; + xlist linger_ops; + int osd; + int incarnation; + Connection *con; + + OSDSession(int o) : osd(o), incarnation(0), con(NULL) {} + }; + map osd_sessions; + + OSDSession *get_session(int osd) { + map::iterator p = osd_sessions.find(osd); + if (p != osd_sessions.end()) + return p->second; + OSDSession *s = new OSDSession(osd); + osd_sessions[osd] = s; + return s; } - void scan_pgs(set& changed_pgs); - void scan_pgs_for(set& changed_pgs, int osd); - void kick_requests(set& changed_pgs); + + private: + // pending ops + hash_map op_osd; + int num_homeless_ops; + map op_linger_info; + map op_poolstat; + map op_statfs; + map op_pool; + + map > > waiting_for_map; + + void send_op(Op *op); + void kick_requests(OSDSession *session); void _list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish); @@ -535,6 +515,7 @@ public: last_seen_osdmap_version(0), last_seen_pgmap_version(0), client_lock(l), timer(t), + num_homeless_ops(0), op_throttler(g_conf.objecter_inflight_op_bytes) { } ~Objecter() { } @@ -566,7 +547,7 @@ private: // low-level tid_t op_submit(Op *op, LingerOp *linger_op = NULL); - tid_t resend_linger(LingerOp *info); + tid_t send_linger(LingerOp *info); void _linger_ack(LingerOp *info, int r); void _linger_commit(LingerOp *info, int r); @@ -756,7 +737,7 @@ private: const SnapContext& snapc, int flags, Context *onack, Context *oncommit, eversion_t *objver = NULL) { - Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, false); + Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver); o->mtime = mtime; o->snapc = snapc; return op_submit(o);