{
}
-tid_t Objecter::resend_linger(LingerOp *info)
+tid_t Objecter::send_linger(LingerOp *info)
{
+ dout(15) << "send_linger " << info->linger_id << dendl;
+
vector<OSDOp> ops = info->ops; // need to pass a copy to ops
Context *onack = NULL;
Context *oncommit = NULL;
}
Op *o = new Op(info->oid, info->oloc, ops, info->flags | CEPH_OSD_FLAG_READ,
onack, oncommit,
- info->pobjver, true);
+ info->pobjver);
o->snapid = info->snap;
return op_submit(o, info);
}
map<uint64_t, LingerOp*>::iterator iter = op_linger_info.find(linger_id);
if (iter != op_linger_info.end()) {
LingerOp *info = iter->second;
- info->pg_item.remove_myself();
+ info->session_item.remove_myself();
op_linger_info.erase(iter);
delete info;
}
info->linger_id = ++max_linger_id;
op_linger_info[info->linger_id] = info;
- resend_linger(info);
+ send_linger(info);
return info->linger_id;
}
return;
}
+ bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
+ bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR);
+ bool was_full = osdmap->test_flag(CEPH_OSDMAP_FULL);
+
if (m->get_last() <= osdmap->get_epoch()) {
dout(3) << "handle_osd_map ignoring epochs ["
<< m->get_first() << "," << m->get_last()
<< "] > " << osdmap->get_epoch()
<< dendl;
- set<pg_t> changed_pgs;
-
if (osdmap->get_epoch()) {
// we want incrementals
for (epoch_t e = osdmap->get_epoch() + 1;
e <= m->get_last();
e++) {
-
- bool was_pauserd = osdmap->test_flag(CEPH_OSDMAP_PAUSERD);
- bool was_pausewr = osdmap->test_flag(CEPH_OSDMAP_PAUSEWR);
- bool was_full = osdmap->test_flag(CEPH_OSDMAP_FULL);
-
+
if (m->incremental_maps.count(e)) {
dout(3) << "handle_osd_map decoding incremental epoch " << e << dendl;
OSDMap::Incremental inc(m->incremental_maps[e]);
break;
}
- if (was_pauserd || was_pausewr || was_full)
- maybe_request_map();
-
- // scan pgs for changes
- scan_pgs(changed_pgs);
-
- // kick paused
- if ((was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) ||
- (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) ||
- (was_full && !osdmap->test_flag(CEPH_OSDMAP_FULL))) {
- for (hash_map<tid_t,Op*>::iterator p = op_osd.begin();
- p != op_osd.end();
- p++) {
- if (p->second->paused) {
- p->second->paused = false;
- op_submit(p->second);
- }
- }
- }
- dump_active();
assert(e == osdmap->get_epoch());
}
if (m->maps.count(m->get_last())) {
dout(3) << "handle_osd_map decoding full epoch " << m->get_last() << dendl;
osdmap->decode(m->maps[m->get_last()]);
-
- scan_pgs(changed_pgs);
} else {
dout(3) << "handle_osd_map hmm, i want a full map, requesting" << dendl;
monc->sub_want("osdmap", 0, CEPH_SUBSCRIBE_ONETIME);
monc->renew_subs();
}
}
+ }
- // kick requests who might be timing out on the wrong osds
- if (!changed_pgs.empty())
- kick_requests(changed_pgs);
+ if (was_pauserd || was_pausewr || was_full)
+ maybe_request_map();
+
+ bool kick_paused =
+ (was_pauserd && !osdmap->test_flag(CEPH_OSDMAP_PAUSERD)) ||
+ (was_pausewr && !osdmap->test_flag(CEPH_OSDMAP_PAUSEWR)) ||
+ (was_full && !osdmap->test_flag(CEPH_OSDMAP_FULL));
+
+ // osd addr changes?
+ // If an up osd's address in the new map differs from the address our
+ // session is connected to, drop the old connection, open one to the new
+ // address and bump the session incarnation (ops compare their recorded
+ // incarnation against it to decide whether to resend).
+ for (map<int,OSDSession*>::iterator p = osd_sessions.begin();
+ p != osd_sessions.end();
+ ++p) {
+ OSDSession *s = p->second;
+ // Sessions start with con == NULL and ms_handle_reset() clears it again;
+ // send_op() connects lazily, so without a connection there is nothing
+ // to compare or reopen here.
+ if (!s->con)
+ continue;
+ if (!osdmap->is_up(s->osd))
+ continue;
+ entity_inst_t inst = osdmap->get_inst(s->osd);
+ if (s->con->get_peer_addr() != inst.addr) {
+ dout(10) << " reopening osd" << s->osd << " session, addr now " << inst << dendl;
+ messenger->mark_down(s->con);
+ s->con->put();
+ s->con = messenger->get_connection(inst);
+ s->incarnation++;
+ }
+ }
+
+ // active requests
+ for (hash_map<tid_t,Op*>::iterator p = op_osd.begin();
+ p != op_osd.end();
+ p++) {
+ Op *op = p->second;
+
+ // calc target
+ vector<int> acting;
+ if (op->oid.name.length())
+ op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
+ osdmap->pg_to_acting_osds(op->pgid, acting);
+ OSDSession *s = 0;
+
+ if (acting.size())
+ s = get_session(acting[0]);
+ if (op->session != s) {
+ dout(10) << " redirecting tid " << op->tid << " to osd" << (s ? s->osd : -1) << dendl;
+ if (!op->session)
+ num_homeless_ops--;
+ op->session_item.remove_myself();
+ op->session = s;
+ if (s) {
+ op->session->ops.push_back(&op->session_item);
+ send_op(op);
+ } else {
+ num_homeless_ops++;
+ }
+ } else if (s) {
+ if (op->incarnation != s->incarnation) {
+ dout(10) << " resending tid " << op->tid << " to (reopened) osd" << s->osd << dendl;
+ send_op(op);
+ } else if (op->paused && kick_paused) {
+ dout(10) << " kicking paused tid " << op->tid << " on osd" << s->osd << dendl;
+ send_op(op);
+ }
+ }
}
- //now check if the map is full -- we want to subscribe if it is!
+ // lingers
+ for (map<uint64_t,LingerOp*>::iterator p = op_linger_info.begin();
+ p != op_linger_info.end();
+ p++) {
+ LingerOp *op = p->second;
+ vector<int> acting;
+ pg_t pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
+ osdmap->pg_to_acting_osds(pgid, acting);
+ OSDSession *s = acting.size() ? get_session(acting[0]) : NULL;
+ if (op->session != s) {
+ dout(10) << " redirecting linger id " << op->linger_id << " to osd" << (s ? s->osd : -1) << dendl;
+ op->session_item.remove_myself();
+ op->session = s;
+ if (s) {
+ send_linger(op);
+ }
+ }
+ }
+
+ dump_active();
+
+ // now check if the map is full -- we want to subscribe if it is!
if (osdmap->test_flag(CEPH_OSDMAP_FULL) & CEPH_OSDMAP_FULL)
maybe_request_map();
- //finish any Contexts that were waiting on a map update
+ // finish any Contexts that were waiting on a map update
map<epoch_t,list< pair< Context*, int > > >::iterator p =
waiting_for_map.begin();
while (p != waiting_for_map.end() &&
}
-Objecter::PG &Objecter::get_pg(pg_t pgid)
-{
- if (!pg_map.count(pgid)) {
- osdmap->pg_to_acting_osds(pgid, pg_map[pgid].acting);
- dout(10) << "get_pg " << pgid << " is new, " << pg_map[pgid].acting << dendl;
- } else {
- dout(10) << "get_pg " << pgid << " is old, " << pg_map[pgid].acting << dendl;
- }
- return pg_map[pgid];
-}
-
-
-void Objecter::scan_pgs_for(set<pg_t>& pgs, int osd)
-{
- dout(10) << "scan_pgs_for osd" << osd << dendl;
-
- for (hash_map<pg_t,PG>::iterator i = pg_map.begin();
- i != pg_map.end();
- i++) {
- pg_t pgid = i->first;
- PG& pg = i->second;
- if (pg.acting.size() && pg.acting[0] == osd)
- pgs.insert(pgid);
- }
-}
-
-void Objecter::scan_pgs(set<pg_t>& changed_pgs)
+/**
+ * Resend everything queued on one osd session.
+ *
+ * Calls send_op() for every Op on session->ops and send_linger() for
+ * every LingerOp on session->linger_ops.
+ *
+ * @param session session whose ops and lingers are resent; dereferenced
+ *                unconditionally, so it must not be NULL
+ */
+void Objecter::kick_requests(OSDSession *session)
{
- dout(10) << "scan_pgs" << dendl;
-
- for (hash_map<pg_t,PG>::iterator i = pg_map.begin();
- i != pg_map.end();
- i++) {
- pg_t pgid = i->first;
- PG& pg = i->second;
-
- // calc new.
- vector<int> other;
- osdmap->pg_to_acting_osds(pgid, other);
-
- if (other == pg.acting)
- continue; // no change.
+ dout(10) << "kick_requests for osd" << session->osd << dendl;
- dout(10) << "scan_pgs " << pgid << " " << pg.acting << " -> " << other << dendl;
-
- other.swap(pg.acting);
-
- if (other.size() && pg.acting.size() &&
- other[0] == pg.acting[0])
- continue; // same primary.
-
- // changed significantly.
- dout(10) << "scan_pgs pg " << pgid
- << " (" << pg.active_tids << ")"
- << " " << other << " -> " << pg.acting
- << dendl;
- changed_pgs.insert(pgid);
- }
-}
+ // resend ops
+ for (xlist<Op*>::iterator p = session->ops.begin(); !p.end(); ++p)
+ send_op(*p);
-void Objecter::kick_requests(set<pg_t>& changed_pgs)
-{
- dout(10) << "kick_requests in pgs " << changed_pgs << dendl;
- dout(0) << "*** kick_requests in pgs " << changed_pgs << dendl;
-
- for (set<pg_t>::iterator i = changed_pgs.begin();
- i != changed_pgs.end();
- i++) {
- pg_t pgid = *i;
- PG& pg = pg_map[pgid];
-
- // resubmit ops!
- set<tid_t> tids;
- tids.swap( pg.active_tids );
-
- // resend lingers
- for (xlist<LingerOp*>::iterator j = pg.linger_ops.begin(); !j.end(); ++j)
- resend_linger(*j);
-
- if (pg.linger_ops.empty())
- close_pg( pgid ); // will pbly reopen, unless it's just commits we're missing
-
- dout(10) << "kick_requests pg " << pgid << " tids " << tids << dendl;
- for (set<tid_t>::iterator p = tids.begin();
- p != tids.end();
- p++) {
- tid_t tid = *p;
-
- hash_map<tid_t, Op*>::iterator p = op_osd.find(tid);
- if (p != op_osd.end()) {
- Op *op = p->second;
- put_op_budget(op);
- op_osd.erase(p);
-
- if (op->onack)
- num_unacked--;
- if (op->oncommit)
- num_uncommitted--;
-
- dout(0) << "kick_requests tid=" << tid << " linger=" << op->linger << dendl;
- // WRITE
- if (op->onack) {
- dout(3) << "kick_requests missing ack, resub " << tid << dendl;
- op_submit(op, false);
- } else {
- if (!op->linger) {
- assert(op->oncommit);
- dout(3) << "kick_requests missing commit, resub " << tid << dendl;
- dout(0) << "kick_requests missing commit, resub " << tid << dendl;
- op_submit(op, false);
- }
- }
- } else
- assert(0);
- }
- }
- dout(0) << "*** kick_requests done" << dendl;
+ // resend lingers
+ // NOTE(review): send_linger() goes through op_submit(), which push_back()s
+ // the LingerOp's session_item onto a session's linger_ops list -- possibly
+ // this same list -- while we are iterating it. Confirm that xlist
+ // iteration tolerates this.
+ for (xlist<LingerOp*>::iterator j = session->linger_ops.begin(); !j.end(); ++j)
+ send_linger(*j);
}
{
dout(10) << "tick" << dendl;
- set<int> ping;
+ set<OSDSession*> toping;
- // look for laggy pgs
+ // look for laggy requests
utime_t cutoff = g_clock.now();
cutoff -= g_conf.objecter_timeout; // timeout
- for (hash_map<pg_t,PG>::iterator i = pg_map.begin();
- i != pg_map.end();
- i++) {
- if (!i->second.active_tids.empty() &&
- i->second.last < cutoff) {
- dout(1) << " pg " << i->first << " on " << i->second.acting
- << " is laggy: " << i->second.active_tids << dendl;
- maybe_request_map();
- //break;
-
- // send a ping to this osd, to ensure we detect any session resets
- // (osd reply message policy is lossy)
- if (i->second.acting.size())
- ping.insert(i->second.acting[0]);
+
+ for (hash_map<tid_t,Op*>::iterator p = op_osd.begin();
+ p != op_osd.end();
+ p++) {
+ Op *op = p->second;
+ if (op->session && op->stamp < cutoff) {
+ dout(2) << " tid " << p->first << " on osd" << op->session->osd << " is laggy" << dendl;
+ toping.insert(op->session);
}
}
- for (set<int>::iterator p = ping.begin(); p != ping.end(); p++)
- messenger->send_message(new MPing, osdmap->get_inst(*p));
+ if (num_homeless_ops || !toping.empty())
+ maybe_request_map();
+ if (!toping.empty()) {
+ // send a ping to these osds, to ensure we detect any session resets
+ // (osd reply message policy is lossy)
+ for (set<OSDSession*>::iterator i = toping.begin();
+ i != toping.end();
+ i++)
+ messenger->send_message(new MPing, osdmap->get_inst((*i)->osd));
+ }
+
// reschedule
timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
}
// take_op_budget() may drop our lock while it blocks.
take_op_budget(op);
- if (op->oid.name.length())
- op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
-
- // find
- PG &pg = get_pg(op->pgid);
-
- if (linger_op)
- pg.linger_ops.push_back(&linger_op->pg_item);
-
// pick tid
op->tid = ++last_tid;
assert(client_inc >= 0);
+ // pick target
+ vector<int> acting;
+ if (op->oid.name.length())
+ op->pgid = osdmap->object_locator_to_pg(op->oid, op->oloc);
+ osdmap->pg_to_acting_osds(op->pgid, acting);
+
+ if (acting.size()) {
+ op->session = get_session(acting[0]);
+ op->session->ops.push_back(&op->session_item);
+ if (linger_op)
+ op->session->linger_ops.push_back(&linger_op->session_item);
+ } else {
+ op->session = NULL;
+ num_homeless_ops++;
+ }
+
// add to gather set(s)
int flags = op->flags;
if (op->onack) {
}
op_osd[op->tid] = op;
- pg.active_tids.insert(op->tid);
- pg.last = g_clock.now();
-
// send?
- dout(0) << "op_submit oid " << op->oid
- << " " << op->oloc
- << " " << op->ops << " tid " << op->tid
- << " osd" << pg.primary()
- << dendl;
dout(10) << "op_submit oid " << op->oid
<< " " << op->oloc
<< " " << op->ops << " tid " << op->tid
- << " osd" << pg.primary()
+ << " osd" << (op->session ? op->session->osd : -1)
<< dendl;
assert(op->flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE));
dout(10) << " FULL, paused modify " << op << " tid " << last_tid << dendl;
op->paused = true;
maybe_request_map();
- } else if (pg.primary() >= 0) {
- int flags = op->flags;
- if (op->oncommit)
- flags |= CEPH_OSD_FLAG_ONDISK;
- if (op->onack)
- flags |= CEPH_OSD_FLAG_ACK;
+ } else if (op->session) {
+ send_op(op);
+ } else
+ maybe_request_map();
+
+ dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
+
+ return op->tid;
+}
- if (op->con) {
- if (op->outbl && op->outbl->length()) {
- dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
- op->con->revoke_rx_buffer(op->tid);
- }
- op->con->put();
- }
- op->con = messenger->get_connection(osdmap->get_inst(pg.primary()));
- assert(op->con);
- if (op->outbl && op->outbl->length()) {
- dout(20) << " posting rx buffer for " << op->tid << " on " << op->con << dendl;
- op->con->post_rx_buffer(op->tid, *op->outbl);
- }
+/**
+ * Build an MOSDOp for an op and send it on its session's connection.
+ *
+ * Opens the session connection if it has none yet, re-posts the op's
+ * preallocated rx buffer on that connection, clears op->paused, and
+ * records the session incarnation and a send timestamp on the op.
+ *
+ * @param op op to (re)send; op->session is dereferenced unconditionally,
+ *           so the caller must have assigned a session first
+ */
+void Objecter::send_op(Op *op)
+{
+ dout(15) << "send_op " << op->tid << " to osd" << op->session->osd << dendl;
+
+ // ask for exactly the replies somebody is still waiting on
+ int flags = op->flags;
+ if (op->oncommit)
+ flags |= CEPH_OSD_FLAG_ONDISK;
+ if (op->onack)
+ flags |= CEPH_OSD_FLAG_ACK;
+
+ // sessions are created without a connection; open it on first send
+ if (!op->session->con)
+ op->session->con = messenger->get_connection(osdmap->get_inst(op->session->osd));
+ assert(op->session->con);
- ceph_object_layout ol;
- ol.ol_pgid = op->pgid.v;
- ol.ol_stripe_unit = 0;
+ // preallocated rx buffer?
+ if (op->con) {
+ dout(20) << " revoking rx buffer for " << op->tid << " on " << op->con << dendl;
+ op->con->revoke_rx_buffer(op->tid);
+ op->con->put();
+ // We just dropped our reference. If no buffer is re-posted below (outbl
+ // already claimed or empty), a stale pointer here would be revoked and
+ // put() again by the next send_op() or by the reply handler.
+ op->con = NULL;
+ }
+ if (op->outbl && op->outbl->length()) {
+ dout(20) << " posting rx buffer for " << op->tid << " on " << op->session->con << dendl;
+ op->con = op->session->con->get();
+ op->con->post_rx_buffer(op->tid, *op->outbl);
+ }
- MOSDOp *m = new MOSDOp(client_inc, op->tid,
- op->oid, ol, osdmap->get_epoch(),
- flags);
+ op->paused = false;
+ op->incarnation = op->session->incarnation;
+ op->stamp = g_clock.now();
- m->set_snapid(op->snapid);
- m->set_snap_seq(op->snapc.seq);
- m->get_snaps() = op->snapc.snaps;
+ ceph_object_layout ol;
+ ol.ol_pgid = op->pgid.v;
+ ol.ol_stripe_unit = 0;
- m->ops = op->ops;
- m->set_mtime(op->mtime);
- m->set_retry_attempt(op->attempts++);
+ MOSDOp *m = new MOSDOp(client_inc, op->tid,
+ op->oid, ol, osdmap->get_epoch(),
+ flags);
- if (op->version != eversion_t())
- m->set_version(op->version); // we're replaying this op!
+ m->set_snapid(op->snapid);
+ m->set_snap_seq(op->snapc.seq);
+ m->get_snaps() = op->snapc.snaps;
- if (op->priority)
- m->set_priority(op->priority);
+ m->ops = op->ops;
+ m->set_mtime(op->mtime);
+ m->set_retry_attempt(op->attempts++);
- messenger->send_message(m, op->con);
- } else
- maybe_request_map();
-
- dout(5) << num_unacked << " unacked, " << num_uncommitted << " uncommitted" << dendl;
-
- return op->tid;
+ if (op->version != eversion_t())
+ m->set_version(op->version); // we're replaying this op!
+
+ if (op->priority)
+ m->set_priority(op->priority);
+
+ messenger->send_message(m, op->session->con);
}
int Objecter::calc_op_budget(Op *op)
<< (m->is_ondisk() ? " ondisk":(m->is_onnvram() ? " onnvram":" ack"))
<< " v " << m->get_version() << " in " << m->get_pg()
<< dendl;
- Op *op = op_osd[ tid ];
+ Op *op = op_osd[tid];
- if (op->con != m->get_connection()) {
+ if (op->session->con != m->get_connection()) {
dout(7) << " ignoring reply from " << m->get_source_inst()
- << ", i last sent to " << op->con->get_peer_addr() << dendl;
+ << ", i last sent to " << op->session->con->get_peer_addr() << dendl;
m->put();
return;
}
// got data?
if (op->outbl) {
- if (op->outbl->length())
+ if (op->con)
op->con->revoke_rx_buffer(op->tid);
m->claim_data(*op->outbl);
op->outbl = 0;
// done with this tid?
if (!op->onack && !op->oncommit) {
- PG &pg = get_pg( m->get_pg() );
- assert(pg.active_tids.count(tid));
- pg.active_tids.erase(tid);
- dout(15) << "handle_osd_op_reply completed tid " << tid << ", pg " << m->get_pg()
- << " still has " << pg.active_tids << dendl;
- if (pg.active_tids.empty() && pg.linger_ops.empty())
- close_pg( m->get_pg() );
+ op->session_item.remove_myself();
+ dout(15) << "handle_osd_op_reply completed tid " << tid << dendl;
put_op_budget(op);
- op_osd.erase( tid );
+ op_osd.erase(tid);
if (op->con)
op->con->put();
delete op;
object_locator_t oloc(list_context->pool_id);
//
- Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL, false);
+ Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL);
o->priority = op.priority;
o->snapid = list_context->pool_snap_seq;
o->outbl = bl;
int osd = osdmap->identify_osd(con->get_peer_addr());
if (osd >= 0) {
dout(1) << "ms_handle_reset on osd" << osd << dendl;
- set<pg_t> changed_pgs;
- scan_pgs_for(changed_pgs, osd);
- kick_requests(changed_pgs);
- maybe_request_map();
+ // Tear down the session's connection (if it still has one) and resend
+ // everything queued on it; send_op() reconnects lazily.
+ map<int,OSDSession*>::iterator p = osd_sessions.find(osd);
+ if (p != osd_sessions.end()) {
+ OSDSession *session = p->second;
+ // con is NULL for a session that never sent anything, or that was
+ // already reset and had no ops to make send_op() reconnect.
+ if (session->con) {
+ messenger->mark_down(session->con->get_peer_addr());
+ session->con->put();
+ session->con = NULL;
+ }
+ kick_requests(session);
+ maybe_request_map();
+ }
} else {
dout(10) << "ms_handle_reset on unknown osd addr " << con->get_peer_addr() << dendl;
}
// read
public:
+ class OSDSession;
+
struct Op {
+ OSDSession *session;
xlist<Op*>::item session_item;
+ int incarnation;
object_t oid;
object_locator_t oloc;
eversion_t *objver;
- bool linger;
+ utime_t stamp;
Op(const object_t& o, const object_locator_t& ol, vector<OSDOp>& op,
- int f, Context *ac, Context *co, eversion_t *ov, bool ln = false) :
- session_item(this),
+ int f, Context *ac, Context *co, eversion_t *ov) :
+ session(NULL), session_item(this), incarnation(0),
oid(o), oloc(ol),
con(NULL),
snapid(CEPH_NOSNAP), outbl(0), flags(f), priority(0), onack(ac), oncommit(co),
tid(0), attempts(0),
- paused(false), objver(ov), linger(ln) {
+ paused(false), objver(ov) {
ops.swap(op);
}
};
auid(0), crush_rule(0), snapid(0), blp(NULL) {}
};
- // -- osd sessions --
- struct Session {
- xlist<Op*> ops;
- int osd;
- int incarnation;
- };
- map<int,Session*> sessions;
-
-
- private:
- // pending ops
- hash_map<tid_t,Op*> op_osd;
- map<tid_t,PoolStatOp*> op_poolstat;
- map<tid_t,StatfsOp*> op_statfs;
- map<tid_t,PoolOp*> op_pool;
-
- map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
-
- /**
- * track pending ops by pg
- * ...so we can cope with failures, map changes
- */
- class LingerOp;
-
- class PG {
- public:
- vector<int> acting;
- set<tid_t> active_tids; // active ops
- utime_t last;
- xlist<LingerOp*> linger_ops;
-
- PG() {}
-
- // primary - where i write
- int primary() {
- if (acting.empty()) return -1;
- return acting[0];
- }
- // acker - where i read, and receive acks from
- int acker() {
- if (acting.empty()) return -1;
- return acting[0];
- }
- };
-
- hash_map<pg_t,PG> pg_map;
- // ---
- // lingering ops
+ // -- lingering ops --
struct LingerOp {
uint64_t linger_id;
bool registered;
Context *on_reg_ack, *on_reg_commit;
- PG *pg;
- xlist<LingerOp*>::item pg_item;
+ OSDSession *session;
+ xlist<LingerOp*>::item session_item;
LingerOp() : linger_id(0), flags(0), poutbl(NULL), pobjver(NULL),
registered(false), on_reg_ack(NULL), on_reg_commit(NULL),
- pg(NULL), pg_item(this) {}
+ session(NULL), session_item(this) {}
// no copy!
const LingerOp &operator=(const LingerOp& r);
LingerOp(const LingerOp& o);
};
- map<uint64_t, LingerOp*> op_linger_info;
-
struct C_Linger_Ack : public Context {
Objecter *objecter;
LingerOp *info;
}
};
-
- PG &get_pg(pg_t pgid);
- void close_pg(pg_t pgid) {
- assert(pg_map.count(pgid));
- assert(pg_map[pgid].active_tids.empty());
- pg_map.erase(pgid);
+
+ // -- osd sessions --
+ // Per-osd bookkeeping: the ops and linger ops currently targeted at one
+ // osd, plus the connection used to reach it.
+ struct OSDSession {
+ // ops assigned to this osd (linked through Op::session_item)
+ xlist<Op*> ops;
+ // linger ops assigned to this osd (linked through LingerOp::session_item)
+ xlist<LingerOp*> linger_ops;
+ // id of the osd this session talks to
+ int osd;
+ // bumped when the connection is reopened after an osd address change;
+ // send_op() copies it into Op::incarnation
+ int incarnation;
+ // connection to the osd; NULL until send_op() opens it, and reset to
+ // NULL by ms_handle_reset()
+ Connection *con;
+
+ OSDSession(int o) : osd(o), incarnation(0), con(NULL) {}
+ };
+ // all known sessions, keyed by osd id; entries are created by get_session()
+ map<int,OSDSession*> osd_sessions;
+
+ // Return the session for the given osd, creating and registering a new
+ // (not yet connected) one the first time the osd is seen.
+ OSDSession *get_session(int osd) {
+ map<int,OSDSession*>::iterator found = osd_sessions.find(osd);
+ if (found == osd_sessions.end()) {
+ found = osd_sessions.insert(
+ map<int,OSDSession*>::value_type(osd, new OSDSession(osd))).first;
+ }
+ return found->second;
}
- void scan_pgs(set<pg_t>& changed_pgs);
- void scan_pgs_for(set<pg_t>& changed_pgs, int osd);
- void kick_requests(set<pg_t>& changed_pgs);
+
+ private:
+ // pending ops
+ hash_map<tid_t,Op*> op_osd;
+ int num_homeless_ops;
+ map<uint64_t, LingerOp*> op_linger_info;
+ map<tid_t,PoolStatOp*> op_poolstat;
+ map<tid_t,StatfsOp*> op_statfs;
+ map<tid_t,PoolOp*> op_pool;
+
+ map<epoch_t,list< pair<Context*, int> > > waiting_for_map;
+
+ void send_op(Op *op);
+ void kick_requests(OSDSession *session);
void _list_reply(ListContext *list_context, bufferlist *bl, Context *final_finish);
last_seen_osdmap_version(0),
last_seen_pgmap_version(0),
client_lock(l), timer(t),
+ num_homeless_ops(0),
op_throttler(g_conf.objecter_inflight_op_bytes)
{ }
~Objecter() { }
// low-level
tid_t op_submit(Op *op, LingerOp *linger_op = NULL);
- tid_t resend_linger(LingerOp *info);
+ tid_t send_linger(LingerOp *info);
void _linger_ack(LingerOp *info, int r);
void _linger_commit(LingerOp *info, int r);
const SnapContext& snapc, int flags,
Context *onack, Context *oncommit,
eversion_t *objver = NULL) {
- Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver, false);
+ Op *o = new Op(oid, oloc, ops, flags | CEPH_OSD_FLAG_WRITE, onack, oncommit, objver);
o->mtime = mtime;
o->snapc = snapc;
return op_submit(o);