From 1ee58d1aacefb2e6bc62723e8b65faf93a6051a5 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Wed, 21 May 2008 16:46:33 -0700 Subject: [PATCH] osd: some push/pull cleanup --- src/osd/PG.cc | 27 ++++++++++++++++++++++----- src/osd/PG.h | 20 ++++++++------------ src/osd/ReplicatedPG.cc | 31 +++++++++++++++---------------- src/osd/ReplicatedPG.h | 6 +++--- 4 files changed, 48 insertions(+), 36 deletions(-) diff --git a/src/osd/PG.cc b/src/osd/PG.cc index e6090bb9c9af..0ac260a31730 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -176,8 +176,12 @@ void PG::proc_replica_log(Log &olog, Missing& omissing, int from) list::reverse_iterator pp = olog.log.rbegin(); eversion_t lu = peer_info[from].last_update; while (pp != olog.log.rend()) { - if (!log.objects.count(pp->oid)) { + if (!log.logged_object(pp->oid)) { dout(10) << " divergent " << *pp << " not in our log, generating backlog" << dendl; + //dout(0) << "log" << dendl; + //log.print(*_dout); + //dout(0) << "olog" << dendl; + //olog.print(*_dout); generate_backlog(); } @@ -228,9 +232,9 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd) << " into " << log << dendl; //dout(0) << "log" << dendl; - //log.print(cout); + //log.print(*_dout); //dout(0) << "olog" << dendl; - //olog.print(cout); + //olog.print(*_dout); if (log.empty() || (olog.bottom > log.top && olog.backlog)) { // e.g. log=(0,20] olog=(40,50]+backlog) @@ -492,11 +496,24 @@ void PG::drop_backlog() ostream& PG::Log::print(ostream& out) const { - out << *this << dendl; + out << *this << std::endl; for (list::const_iterator p = log.begin(); p != log.end(); p++) - out << *p << dendl; + out << *p << std::endl; + return out; +} + +ostream& PG::IndexedLog::print(ostream& out) const +{ + out << *this << std::endl; + for (list::const_iterator p = log.begin(); + p != log.end(); + p++) { + out << *p << " " << (logged_object(p->oid) ? "indexed":"NOT INDEXED") << std::endl; + assert(logged_object(p->oid)); + assert(logged_req(p->reqid)); + } return out; } diff --git a/src/osd/PG.h b/src/osd/PG.h index c0d9d149f53d..35335f5feda3 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -361,10 +361,10 @@ public: Log::clear(); } - bool logged_object(object_t oid) { + bool logged_object(object_t oid) const { return objects.count(oid); } - bool logged_req(const osd_reqid_t &r) { + bool logged_req(const osd_reqid_t &r) const { return caller_ops.count(r); } @@ -423,6 +423,8 @@ public: void trim(ObjectStore::Transaction &t, eversion_t s); void trim_write_ahead(eversion_t last_update); + + ostream& print(ostream& out) const; }; @@ -536,10 +538,6 @@ protected: bool block_if_wrlocked(MOSDOp* op); - // recovery - map objects_pulling; // which objects are currently being pulled - - // stats off_t stat_num_bytes; off_t stat_num_blocks; @@ -665,10 +663,6 @@ public: bool is_empty() const { return info.last_update == eversion_t(0,0); } - int num_active_ops() const { - return objects_pulling.size(); - } - // pg on-disk state void write_log(ObjectStore::Transaction& t); void append_log(ObjectStore::Transaction &t, @@ -787,8 +781,10 @@ inline ostream& operator<<(ostream& out, const PG& pg) } } - if (pg.get_role() == 0) out << " pct " << pg.peers_complete_thru; - if (!pg.have_master_log) out << " !hml"; + if (pg.get_role() == 0) { + out << " pct " << pg.peers_complete_thru; + if (!pg.have_master_log) out << " !hml"; + } if (pg.is_active()) out << " active"; if (pg.is_crashed()) out << " crashed"; if (pg.is_replay()) out << " replay"; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 3fa75815a6ce..bf61abaf0ab7 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -77,7 +77,7 @@ void ReplicatedPG::wait_for_missing_object(object_t oid, Message *m) // we don't have it (yet). eversion_t v = missing.missing[oid]; - if (objects_pulling.count(oid)) { + if (pulling.count(oid)) { dout(7) << "missing " << poid << " v " << v @@ -1395,9 +1395,8 @@ void ReplicatedPG::pull(pobject_t poid) osd->messenger->send_message(subop, osd->osdmap->get_inst(fromosd)); // take note - assert(objects_pulling.count(poid.oid) == 0); - num_pulling++; - objects_pulling[poid.oid] = v; + assert(pulling.count(poid.oid) == 0); + pulling[poid.oid] = v; } @@ -1416,7 +1415,6 @@ void ReplicatedPG::push(pobject_t poid, int peer) t.getattr(info.pgid, poid, "version", &v, &vlen); t.getattrs(info.pgid, poid, attrset); unsigned tr = osd->store->apply_transaction(t); - assert(tr == 0); // !!! // ok @@ -1424,6 +1422,7 @@ void ReplicatedPG::push(pobject_t poid, int peer) << " size " << bl.length() << " to osd" << peer << dendl; + assert(vlen == sizeof(v)); osd->logger->inc("r_push"); osd->logger->inc("r_pushb", bl.length()); @@ -1525,6 +1524,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) if (!is_missing_object(poid.oid)) { dout(7) << "sub_op_push not missing " << poid << dendl; + dout(15) << " but i AM missing " << missing.missing << dendl; return; } @@ -1544,9 +1544,9 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) //t.collection_add(info.pgid, poid); // close out pull op? - num_pulling--; - if (objects_pulling.count(poid.oid)) - objects_pulling.erase(poid.oid); + if (pulling.count(poid.oid)) + pulling.erase(poid.oid); + missing.got(poid.oid, v); @@ -1663,9 +1663,8 @@ void ReplicatedPG::cancel_recovery() { // forget about where missing items are, or anything we're pulling missing.loc.clear(); - osd->num_pulling -= objects_pulling.size(); - objects_pulling.clear(); - num_pulling = 0; + osd->num_pulling -= pulling.size(); + pulling.clear(); pushing.clear(); } @@ -1677,13 +1676,13 @@ bool ReplicatedPG::do_recovery() { assert(is_primary()); - dout(-10) << "do_recovery pulling " << objects_pulling.size() << " in pg, " + dout(-10) << "do_recovery pulling " << pulling.size() << " in pg, " << osd->num_pulling << "/" << g_conf.osd_max_pull << " total" << dendl; dout(10) << "do_recovery " << missing << dendl; // can we slow down on this PG? - if (osd->num_pulling >= g_conf.osd_max_pull && !objects_pulling.empty()) { + if (osd->num_pulling >= g_conf.osd_max_pull && !pulling.empty()) { dout(-10) << "do_recovery already pulling max, waiting" << dendl; return true; } @@ -1698,11 +1697,11 @@ bool ReplicatedPG::do_recovery() dout(10) << "do_recovery " << *log.requested_to - << (objects_pulling.count(latest->oid) ? " (pulling)":"") + << (pulling.count(latest->oid) ? " (pulling)":"") << dendl; if (latest->is_update() && - !objects_pulling.count(latest->oid) && + !pulling.count(latest->oid) && missing.is_missing(latest->oid)) { pobject_t poid(info.pgid.pool(), 0, latest->oid); pull(poid); @@ -1712,7 +1711,7 @@ bool ReplicatedPG::do_recovery() log.requested_to++; } - if (!objects_pulling.empty()) { + if (!pulling.empty()) { dout(7) << "do_recovery requested everything, still waiting" << dendl; return false; } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 848f3715f941..f8fe241360dc 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -86,9 +86,10 @@ protected: int fromosd, eversion_t pg_complete_thru=eversion_t(0,0)); // push/pull - int num_pulling; + map pulling; // which objects are currently being pulled map > pushing; + void push(pobject_t oid, int dest); void pull(pobject_t oid); @@ -130,8 +131,7 @@ protected: public: ReplicatedPG(OSD *o, pg_t p) : - PG(o,p), - num_pulling(0) + PG(o,p) { } ~ReplicatedPG() {} -- 2.47.3