From 284a61df43e21bec3aa8ea6647e53177ab2964a7 Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 6 Mar 2007 22:51:00 +0000 Subject: [PATCH] big cleanup of OSD, generic PG interface. do_op moved into PG. new ObjectLayout added to Objecter interface (and ObjectCacher), pg in MOSDOp rolled into that. new PG locking architecture. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1175 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/pgs/config.cc | 2 +- branches/sage/pgs/include/types.h | 8 +- branches/sage/pgs/messages/MOSDOp.h | 19 +- branches/sage/pgs/messages/MOSDOpReply.h | 8 +- branches/sage/pgs/msg/FakeMessenger.cc | 6 +- branches/sage/pgs/osd/OSD.cc | 483 ++++++++++------------- branches/sage/pgs/osd/OSD.h | 151 +++---- branches/sage/pgs/osd/OSDMap.h | 7 +- branches/sage/pgs/osd/PG.cc | 3 + branches/sage/pgs/osd/PG.h | 53 ++- branches/sage/pgs/osd/RAID4PG.cc | 87 ++++ branches/sage/pgs/osd/RAID4PG.h | 19 +- branches/sage/pgs/osd/ReplicatedPG.cc | 158 ++++++-- branches/sage/pgs/osd/ReplicatedPG.h | 19 +- branches/sage/pgs/osd/osd_types.h | 31 +- branches/sage/pgs/osdc/Filer.cc | 6 +- branches/sage/pgs/osdc/ObjectCacher.cc | 22 +- branches/sage/pgs/osdc/ObjectCacher.h | 12 +- branches/sage/pgs/osdc/Objecter.cc | 39 +- branches/sage/pgs/osdc/Objecter.h | 10 +- 20 files changed, 653 insertions(+), 490 deletions(-) diff --git a/branches/sage/pgs/config.cc b/branches/sage/pgs/config.cc index f197a549792b3..57b8bd85ec453 100644 --- a/branches/sage/pgs/config.cc +++ b/branches/sage/pgs/config.cc @@ -161,7 +161,7 @@ md_config_t g_conf = { mds_decay_halflife: 30, mds_beacon_interval: 5.0, - mds_beacon_grace: 10.0, + mds_beacon_grace: 100000.0, mds_log: true, mds_log_max_len: MDS_CACHE_SIZE / 3, diff --git a/branches/sage/pgs/include/types.h b/branches/sage/pgs/include/types.h index e6996fe2d2fe9..9bcd4c6417f75 100644 --- a/branches/sage/pgs/include/types.h +++ b/branches/sage/pgs/include/types.h @@ -141,11 +141,15 @@ struct FileLayout { char pg_size; // pg size (num replicas, or raid4 stripe width) int preferred; // preferred primary osd? + // -- pg -> disk layout -- + int object_stripe_unit; // for per-object raid + FileLayout() { } FileLayout(int su, int sc, int os, int pgt, int pgs, int o=-1) : stripe_unit(su), stripe_count(sc), object_size(os), - pg_type(pgt), pg_size(pgs), - preferred(o) { + pg_type(pgt), pg_size(pgs), preferred(o), + object_stripe_unit(su) // note: bad default, we pbly want su/(pgs-1) + { assert(object_size % stripe_unit == 0); } diff --git a/branches/sage/pgs/messages/MOSDOp.h b/branches/sage/pgs/messages/MOSDOp.h index 6139df56d833e..954d0a1353e67 100644 --- a/branches/sage/pgs/messages/MOSDOp.h +++ b/branches/sage/pgs/messages/MOSDOp.h @@ -75,8 +75,6 @@ public: private: struct { - long pcid; - // who's asking? entity_inst_t client; reqid_t reqid; // minor weirdness: entity_name_t is in reqid_t too. @@ -86,7 +84,7 @@ private: object_t oid; objectrev_t rev; - pg_t pg; + ObjectLayout layout; epoch_t map_epoch; @@ -119,7 +117,8 @@ private: void set_rep_tid(tid_t t) { st.rep_tid = t; } const object_t get_oid() { return st.oid; } - const pg_t get_pg() { return st.pg; } + const pg_t get_pg() { return st.layout.pgid; } + const ObjectLayout& get_layout() { return st.layout; } const epoch_t get_map_epoch() { return st.map_epoch; } //const int get_pg_role() { return st.pg_role; } // who am i asking for? @@ -154,12 +153,9 @@ private: size_t get_data_len() { return data.length(); } - // keep a pcid (procedure call id) to match up request+reply - void set_pcid(long pcid) { this->st.pcid = pcid; } - long get_pcid() { return st.pcid; } MOSDOp(entity_inst_t asker, int inc, long tid, - object_t oid, pg_t pg, epoch_t mapepoch, int op) : + object_t oid, ObjectLayout ol, epoch_t mapepoch, int op) : Message(MSG_OSD_OP) { memset(&st, 0, sizeof(st)); this->st.client = asker; @@ -168,7 +164,7 @@ private: this->st.reqid.tid = tid; this->st.oid = oid; - this->st.pg = pg; + this->st.layout = ol; this->st.map_epoch = mapepoch; this->st.op = op; @@ -182,6 +178,8 @@ private: //void set_pg_role(int r) { st.pg_role = r; } //void set_rg_nrep(int n) { st.rg_nrep = n; } + void set_layout(const ObjectLayout& l) { st.layout = l; } + void set_length(size_t l) { st.length = l; } void set_offset(size_t o) { st.offset = o; } void set_version(eversion_t v) { st.version = v; } @@ -204,8 +202,7 @@ private: ::_encode(data, payload); } - virtual char *get_type_name() { return "oop"; } - + virtual char *get_type_name() { return "osd_op"; } void print(ostream& out) { out << "osd_op(" << st.reqid << " " << get_opname(st.op) diff --git a/branches/sage/pgs/messages/MOSDOpReply.h b/branches/sage/pgs/messages/MOSDOpReply.h index 05106e096d176..6ec15861dcbdd 100644 --- a/branches/sage/pgs/messages/MOSDOpReply.h +++ b/branches/sage/pgs/messages/MOSDOpReply.h @@ -36,7 +36,7 @@ class MOSDOpReply : public Message { tid_t rep_tid; object_t oid; - pg_t pg; + ObjectLayout layout; // pgid, etc. int op; @@ -60,7 +60,7 @@ class MOSDOpReply : public Message { long get_tid() { return st.reqid.tid; } long get_rep_tid() { return st.rep_tid; } object_t get_oid() { return st.oid; } - pg_t get_pg() { return st.pg; } + pg_t get_pg() { return st.layout.pgid; } int get_op() { return st.op; } bool get_commit() { return st.commit; } @@ -105,7 +105,7 @@ public: this->st.rep_tid = req->st.rep_tid; this->st.oid = req->st.oid; - this->st.pg = req->st.pg; + this->st.layout = req->st.layout; this->st.result = result; this->st.commit = commit; @@ -132,7 +132,7 @@ public: ::_encode(data, payload); } - virtual char *get_type_name() { return "oopr"; } + virtual char *get_type_name() { return "osd_op_reply"; } void print(ostream& out) { out << "osd_op_reply(" << st.reqid diff --git a/branches/sage/pgs/msg/FakeMessenger.cc b/branches/sage/pgs/msg/FakeMessenger.cc index d2db8c8f7e11c..ae267dfb76323 100644 --- a/branches/sage/pgs/msg/FakeMessenger.cc +++ b/branches/sage/pgs/msg/FakeMessenger.cc @@ -155,9 +155,9 @@ int fakemessenger_do_loop_2() if (m) { //dout(18) << "got " << m << endl; - dout(1) << "---- " << m->get_dest() + dout(1) << "==== " << m->get_dest() << " <- " << m->get_source() - << " ---- " << *m + << " ==== " << *m << endl; if (g_conf.fakemessenger_serialize) { @@ -329,7 +329,7 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr } dm->queue_incoming(m); - dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << endl; + dout(1) << "--> " << get_myname() << " -> " << inst.name << " --- " << *m << endl; } catch (...) { diff --git a/branches/sage/pgs/osd/OSD.cc b/branches/sage/pgs/osd/OSD.cc index 1d4392748f207..75d822b63d985 100644 --- a/branches/sage/pgs/osd/OSD.cc +++ b/branches/sage/pgs/osd/OSD.cc @@ -361,20 +361,55 @@ int OSD::read_superblock() } -// object locks -PG *OSD::lock_pg(pg_t pgid) + + +// ====================================================== +// PG's + +PG *OSD::_create_lock_pg(pg_t pgid, ObjectStore::Transaction& t) { - osd_lock.Lock(); - PG *pg = _lock_pg(pgid); - osd_lock.Unlock(); + dout(10) << "_create_lock_pg " << pgid << endl; + + if (pg_map.count(pgid)) + dout(0) << "_create_lock_pg on " << pgid << ", already have " << *pg_map[pgid] << endl; + + // create + PG *pg; + if (pgid.is_rep()) + pg = new ReplicatedPG(this, pgid); + else if (pgid.is_raid4()) + pg = new RAID4PG(this, pgid); + else + assert(0); + + assert(pg_map.count(pgid) == 0); + pg_map[pgid] = pg; + + // lock + pg->lock(); + pg_lock.insert(pgid); + + pg->get(); // because it's in pg_map + pg->get(); // because we're locking it + + // create collection + assert(!store->collection_exists(pgid)); + t.create_collection(pgid); + return pg; } +bool OSD::_have_pg(pg_t pgid) +{ + return pg_map.count(pgid); +} + PG *OSD::_lock_pg(pg_t pgid) { assert(pg_map.count(pgid)); + // wait? if (pg_lock.count(pgid)) { Cond c; dout(15) << "lock_pg " << pgid << " waiting as " << &c << endl; @@ -396,14 +431,10 @@ PG *OSD::_lock_pg(pg_t pgid) dout(15) << "lock_pg " << pgid << endl; pg_lock.insert(pgid); - return pg_map[pgid]; -} - -void OSD::unlock_pg(pg_t pgid) -{ - osd_lock.Lock(); - _unlock_pg(pgid); - osd_lock.Unlock(); + PG *pg = pg_map[pgid]; + pg->lock(); + pg->get(); // because we're "locking" it and returning a pointer copy. + return pg; } void OSD::_unlock_pg(pg_t pgid) @@ -412,6 +443,8 @@ void OSD::_unlock_pg(pg_t pgid) assert(pg_lock.count(pgid)); pg_lock.erase(pgid); + pg_map[pgid]->put_unlock(); + if (pg_lock_waiters.count(pgid)) { // someone is in line Cond *c = pg_lock_waiters[pgid].front(); @@ -424,9 +457,14 @@ void OSD::_unlock_pg(pg_t pgid) } } -void OSD::_remove_pg(pg_t pgid) +void OSD::_remove_unlock_pg(PG *pg) { - dout(10) << "_remove_pg " << pgid << endl; + pg_t pgid = pg->info.pgid; + + dout(10) << "_remove_unlock_pg " << pgid << endl; + + // there shouldn't be any waiters, since we're a stray, and pg is presumably clean0. + assert(pg_lock_waiters.count(pgid) == 0); // remove from store list olist; @@ -435,19 +473,126 @@ void OSD::_remove_pg(pg_t pgid) ObjectStore::Transaction t; { for (list::iterator p = olist.begin(); - p != olist.end(); - p++) + p != olist.end(); + p++) t.remove(*p); t.remove_collection(pgid); t.remove(pgid.to_object()); // log too } store->apply_transaction(t); - - // hose from memory - delete pg_map[pgid]; + + // mark deleted + pg->mark_deleted(); + + // unlock + pg_lock.erase(pgid); + pg->put(); + + // remove from map pg_map.erase(pgid); + pg->put_unlock(); // will delete, if last reference +} + + + +void OSD::load_pgs() +{ + dout(10) << "load_pgs" << endl; + assert(pg_map.empty()); + + list ls; + store->list_collections(ls); + + for (list::iterator it = ls.begin(); + it != ls.end(); + it++) { + pg_t pgid = *it; + + PG *pg = 0; + if (pgid.is_rep()) + new ReplicatedPG(this, pgid); + else if (pgid.is_raid4()) + new RAID4PG(this, pgid); + else + assert(0); + pg_map[pgid] = pg; + pg->get(); + + // read pg info + store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info)); + + // read pg log + pg->read_log(store); + + // generate state for current mapping + int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting); + int role = osdmap->calc_pg_role(whoami, pg->acting, nrep); + pg->set_role(role); + + dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl; + } } + + + +/** + * check epochs starting from start to verify the pg acting set hasn't changed + * up until now + */ +void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from) +{ + dout(15) << "project_pg_history " << pgid + << " from " << from << " to " << osdmap->get_epoch() + << ", start " << h + << endl; + + vector last; + osdmap->pg_to_acting_osds(pgid, last); + + for (epoch_t e = osdmap->get_epoch()-1; + e >= from; + e--) { + // verify during intermediate epoch + OSDMap oldmap; + get_map(e, oldmap); + + vector acting; + oldmap.pg_to_acting_osds(pgid, acting); + + // acting set change? + if (acting != last && + e <= h.same_since) { + dout(15) << "project_pg_history " << pgid << " changed in " << e+1 + << " from " << acting << " -> " << last << endl; + h.same_since = e+1; + } + + // primary change? + if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) && + e <= h.same_primary_since) { + dout(15) << "project_pg_history " << pgid << " primary changed in " << e+1 << endl; + h.same_primary_since = e+1; + + if (g_conf.osd_rep == OSD_REP_PRIMARY) + h.same_acker_since = h.same_primary_since; + } + // acker change? + if (g_conf.osd_rep != OSD_REP_PRIMARY) { + if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) && + e <= h.same_acker_since) { + dout(15) << "project_pg_history " << pgid << " acker changed in " << e+1 << endl; + h.same_acker_since = e+1; + } + } + + if (h.same_since > e && + h.same_primary_since > e && + h.same_acker_since > e) break; + } + + dout(15) << "project_pg_history end " << h << endl; +} void OSD::activate_pg(pg_t pgid, epoch_t epoch) { @@ -1016,7 +1161,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) int role = osdmap->calc_pg_role(whoami, acting, nrep); if (role < 0) continue; - PG *pg = create_pg(pgid, t); + PG *pg = _create_lock_pg(pgid, t); pg->set_role(role); pg->acting.swap(acting); pg->last_epoch_started_any = @@ -1025,8 +1170,9 @@ void OSD::advance_map(ObjectStore::Transaction& t) pg->info.history.same_primary_since = pg->info.history.same_acker_since = osdmap->get_epoch(); pg->activate(t); - + dout(7) << "created " << *pg << endl; + _unlock_pg(pgid); } for (ps_t ps = 0; ps < maxlps; ++ps) { @@ -1036,7 +1182,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) int nrep = osdmap->pg_to_acting_osds(pgid, acting); int role = osdmap->calc_pg_role(whoami, acting, nrep); - PG *pg = create_pg(pgid, t); + PG *pg = _create_lock_pg(pgid, t); pg->acting.swap(acting); pg->set_role(role); pg->last_epoch_started_any = @@ -1047,11 +1193,11 @@ void OSD::advance_map(ObjectStore::Transaction& t) pg->activate(t); dout(7) << "created " << *pg << endl; + _unlock_pg(pgid); } } // raided - /* for (int size = g_conf.osd_min_raid_width; size <= g_conf.osd_max_raid_width; size++) { @@ -1062,7 +1208,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) int role = osdmap->calc_pg_role(whoami, acting, nrep); if (role < 0) continue; - PG *pg = create_pg(pgid, t); + PG *pg = _create_lock_pg(pgid, t); pg->set_role(role); pg->acting.swap(acting); pg->last_epoch_started_any = @@ -1071,8 +1217,9 @@ void OSD::advance_map(ObjectStore::Transaction& t) pg->info.history.same_primary_since = pg->info.history.same_acker_since = osdmap->get_epoch(); pg->activate(t); - + dout(7) << "created " << *pg << endl; + _unlock_pg(pgid); } for (ps_t ps = 0; ps < maxlps; ++ps) { @@ -1082,7 +1229,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) int nrep = osdmap->pg_to_acting_osds(pgid, acting); int role = osdmap->calc_pg_role(whoami, acting, nrep); - PG *pg = create_pg(pgid, t); + PG *pg = _create_lock_pg(pgid, t); pg->acting.swap(acting); pg->set_role(role); pg->last_epoch_started_any = @@ -1093,9 +1240,9 @@ void OSD::advance_map(ObjectStore::Transaction& t) pg->activate(t); dout(7) << "created " << *pg << endl; + _unlock_pg(pgid); } } - */ dout(1) << "mkfs done, created " << pg_map.size() << " pgs" << endl; } else { @@ -1392,142 +1539,6 @@ bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch) -// ====================================================== -// PG's - -bool OSD::pg_exists(pg_t pgid) -{ - return store->collection_exists(pgid); -} - -PG *OSD::create_pg(pg_t pgid, ObjectStore::Transaction& t) -{ - if (pg_map.count(pgid)) { - dout(0) << "create_pg on " << pgid << ", already have " << *pg_map[pgid] << endl; - } - assert(pg_map.count(pgid) == 0); - assert(!pg_exists(pgid)); - - PG *pg; - if (pgid.is_rep()) - pg = new ReplicatedPG(this, pgid); - else if (pgid.is_raid4()) - assert(0); //pg = new RAID4PG(this, pgid); - else - assert(0); - pg_map[pgid] = pg; - - t.create_collection(pgid); - - return pg; -} - - -PG *OSD::get_pg(pg_t pgid) -{ - if (pg_map.count(pgid)) - return pg_map[pgid]; - return 0; -} - -void OSD::load_pgs() -{ - dout(10) << "load_pgs" << endl; - assert(pg_map.empty()); - - list ls; - store->list_collections(ls); - - for (list::iterator it = ls.begin(); - it != ls.end(); - it++) { - pg_t pgid = *it; - - PG *pg = 0; - if (pgid.is_rep()) - new ReplicatedPG(this, pgid); - else if (pgid.is_raid4()) - assert(0); //new RAID4PG(this, pgid); - else - assert(0); - pg_map[pgid] = pg; - - // read pg info - store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info)); - - // read pg log - pg->read_log(store); - - // generate state for current mapping - int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting); - int role = osdmap->calc_pg_role(whoami, pg->acting, nrep); - pg->set_role(role); - - dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl; - } -} - - - -/** - * check epochs starting from start to verify the pg acting set hasn't changed - * up until now - */ -void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from) -{ - dout(15) << "project_pg_history " << pgid - << " from " << from << " to " << osdmap->get_epoch() - << ", start " << h - << endl; - - vector last; - osdmap->pg_to_acting_osds(pgid, last); - - for (epoch_t e = osdmap->get_epoch()-1; - e >= from; - e--) { - // verify during intermediate epoch - OSDMap oldmap; - get_map(e, oldmap); - - vector acting; - oldmap.pg_to_acting_osds(pgid, acting); - - // acting set change? - if (acting != last && - e <= h.same_since) { - dout(15) << "project_pg_history " << pgid << " changed in " << e+1 - << " from " << acting << " -> " << last << endl; - h.same_since = e+1; - } - - // primary change? - if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) && - e <= h.same_primary_since) { - dout(15) << "project_pg_history " << pgid << " primary changed in " << e+1 << endl; - h.same_primary_since = e+1; - - if (g_conf.osd_rep == OSD_REP_PRIMARY) - h.same_acker_since = h.same_primary_since; - } - - // acker change? - if (g_conf.osd_rep != OSD_REP_PRIMARY) { - if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) && - e <= h.same_acker_since) { - dout(15) << "project_pg_history " << pgid << " acker changed in " << e+1 << endl; - h.same_acker_since = e+1; - } - } - - if (h.same_since > e && - h.same_primary_since > e && - h.same_acker_since > e) break; - } - - dout(15) << "project_pg_history end " << h << endl; -} - /** do_notifies * Send an MOSDPGNotify to a primary, with a list of PGs that I have @@ -1608,7 +1619,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) } // ok, create PG! - pg = create_pg(pgid, t); + pg = _create_lock_pg(pgid, t); osdmap->pg_to_acting_osds(pgid, pg->acting); pg->set_role(0); pg->info.history = history; @@ -1625,8 +1636,6 @@ void OSD::handle_pg_notify(MOSDPGNotify *m) take_waiters(waiting_for_pg[pgid]); waiting_for_pg.erase(pgid); } - - _lock_pg(pgid); } else { // already had it. am i (still) the primary? pg = _lock_pg(pgid); @@ -1809,7 +1818,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m) assert(role > 0); ObjectStore::Transaction t; - pg = create_pg(pgid, t); + pg = _create_lock_pg(pgid, t); pg->acting.swap( acting ); pg->set_role(role); pg->info.history = history; @@ -1818,7 +1827,6 @@ void OSD::handle_pg_query(MOSDPGQuery *m) store->apply_transaction(t); dout(10) << *pg << " dne (before), but i am role " << role << endl; - _lock_pg(pgid); } else { pg = _lock_pg(pgid); @@ -1908,11 +1916,7 @@ void OSD::handle_pg_remove(MOSDPGRemove *m) dout(10) << *pg << " removing." << endl; assert(pg->get_role() == -1); - _remove_pg(pgid); - - // unlock. there shouldn't be any waiters, since we're a stray, and pg is presumably clean0. - assert(pg_lock_waiters.count(pgid) == 0); - _unlock_pg(pgid); + _remove_unlock_pg(pg); } delete m; @@ -1929,7 +1933,7 @@ void OSD::handle_pg_remove(MOSDPGRemove *m) void OSD::handle_op(MOSDOp *op) { const pg_t pgid = op->get_pg(); - PG *pg = get_pg(pgid); + PG *pg = _have_pg(pgid) ? _lock_pg(pgid):0; logger->set("buf", buffer_total_alloc); @@ -1940,7 +1944,10 @@ void OSD::handle_op(MOSDOp *op) // require same or newer map - if (!require_same_or_newer_map(op, op->get_map_epoch())) return; + if (!require_same_or_newer_map(op, op->get_map_epoch())) { + _unlock_pg(pgid); + return; + } // share our map with sender, if they're old _share_map_incoming(op->get_source_inst(), op->get_map_epoch()); @@ -1961,6 +1968,7 @@ void OSD::handle_op(MOSDOp *op) << pgid << ", waiting" << endl; waiting_for_pg[pgid].push_back(op); + _unlock_pg(pgid); return; } @@ -1970,6 +1978,7 @@ void OSD::handle_op(MOSDOp *op) << " after " << op->get_map_epoch() << ", dropping" << endl; assert(op->get_map_epoch() < osdmap->get_epoch()); + _unlock_pg(pgid); delete op; return; } @@ -1979,6 +1988,7 @@ void OSD::handle_op(MOSDOp *op) << " after " << op->get_map_epoch() << ", dropping" << endl; assert(op->get_map_epoch() < osdmap->get_epoch()); + _unlock_pg(pgid); delete op; return; } @@ -1991,6 +2001,7 @@ void OSD::handle_op(MOSDOp *op) dout(7) << *pg << " queueing replay at " << op->get_version() << " for " << *op << endl; pg->replay_queue[op->get_version()] = op; + _unlock_pg(pgid); return; } else { dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update @@ -2001,12 +2012,14 @@ void OSD::handle_op(MOSDOp *op) dout(7) << *pg << " not active (yet)" << endl; pg->waiting_for_active.push_back(op); + _unlock_pg(pgid); return; } // missing object? if (pg->is_missing_object(op->get_oid())) { pg->wait_for_missing_object(op->get_oid(), op); + _unlock_pg(pgid); return; } /* @@ -2054,6 +2067,7 @@ void OSD::handle_op(MOSDOp *op) int peer = pg->acting[1]; dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << endl; messenger->send_message(op, osdmap->get_inst(peer)); + _unlock_pg(pgid); return; } } @@ -2077,6 +2091,7 @@ void OSD::handle_op(MOSDOp *op) << " osd" << peer << endl; messenger->send_message(op, osdmap->get_inst(peer)); + _unlock_pg(pgid); return; } } @@ -2101,6 +2116,7 @@ void OSD::handle_op(MOSDOp *op) dout(10) << "handle_rep_op pg changed " << pg->info.history << " after " << op->get_map_epoch() << ", dropping" << endl; + _unlock_pg(pgid); delete op; return; } @@ -2110,10 +2126,17 @@ void OSD::handle_op(MOSDOp *op) } if (g_conf.osd_maxthreads < 1) { - _lock_pg(pgid); - do_op(op, pg); // do it now + + if (op->get_type() == MSG_OSD_OP) + pg->do_op((MOSDOp*)op); // do it now + else if (op->get_type() == MSG_OSD_OPREPLY) + pg->do_op_reply((MOSDOpReply*)op); + else + assert(0); + _unlock_pg(pgid); } else { + _unlock_pg(pgid); // queue for worker threads /*if (read) enqueue_op(0, op); // no locking needed for reads @@ -2136,7 +2159,6 @@ void OSD::handle_op_reply(MOSDOpReply *op) // make sure we have the pg const pg_t pgid = op->get_pg(); - PG *pg = get_pg(pgid); // require same or newer map if (!require_same_or_newer_map(op, op->get_map_epoch())) return; @@ -2144,14 +2166,15 @@ void OSD::handle_op_reply(MOSDOpReply *op) // share our map with sender, if they're old _share_map_incoming(op->get_source_inst(), op->get_map_epoch()); - if (!pg) { + if (!_have_pg(pgid)) { // hmm. delete op; - } + return; + } if (g_conf.osd_maxthreads < 1) { - _lock_pg(pgid); - do_op(op, pg); // do it now + PG *pg = _lock_pg(pgid); + pg->do_op_reply(op); // do it now _unlock_pg(pgid); } else { enqueue_op(pgid, op); // queue for worker threads @@ -2211,7 +2234,12 @@ void OSD::dequeue_op(pg_t pgid) osd_lock.Unlock(); // do it - do_op(op, pg); + if (op->get_type() == MSG_OSD_OP) + pg->do_op((MOSDOp*)op); // do it now + else if (op->get_type() == MSG_OSD_OPREPLY) + pg->do_op_reply((MOSDOpReply*)op); + else + assert(0); // finish osd_lock.Lock(); @@ -2237,93 +2265,6 @@ void OSD::dequeue_op(pg_t pgid) -/** do_op - do an op - * object lock will be held (if multithreaded) - * osd_lock NOT held. - */ -void OSD::do_op(Message *m, PG *pg) -{ - //dout(15) << "do_op " << *m << endl; - - if (m->get_type() == MSG_OSD_OP) { - MOSDOp *op = (MOSDOp*)m; - - logger->inc("op"); - - switch (op->get_op()) { - - // reads - case OSD_OP_READ: - if (block_if_wrlocked(op)) - return; - pg->op_read(op); - break; - case OSD_OP_STAT: - if (block_if_wrlocked(op)) - return; - pg->op_stat(op); - break; - - // rep stuff - case OSD_OP_PULL: - pg->op_pull(op); - break; - case OSD_OP_PUSH: - pg->op_push(op); - break; - - // writes - case OSD_OP_WRNOOP: - case OSD_OP_WRITE: - case OSD_OP_ZERO: - case OSD_OP_DELETE: - case OSD_OP_TRUNCATE: - case OSD_OP_WRLOCK: - case OSD_OP_WRUNLOCK: - case OSD_OP_RDLOCK: - case OSD_OP_RDUNLOCK: - case OSD_OP_UPLOCK: - case OSD_OP_DNLOCK: - if (op->get_source().is_osd()) { - pg->op_rep_modify(op); - } else { - // locked by someone else? - // for _any_ op type -- eg only the locker can unlock! - if (op->get_op() != OSD_OP_WRNOOP && // except WRNOOP; we just want to flush - block_if_wrlocked(op)) - return; // op will be handled later, after the object unlocks - - // share latest osd map with rest of pg? - osd_lock.Lock(); - { - for (unsigned i=1; iacting.size(); i++) { - int osd = pg->acting[i]; - _share_map_outgoing( osdmap->get_inst(osd) ); - } - } - osd_lock.Unlock(); - - // go go gadget pg - pg->op_modify(op); - - if (op->get_op() == OSD_OP_WRITE) { - logger->inc("c_wr"); - logger->inc("c_wrb", op->get_length()); - } - } - break; - - default: - assert(0); - } - } - else if (m->get_type() == MSG_OSD_OPREPLY) { - pg->op_reply((MOSDOpReply*)m); - } else - assert(0); -} - - void OSD::wait_for_no_ops() { diff --git a/branches/sage/pgs/osd/OSD.h b/branches/sage/pgs/osd/OSD.h index 1fa0752712a32..df9c710f3dd88 100644 --- a/branches/sage/pgs/osd/OSD.h +++ b/branches/sage/pgs/osd/OSD.h @@ -25,25 +25,50 @@ #include "ObjectStore.h" #include "PG.h" +#include "messages/MOSDOp.h" + + #include using namespace std; + #include #include using namespace __gnu_cxx; -#include "messages/MOSDOp.h" class Messenger; class Message; +class Logger; +class ObjectStore; +class OSDMap; - - class OSD : public Dispatcher { public: + // -- states -- + static const int STATE_BOOTING = 1; + static const int STATE_ACTIVE = 2; + static const int STATE_STOPPING = 3; - /** superblock - */ + + /** OSD **/ +protected: + Mutex osd_lock; // global lock + SafeTimer timer; // safe timer + + Messenger *messenger; + Logger *logger; + ObjectStore *store; + MonMap *monmap; + + int whoami; + char dev_path[100]; + +public: + int get_nodeid() { return whoami; } + +private: + /** superblock **/ OSDSuperblock superblock; epoch_t boot_epoch; @@ -55,29 +80,15 @@ public: int read_superblock(); - /** OSD **/ - protected: - Messenger *messenger; - int whoami; - - static const int STATE_BOOTING = 1; - static const int STATE_ACTIVE = 2; - static const int STATE_STOPPING = 3; - + // -- state -- int state; +public: bool is_booting() { return state == STATE_BOOTING; } bool is_active() { return state == STATE_ACTIVE; } bool is_stopping() { return state == STATE_STOPPING; } - - MonMap *monmap; - - class Logger *logger; - - // local store - char dev_path[100]; - class ObjectStore *store; +private: // heartbeat void heartbeat(); @@ -91,9 +102,6 @@ public: } }; - // global lock - Mutex osd_lock; - SafeTimer timer; // -- stats -- int hb_stat_ops; // ops since last heartbeat @@ -101,26 +109,21 @@ public: hash_map peer_qlen; - // per-pg locking (serializing) - hash_set pg_lock; - hash_map > pg_lock_waiters; - PG *lock_pg(pg_t pgid); - PG *_lock_pg(pg_t pgid); - void unlock_pg(pg_t pgid); - void _unlock_pg(pg_t pgid); - // finished waiting messages, that will go at tail of dispatch() + // -- waiters -- list finished; + void take_waiters(list& ls) { finished.splice(finished.end(), ls); } - // object locking - hash_map > waiting_for_wr_unlock; /** list of operations for each object waiting for 'wrunlock' */ - + // -- object locking -- + hash_map > waiting_for_wr_unlock; + bool block_if_wrlocked(MOSDOp* op); - // -- ops -- + + // -- op queue -- class ThreadPool *threadpool; hash_map > op_queue; int pending_ops; @@ -136,26 +139,17 @@ public: o->dequeue_op(pgid); }; - void do_op(Message *m, PG *pg); // actually do it - void prepare_log_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, - objectrev_t crev, objectrev_t rev, PG *pg, eversion_t trim_to); - void prepare_op_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, - objectrev_t crev, objectrev_t rev, PG *pg); - - bool waitfor_missing_object(MOSDOp *op, PG *pg); - - - friend class PG; friend class ReplicatedPG; - friend class C_OSD_WriteCommit; + friend class RAID4PG; + protected: // -- osd map -- - class OSDMap *osdmap; - list waiting_for_osdmap; + OSDMap *osdmap; + list waiting_for_osdmap; hash_map peer_map_epoch; // FIXME types bool _share_map_incoming(const entity_inst_t& inst, epoch_t epoch); @@ -176,18 +170,23 @@ public: - // -- replication -- + // -- placement groups -- + hash_map pg_map; + hash_map > waiting_for_pg; - // PG - hash_map pg_map; - void load_pgs(); - bool pg_exists(pg_t pg); - PG *create_pg(pg_t pg, ObjectStore::Transaction& t); // create new PG - PG *get_pg(pg_t pg); // return existing PG, or null - void _remove_pg(pg_t pg); // remove from store and memory + // per-pg locking (serializes AND acquired pg lock) + hash_set pg_lock; + hash_map > pg_lock_waiters; + + PG *_lock_pg(pg_t pgid); + void _unlock_pg(pg_t pgid); - void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from); + PG *_create_lock_pg(pg_t pg, ObjectStore::Transaction& t); // create new PG + bool _have_pg(pg_t pgid); + void _remove_unlock_pg(PG *pg); // remove from store and memory + void load_pgs(); + void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from); void activate_pg(pg_t pgid, epoch_t epoch); class C_Activate : public Context { @@ -202,10 +201,9 @@ public: }; + // -- tids -- + // for ops i issue tid_t last_tid; - int num_pulling; - - hash_map > waiting_for_pg; Mutex tid_lock; tid_t get_tid() { @@ -216,18 +214,14 @@ public: return t; } - - //void handle_rep_op_ack(MOSDOpReply *m); - // recovery + // -- generic pg recovery -- + int num_pulling; + void do_notifies(map< int, list >& notify_list); void do_queries(map< int, map >& query_map); void repeer(PG *pg, map< int, map >& query_map); - /* - void pull(PG *pg, object_t oid); - void push(PG *pg, object_t oid, int dest); - */ bool require_current_map(Message *m, epoch_t v); bool require_same_or_newer_map(Message *m, epoch_t e); @@ -236,23 +230,11 @@ public: void handle_pg_log(class MOSDPGLog *m); void handle_pg_remove(class MOSDPGRemove *m); - /* - void op_pull(class MOSDOp *op, PG *pg); - void op_push(class MOSDOp *op, PG *pg); - void op_rep_modify(class MOSDOp *op, PG *pg); // write, trucnate, delete - void op_rep_modify_commit(class MOSDOp *op, int ackerosd, - eversion_t last_complete); - */ - - friend class C_OSD_RepModifyCommit; - public: OSD(int id, Messenger *m, MonMap *mm, char *dev = 0); ~OSD(); - int get_nodeid() { return whoami; } - // startup/shutdown int init(); int shutdown(); @@ -263,13 +245,6 @@ public: void handle_osd_ping(class MOSDPing *m); void handle_op(class MOSDOp *m); - - void op_read(class MOSDOp *m);//, PG *pg); - void op_stat(class MOSDOp *m);//, PG *pg); - void op_modify(class MOSDOp *m, PG *pg); - void op_modify_commit(pg_t pgid, tid_t rep_tid, eversion_t pg_complete_thru); - - // for replication void handle_op_reply(class MOSDOpReply *m); void force_remount(); diff --git a/branches/sage/pgs/osd/OSDMap.h b/branches/sage/pgs/osd/OSDMap.h index bc8027007047a..58085e7da4843 100644 --- a/branches/sage/pgs/osd/OSDMap.h +++ b/branches/sage/pgs/osd/OSDMap.h @@ -264,7 +264,7 @@ private: /**** mapping facilities ****/ // oid -> pg - pg_t object_to_pg(object_t oid, FileLayout& layout) { + ObjectLayout file_to_object_layout(object_t oid, FileLayout& layout) { static crush::Hash H(777); // calculate ps (placement seed) @@ -299,8 +299,9 @@ private: assert(0); } - // construct pg - return pg_t(layout.pg_type, layout.pg_size, ps, layout.preferred); + // construct object layout + return ObjectLayout(pg_t(layout.pg_type, layout.pg_size, ps, layout.preferred), + layout.object_stripe_unit); } diff --git a/branches/sage/pgs/osd/PG.cc b/branches/sage/pgs/osd/PG.cc index 1d27afee541ca..7421c6cfb2d87 100644 --- a/branches/sage/pgs/osd/PG.cc +++ b/branches/sage/pgs/osd/PG.cc @@ -1196,3 +1196,6 @@ bool PG::pick_object_rev(object_t& oid) return false; } + + + diff --git a/branches/sage/pgs/osd/PG.h b/branches/sage/pgs/osd/PG.h index 32e511fdb73b7..d1c5c34a6a7f9 100644 --- a/branches/sage/pgs/osd/PG.h +++ b/branches/sage/pgs/osd/PG.h @@ -379,9 +379,47 @@ public: static const int STATE_STRAY = 16; // i must notify the primary i exist. - protected: +protected: OSD *osd; + /** locking and reference counting. + * I destroy myself when the reference count hits zero. + * lock() should be called before doing anything. + * get() should be called on pointer copy (to another thread, etc.). + * put() should be called on destruction of some previously copied pointer. + * put_unlock() when done with the current pointer (_most common_). + */ + Mutex _lock; + int ref; + bool deleted; + +public: + void lock() { + //cout << info.pgid << " lock" << endl; + _lock.Lock(); + } + void get() { + //cout << info.pgid << " get " << ref << endl; + assert(_lock.is_locked()); + ++ref; + } + void put() { + //cout << info.pgid << " put " << ref << endl; + assert(_lock.is_locked()); + --ref; + assert(ref > 0); // last put must be a put_unlock. + } + void put_unlock() { + //cout << info.pgid << " put_unlock " << ref << endl; + assert(_lock.is_locked()); + --ref; + _lock.Unlock(); + if (ref == 0) delete this; + } + + void mark_deleted() { deleted = true; } + bool is_deleted() { return deleted; } + public: // pg state Info info; @@ -481,12 +519,12 @@ public: return 0; } - virtual void op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete) = 0; friend class C_OSD_RepModify_Commit; public: PG(OSD *o, pg_t p) : osd(o), + ref(0), deleted(false), info(p), role(0), state(0), @@ -558,16 +596,9 @@ public: - - // abstract bits - virtual void op_stat(MOSDOp *op) = 0; - virtual int op_read(MOSDOp *op) = 0; - virtual void op_modify(MOSDOp *op) = 0; - virtual void op_rep_modify(MOSDOp *op) = 0; - virtual void op_push(MOSDOp *op) = 0; - virtual void op_pull(MOSDOp *op) = 0; - virtual void op_reply(MOSDOpReply *op) = 0; + virtual void do_op(MOSDOp *op) = 0; + virtual void do_op_reply(MOSDOpReply *op) = 0; virtual bool same_for_read_since(epoch_t e) = 0; virtual bool same_for_modify_since(epoch_t e) = 0; diff --git a/branches/sage/pgs/osd/RAID4PG.cc b/branches/sage/pgs/osd/RAID4PG.cc index d1ad6c93fa8f9..e63fd1b1433ae 100644 --- a/branches/sage/pgs/osd/RAID4PG.cc +++ b/branches/sage/pgs/osd/RAID4PG.cc @@ -33,5 +33,92 @@ +void RAID4PG::do_op(MOSDOp *op) +{ + + +} + + + +void RAID4PG::do_op_reply(MOSDOpReply *reply) +{ + +} + + + +// ----------------- +// pg changes + +bool RAID4PG::same_for_read_since(epoch_t e) +{ + return e >= info.history.same_since; // whole pg set same +} + +bool RAID4PG::same_for_modify_since(epoch_t e) +{ + return e >= info.history.same_since; // whole pg set same +} + +bool RAID4PG::same_for_rep_modify_since(epoch_t e) +{ + return e >= info.history.same_since; // whole pg set same +} + + +// ----------------- +// RECOVERY + +bool RAID4PG::is_missing_object(object_t oid) +{ + return false; +} + +void RAID4PG::wait_for_missing_object(object_t oid, MOSDOp *op) +{ + assert(0); +} + +void RAID4PG::note_failed_osd(int o) +{ + dout(10) << "note_failed_osd osd" << o << endl; + assert(0); +} + +void RAID4PG::on_acker_change() +{ + dout(10) << "on_acker_change" << endl; + assert(0); +} + + +void RAID4PG::on_role_change() +{ + dout(10) << "on_role_change" << endl; + assert(0); +} + + +void RAID4PG::clean_up_local(ObjectStore::Transaction&) +{ +} + +void RAID4PG::cancel_recovery() +{ + assert(0); +} + +bool RAID4PG::do_recovery() +{ + assert(0); + return false; +} + +void RAID4PG::clean_replicas() +{ + assert(0); +} + diff --git a/branches/sage/pgs/osd/RAID4PG.h b/branches/sage/pgs/osd/RAID4PG.h index 2a6f3a8a14889..9c75118c06069 100644 --- a/branches/sage/pgs/osd/RAID4PG.h +++ b/branches/sage/pgs/osd/RAID4PG.h @@ -32,9 +32,6 @@ protected: void prepare_op_transaction(ObjectStore::Transaction& t, MOSDOp *op, eversion_t& version, objectrev_t crev, objectrev_t rev); - -public: - RAID4PG(OSD *o, pg_t p) : PG(o,p) { } void op_stat(MOSDOp *op); int op_read(MOSDOp *op); @@ -43,7 +40,13 @@ public: void op_push(MOSDOp *op); void op_pull(MOSDOp *op); - void op_reply(MOSDOpReply *r); + + +public: + RAID4PG(OSD *o, pg_t p) : PG(o,p) { } + + void do_op(MOSDOp *op); + void do_op_reply(MOSDOpReply *r); bool same_for_read_since(epoch_t e); bool same_for_modify_since(epoch_t e); @@ -57,6 +60,14 @@ public: void on_acker_change(); void on_role_change(); + void clean_up_local(ObjectStore::Transaction& t); + + void cancel_recovery(); + bool do_recovery(); + + void clean_replicas(); + + }; diff --git a/branches/sage/pgs/osd/ReplicatedPG.cc b/branches/sage/pgs/osd/ReplicatedPG.cc index 967b80f7353b3..3d2a88a90b08d 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.cc +++ b/branches/sage/pgs/osd/ReplicatedPG.cc @@ -94,6 +94,91 @@ void ReplicatedPG::wait_for_missing_object(object_t oid, MOSDOp *op) + +/** do_op - do an op + * pg lock will be held (if multithreaded) + * osd_lock NOT held. + */ +void ReplicatedPG::do_op(MOSDOp *op) +{ + //dout(15) << "do_op " << *op << endl; + + osd->logger->inc("op"); + + switch (op->get_op()) { + + // reads + case OSD_OP_READ: + if (osd->block_if_wrlocked(op)) + return; + op_read(op); + break; + case OSD_OP_STAT: + if (osd->block_if_wrlocked(op)) + return; + op_stat(op); + break; + + // rep stuff + case OSD_OP_PULL: + op_pull(op); + break; + case OSD_OP_PUSH: + op_push(op); + break; + + // writes + case OSD_OP_WRNOOP: + case OSD_OP_WRITE: + case OSD_OP_ZERO: + case OSD_OP_DELETE: + case OSD_OP_TRUNCATE: + case OSD_OP_WRLOCK: + case OSD_OP_WRUNLOCK: + case OSD_OP_RDLOCK: + case OSD_OP_RDUNLOCK: + case OSD_OP_UPLOCK: + case OSD_OP_DNLOCK: + if (op->get_source().is_osd()) { + op_rep_modify(op); + } else { + // go go gadget pg + op_modify(op); + + if (op->get_op() == OSD_OP_WRITE) { + osd->logger->inc("c_wr"); + osd->logger->inc("c_wrb", op->get_length()); + } + } + break; + + default: + assert(0); + } +} + +void ReplicatedPG::do_op_reply(MOSDOpReply *r) +{ + // must be replication. + tid_t rep_tid = r->get_rep_tid(); + + if (rep_gather.count(rep_tid)) { + // oh, good. + int fromosd = r->get_source().num(); + repop_ack(rep_gather[rep_tid], + r->get_result(), r->get_commit(), + fromosd, + r->get_pg_complete_thru()); + delete r; + } else { + // early ack. + waiting_for_repop[rep_tid].push_back(r); + } +} + + + + // ======================================================================== // READS @@ -358,19 +443,19 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, // ======================================================================== // rep op gather -class C_OSD_WriteCommit : public Context { +class C_OSD_ModifyCommit : public Context { public: - OSD *osd; - pg_t pgid; + ReplicatedPG *pg; tid_t rep_tid; eversion_t pg_last_complete; - C_OSD_WriteCommit(OSD *o, pg_t p, tid_t rt, eversion_t lc) : osd(o), pgid(p), rep_tid(rt), pg_last_complete(lc) {} + C_OSD_ModifyCommit(ReplicatedPG *p, tid_t rt, eversion_t lc) : pg(p), rep_tid(rt), pg_last_complete(lc) { + pg->get(); // we're copying the pointer + } void finish(int r) { - ReplicatedPG *pg = (ReplicatedPG*)(osd->_lock_pg(pgid)); - if (pg) { + pg->lock(); + if (!pg->is_deleted()) pg->op_modify_commit(rep_tid, pg_last_complete); - osd->_unlock_pg(pg->info.pgid); - } + pg->put_unlock(); } }; @@ -386,7 +471,7 @@ void ReplicatedPG::apply_repop(RepGather *repop) dout(10) << "apply_repop applying update on " << *repop << endl; assert(!repop->applied); - Context *oncommit = new C_OSD_WriteCommit(osd, info.pgid, repop->rep_tid, repop->pg_local_last_complete); + Context *oncommit = new C_OSD_ModifyCommit(this, repop->rep_tid, repop->pg_local_last_complete); unsigned r = osd->store->apply_transaction(repop->t, oncommit); if (r) dout(-10) << "apply_repop apply transaction return " << r << " on " << *repop << endl; @@ -474,7 +559,7 @@ void ReplicatedPG::issue_repop(MOSDOp *op, int dest) // forward the write/update/whatever MOSDOp *wr = new MOSDOp(op->get_client_inst(), op->get_client_inc(), op->get_reqid().tid, oid, - info.pgid, + ObjectLayout(info.pgid), osd->osdmap->get_epoch(), op->get_op()); wr->get_data() = op->get_data(); // _copy_ bufferlist @@ -662,7 +747,7 @@ objectrev_t ReplicatedPG::assign_version(MOSDOp *op) // commit (to disk) callback class C_OSD_RepModifyCommit : public Context { public: - OSD *osd; + ReplicatedPG *pg; MOSDOp *op; int destosd; @@ -673,9 +758,11 @@ public: bool acked; bool waiting; - C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, int dosd, eversion_t lc) : - osd(o), op(oo), destosd(dosd), pg_last_complete(lc), - acked(false), waiting(false) { } + C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDOp *oo, int dosd, eversion_t lc) : + pg(p), op(oo), destosd(dosd), pg_last_complete(lc), + acked(false), waiting(false) { + pg->get(); // we're copying the pointer. + } void finish(int r) { lock.Lock(); assert(!waiting); @@ -686,9 +773,9 @@ public: assert(acked); lock.Unlock(); - PG *pg = osd->lock_pg(op->get_pg()); + pg->lock(); pg->op_rep_modify_commit(op, destosd, pg_last_complete); - osd->unlock_pg(op->get_pg()); + pg->put_unlock(); } void ack() { lock.Lock(); @@ -710,6 +797,22 @@ void ReplicatedPG::op_modify(MOSDOp *op) object_t oid = op->get_oid(); const char *opname = MOSDOp::get_opname(op->get_op()); + // locked by someone else? + // for _any_ op type -- eg only the locker can unlock! + if (op->get_op() != OSD_OP_WRNOOP && // except WRNOOP; we just want to flush + osd->block_if_wrlocked(op)) + return; // op will be handled later, after the object unlocks + + // share latest osd map with rest of pg? + osd->osd_lock.Lock(); + { + for (unsigned i=1; i_share_map_outgoing( osd->osdmap->get_inst(acting[i]) ); + } + } + osd->osd_lock.Unlock(); + + // dup op? if (is_dup(op->get_reqid())) { dout(-3) << "op_modify " << opname << " dup op " << op->get_reqid() @@ -793,7 +896,7 @@ void ReplicatedPG::op_modify(MOSDOp *op) prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru); prepare_op_transaction(t, op, nv, crev, op->get_rev()); - C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(osd, op, get_acker(), + C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, get_acker(), info.last_complete); unsigned r = osd->store->apply_transaction(t, oncommit); if (r != 0 && // no errors @@ -897,7 +1000,7 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) prepare_op_transaction(t, op, nv, crev, op->get_rev()); } - oncommit = new C_OSD_RepModifyCommit(osd, op, ackerosd, info.last_complete); + oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete); // apply log update. and possibly update itself. unsigned tr = osd->store->apply_transaction(t, oncommit); @@ -1152,25 +1255,6 @@ void ReplicatedPG::op_push(MOSDOp *op) -void ReplicatedPG::op_reply(MOSDOpReply *r) -{ - // must be replication. - tid_t rep_tid = r->get_rep_tid(); - - if (rep_gather.count(rep_tid)) { - // oh, good. - int fromosd = r->get_source().num(); - repop_ack(rep_gather[rep_tid], - r->get_result(), r->get_commit(), - fromosd, - r->get_pg_complete_thru()); - delete r; - } else { - // early ack. - waiting_for_repop[rep_tid].push_back(r); - } -} - void ReplicatedPG::note_failed_osd(int o) diff --git a/branches/sage/pgs/osd/ReplicatedPG.h b/branches/sage/pgs/osd/ReplicatedPG.h index 555bf0580fcce..38b287e065ea0 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.h +++ b/branches/sage/pgs/osd/ReplicatedPG.h @@ -100,7 +100,8 @@ protected: MOSDOp *op, eversion_t& version, objectrev_t crev, objectrev_t rev); - friend class C_OSD_WriteCommit; + friend class C_OSD_ModifyCommit; + friend class C_OSD_RepModifyCommit; // pg on-disk content @@ -113,6 +114,13 @@ protected: void clean_replicas(); + void op_stat(MOSDOp *op); + int op_read(MOSDOp *op); + void op_modify(MOSDOp *op); + void op_rep_modify(MOSDOp *op); + void op_push(MOSDOp *op); + void op_pull(MOSDOp *op); + @@ -123,14 +131,9 @@ public: { } ~ReplicatedPG() {} - void op_stat(MOSDOp *op); - int op_read(MOSDOp *op); - void op_modify(MOSDOp *op); - void op_rep_modify(MOSDOp *op); - void op_push(MOSDOp *op); - void op_pull(MOSDOp *op); - void op_reply(MOSDOpReply *r); + void do_op(MOSDOp *op); + void do_op_reply(MOSDOpReply *r); bool same_for_read_since(epoch_t e); bool same_for_modify_since(epoch_t e); diff --git a/branches/sage/pgs/osd/osd_types.h b/branches/sage/pgs/osd/osd_types.h index 6ca4fe5839822..d3d34896eddfe 100644 --- a/branches/sage/pgs/osd/osd_types.h +++ b/branches/sage/pgs/osd/osd_types.h @@ -110,7 +110,7 @@ inline ostream& operator<<(ostream& out, pg_t pg) out << pg.preferred() << 'p'; out << hex << pg.ps() << dec; - out << "=" << hex << (__uint64_t)pg << dec; + //out << "=" << hex << (__uint64_t)pg << dec; return out; } @@ -126,7 +126,27 @@ namespace __gnu_cxx { } +/** ObjectLayout + * + * describes an object's placement and layout in the storage cluster. + * most importatly, which pg it belongs to. + * if that pg is raided, it also specifies the object's stripe_unit. + */ +struct ObjectLayout { + pg_t pgid; // what pg do i belong to + int stripe_unit; // for object raid in raid pgs + + ObjectLayout() : pgid(0), stripe_unit(0) { } + ObjectLayout(pg_t p, int su=0) : pgid(p), stripe_unit(su) { } +}; +inline ostream& operator<<(ostream& out, const ObjectLayout &ol) +{ + out << "pg" << ol.pgid; + if (ol.stripe_unit) + out << ".su=" << ol.stripe_unit; + return out; +} @@ -173,18 +193,19 @@ class ObjectExtent { size_t length; // in object objectrev_t rev; // which revision? - pg_t pgid; // where to find the object + + ObjectLayout layout; // object layout (pgid, etc.) map buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!) - ObjectExtent() : start(0), length(0), rev(0), pgid(0) {} - ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l), rev(0), pgid(0) { } + ObjectExtent() : start(0), length(0), rev(0) {} + ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l), rev(0) { } }; inline ostream& operator<<(ostream& out, ObjectExtent &ex) { return out << "extent(" - << ex.oid << " in " << hex << ex.pgid << dec + << ex.oid << " in " << ex.layout << " " << ex.start << "~" << ex.length << ")"; } diff --git a/branches/sage/pgs/osdc/Filer.cc b/branches/sage/pgs/osdc/Filer.cc index 368c9d3a9555d..240c5c0aef8d8 100644 --- a/branches/sage/pgs/osdc/Filer.cc +++ b/branches/sage/pgs/osdc/Filer.cc @@ -79,7 +79,7 @@ void Filer::_probe(Probe *probe) p++) { dout(10) << "_probe probing " << p->oid << endl; C_Probe *c = new C_Probe(this, probe, p->oid); - probe->ops[p->oid] = objecter->stat(p->oid, &c->size, c); + probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, c); } } @@ -192,7 +192,7 @@ void Filer::file_to_extents(inode_t inode, ex = &object_extents[oid]; ex->oid = oid; ex->rev = rev; - ex->pgid = objecter->osdmap->object_to_pg( oid, inode.layout ); + ex->layout = objecter->osdmap->file_to_object_layout( oid, inode.layout ); } // map range into object @@ -219,7 +219,7 @@ void Filer::file_to_extents(inode_t inode, } ex->buffer_extents[cur-offset] = x_len; - dout(15) << "file_to_extents " << *ex << " in " << ex->pgid << endl; + dout(15) << "file_to_extents " << *ex << " in " << ex->layout << endl; //cout << "map: ino " << ino << " oid " << ex.oid << " osd " << ex.osd << " offset " << ex.offset << " len " << ex.len << " ... left " << left << endl; left -= x_len; diff --git a/branches/sage/pgs/osdc/ObjectCacher.cc b/branches/sage/pgs/osdc/ObjectCacher.cc index e2520f595096d..91a67165ae3c5 100644 --- a/branches/sage/pgs/osdc/ObjectCacher.cc +++ b/branches/sage/pgs/osdc/ObjectCacher.cc @@ -386,7 +386,7 @@ void ObjectCacher::bh_read(BufferHead *bh) C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length()); // go - objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), &onfinish->bl, + objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), &onfinish->bl, onfinish); } @@ -463,7 +463,7 @@ void ObjectCacher::bh_write(BufferHead *bh) C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length()); // go - tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->bl, + tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), bh->bl, onack, oncommit); // set bh last_write_tid @@ -701,7 +701,7 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish) dout(10) << "readx " << *ex_it << endl; // get Object cache - Object *o = get_object(ex_it->oid, ino); + Object *o = get_object(ex_it->oid, ino, ex_it->layout); // map extent into bufferheads map hits, missing, rx; @@ -826,7 +826,7 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino) ex_it != wr->extents.end(); ex_it++) { // get object cache - Object *o = get_object(ex_it->oid, ino); + Object *o = get_object(ex_it->oid, ino, ex_it->layout); // map it all into a single bufferhead. BufferHead *bh = o->map_write(wr); @@ -967,7 +967,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& for (map::iterator i = by_oid.begin(); i != by_oid.end(); i++) { - Object *o = get_object(i->first, ino); + Object *o = get_object(i->first, ino, i->second.layout); rdlock(o); } @@ -1008,7 +1008,7 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute // make sure we aren't already locking/locked... object_t oid = wr->extents.front().oid; Object *o = 0; - if (objects.count(oid)) o = get_object(oid, ino); + if (objects.count(oid)) o = get_object(oid, ino, wr->extents.front().layout); if (!o || (o->lock_state != Object::LOCK_WRLOCK && o->lock_state != Object::LOCK_WRLOCKING && @@ -1041,7 +1041,7 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute for (map::iterator i = by_oid.begin(); i != by_oid.end(); i++) { - Object *o = get_object(i->first, ino); + Object *o = get_object(i->first, ino, i->second.layout); wrlock(o); } @@ -1085,7 +1085,7 @@ void ObjectCacher::rdlock(Object *o) commit->tid = ack->tid = o->last_write_tid = - objecter->lock(OSD_OP_RDLOCK, o->get_oid(), ack, commit); + objecter->lock(OSD_OP_RDLOCK, o->get_oid(), o->get_layout(), ack, commit); } // stake our claim. @@ -1129,7 +1129,7 @@ void ObjectCacher::wrlock(Object *o) commit->tid = ack->tid = o->last_write_tid = - objecter->lock(op, o->get_oid(), ack, commit); + objecter->lock(op, o->get_oid(), o->get_layout(), ack, commit); } // stake our claim. @@ -1173,7 +1173,7 @@ void ObjectCacher::rdunlock(Object *o) commit->tid = lockack->tid = o->last_write_tid = - objecter->lock(OSD_OP_RDUNLOCK, o->get_oid(), lockack, commit); + objecter->lock(OSD_OP_RDUNLOCK, o->get_oid(), o->get_layout(), lockack, commit); } void ObjectCacher::wrunlock(Object *o) @@ -1206,7 +1206,7 @@ void ObjectCacher::wrunlock(Object *o) commit->tid = lockack->tid = o->last_write_tid = - objecter->lock(op, o->get_oid(), lockack, commit); + objecter->lock(op, o->get_oid(), o->get_layout(), lockack, commit); } diff --git a/branches/sage/pgs/osdc/ObjectCacher.h b/branches/sage/pgs/osdc/ObjectCacher.h index 27b154023209d..e435913e96009 100644 --- a/branches/sage/pgs/osdc/ObjectCacher.h +++ b/branches/sage/pgs/osdc/ObjectCacher.h @@ -99,6 +99,7 @@ class ObjectCacher { object_t oid; // this _always_ is oid.rev=0 inodeno_t ino; objectrev_t rev; // last rev we're written + ObjectLayout layout; public: map data; @@ -127,9 +128,9 @@ class ObjectCacher { int rdlock_ref; // how many ppl want or are using a READ lock public: - Object(ObjectCacher *_oc, object_t o, inodeno_t i) : + Object(ObjectCacher *_oc, object_t o, inodeno_t i, ObjectLayout& l) : oc(_oc), - oid(o), ino(i), + oid(o), ino(i), layout(l), last_write_tid(0), last_ack_tid(0), last_commit_tid(0), lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0) {} @@ -137,6 +138,9 @@ class ObjectCacher { object_t get_oid() { return oid; } inodeno_t get_ino() { return ino; } + ObjectLayout& get_layout() { return layout; } + void set_layout(ObjectLayout& l) { layout = l; } + bool can_close() { return data.empty() && lock_state == LOCK_NONE && waitfor_ack.empty() && waitfor_commit.empty() && @@ -216,13 +220,13 @@ class ObjectCacher { // objects - Object *get_object(object_t oid, inodeno_t ino) { + Object *get_object(object_t oid, inodeno_t ino, ObjectLayout &l) { // have it? if (objects.count(oid)) return objects[oid]; // create it. - Object *o = new Object(this, oid, ino); + Object *o = new Object(this, oid, ino, l); objects[oid] = o; objects_by_ino[ino].insert(o); return o; diff --git a/branches/sage/pgs/osdc/Objecter.cc b/branches/sage/pgs/osdc/Objecter.cc index c531a840803b1..b24b10393e48a 100644 --- a/branches/sage/pgs/osdc/Objecter.cc +++ b/branches/sage/pgs/osdc/Objecter.cc @@ -251,12 +251,12 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m) // stat ----------------------------------- -tid_t Objecter::stat(object_t oid, off_t *size, Context *onfinish, +tid_t Objecter::stat(object_t oid, off_t *size, ObjectLayout& ol, Context *onfinish, objectrev_t rev) { OSDStat *st = new OSDStat(size); st->extents.push_back(ObjectExtent(oid, 0, 0)); - st->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + st->extents.front().layout = ol; st->extents.front().rev = rev; st->onfinish = onfinish; @@ -267,17 +267,18 @@ tid_t Objecter::stat_submit(OSDStat *st) { // find OSD ObjectExtent &ex = st->extents.front(); - PG &pg = get_pg( ex.pgid ); + PG &pg = get_pg( ex.layout.pgid ); // send last_tid++; assert(client_inc >= 0); MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, - ex.oid, ex.pgid, osdmap->get_epoch(), + ex.oid, ex.layout, osdmap->get_epoch(), OSD_OP_STAT); + dout(10) << "stat_submit " << st << " tid " << last_tid << " oid " << ex.oid - << " pg " << ex.pgid + << " " << ex.layout << " osd" << pg.acker() << endl; @@ -350,13 +351,13 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m) // read ----------------------------------- -tid_t Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl, +tid_t Objecter::read(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist *bl, Context *onfinish, objectrev_t rev) { OSDRead *rd = new OSDRead(bl); rd->extents.push_back(ObjectExtent(oid, off, len)); - rd->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + rd->extents.front().layout = ol; rd->extents.front().rev = rev; readx(rd, onfinish); return last_tid; @@ -379,20 +380,20 @@ tid_t Objecter::readx(OSDRead *rd, Context *onfinish) tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex) { // find OSD - PG &pg = get_pg( ex.pgid ); + PG &pg = get_pg( ex.layout.pgid ); // send last_tid++; assert(client_inc >= 0); MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid, - ex.oid, ex.pgid, osdmap->get_epoch(), + ex.oid, ex.layout, osdmap->get_epoch(), OSD_OP_READ); m->set_length(ex.length); m->set_offset(ex.start); dout(10) << "readx_submit " << rd << " tid " << last_tid << " oid " << ex.oid << " " << ex.start << "~" << ex.length << " (" << ex.buffer_extents.size() << " buffer fragments)" - << " pg " << ex.pgid + << " " << ex.layout << " osd" << pg.acker() << endl; @@ -576,13 +577,13 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m) // write ------------------------------------ -tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl, +tid_t Objecter::write(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist &bl, Context *onack, Context *oncommit, objectrev_t rev) { OSDWrite *wr = new OSDWrite(bl); wr->extents.push_back(ObjectExtent(oid, off, len)); - wr->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + wr->extents.front().layout = ol; wr->extents.front().buffer_extents[0] = len; wr->extents.front().rev = rev; modifyx(wr, onack, oncommit); @@ -592,13 +593,13 @@ tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl, // zero -tid_t Objecter::zero(object_t oid, off_t off, size_t len, +tid_t Objecter::zero(object_t oid, off_t off, size_t len, ObjectLayout& ol, Context *onack, Context *oncommit, objectrev_t rev) { OSDModify *z = new OSDModify(OSD_OP_ZERO); z->extents.push_back(ObjectExtent(oid, off, len)); - z->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + z->extents.front().layout = ol; z->extents.front().rev = rev; modifyx(z, onack, oncommit); return last_tid; @@ -607,12 +608,12 @@ tid_t Objecter::zero(object_t oid, off_t off, size_t len, // lock ops -tid_t Objecter::lock(int op, object_t oid, +tid_t Objecter::lock(int op, object_t oid, ObjectLayout& ol, Context *onack, Context *oncommit) { OSDModify *l = new OSDModify(op); l->extents.push_back(ObjectExtent(oid, 0, 0)); - l->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout ); + l->extents.front().layout = ol; modifyx(l, onack, oncommit); return last_tid; } @@ -639,7 +640,7 @@ tid_t Objecter::modifyx(OSDModify *wr, Context *onack, Context *oncommit) tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) { // find - PG &pg = get_pg( ex.pgid ); + PG &pg = get_pg( ex.layout.pgid ); // send tid_t tid; @@ -649,7 +650,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) tid = ++last_tid; MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid, - ex.oid, ex.pgid, osdmap->get_epoch(), + ex.oid, ex.layout, osdmap->get_epoch(), wr->op); m->set_length(ex.length); m->set_offset(ex.start); @@ -691,7 +692,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid) dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid << " oid " << ex.oid << " " << ex.start << "~" << ex.length - << " pg " << ex.pgid + << " " << ex.layout << " osd" << pg.primary() << endl; if (pg.primary() >= 0) diff --git a/branches/sage/pgs/osdc/Objecter.h b/branches/sage/pgs/osdc/Objecter.h index f74081eafc312..4887ed08648e7 100644 --- a/branches/sage/pgs/osdc/Objecter.h +++ b/branches/sage/pgs/osdc/Objecter.h @@ -174,19 +174,19 @@ class Objecter { //tid_t lockx(OSDLock *l, Context *onack, Context *oncommit); // even lazier - tid_t read(object_t oid, off_t off, size_t len, bufferlist *bl, + tid_t read(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist *bl, Context *onfinish, objectrev_t rev=0); - tid_t write(object_t oid, off_t off, size_t len, bufferlist &bl, + tid_t write(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist &bl, Context *onack, Context *oncommit, objectrev_t rev=0); - tid_t zero(object_t oid, off_t off, size_t len, + tid_t zero(object_t oid, off_t off, size_t len, ObjectLayout& ol, Context *onack, Context *oncommit, objectrev_t rev=0); - tid_t stat(object_t oid, off_t *size, Context *onfinish, + tid_t stat(object_t oid, off_t *size, ObjectLayout& ol, Context *onfinish, objectrev_t rev=0); - tid_t lock(int op, object_t oid, Context *onack, Context *oncommit); + tid_t lock(int op, object_t oid, ObjectLayout& ol, Context *onack, Context *oncommit); void ms_handle_failure(Message *m, entity_name_t dest, const entity_inst_t& inst); -- 2.39.5