mds_decay_halflife: 30,
mds_beacon_interval: 5.0,
- mds_beacon_grace: 10.0,
+ mds_beacon_grace: 100000.0,
mds_log: true,
mds_log_max_len: MDS_CACHE_SIZE / 3,
char pg_size; // pg size (num replicas, or raid4 stripe width)
int preferred; // preferred primary osd?
+ // -- pg -> disk layout --
+ int object_stripe_unit; // for per-object raid
+
FileLayout() { }
FileLayout(int su, int sc, int os, int pgt, int pgs, int o=-1) :
stripe_unit(su), stripe_count(sc), object_size(os),
- pg_type(pgt), pg_size(pgs),
- preferred(o) {
+ pg_type(pgt), pg_size(pgs), preferred(o),
+ object_stripe_unit(su) // note: bad default, we pbly want su/(pgs-1)
+ {
assert(object_size % stripe_unit == 0);
}
private:
struct {
- long pcid;
-
// who's asking?
entity_inst_t client;
reqid_t reqid; // minor weirdness: entity_name_t is in reqid_t too.
object_t oid;
objectrev_t rev;
- pg_t pg;
+ ObjectLayout layout;
epoch_t map_epoch;
void set_rep_tid(tid_t t) { st.rep_tid = t; }
const object_t get_oid() { return st.oid; }
- const pg_t get_pg() { return st.pg; }
+ const pg_t get_pg() { return st.layout.pgid; }
+ const ObjectLayout& get_layout() { return st.layout; }
const epoch_t get_map_epoch() { return st.map_epoch; }
//const int get_pg_role() { return st.pg_role; } // who am i asking for?
size_t get_data_len() { return data.length(); }
- // keep a pcid (procedure call id) to match up request+reply
- void set_pcid(long pcid) { this->st.pcid = pcid; }
- long get_pcid() { return st.pcid; }
MOSDOp(entity_inst_t asker, int inc, long tid,
- object_t oid, pg_t pg, epoch_t mapepoch, int op) :
+ object_t oid, ObjectLayout ol, epoch_t mapepoch, int op) :
Message(MSG_OSD_OP) {
memset(&st, 0, sizeof(st));
this->st.client = asker;
this->st.reqid.tid = tid;
this->st.oid = oid;
- this->st.pg = pg;
+ this->st.layout = ol;
this->st.map_epoch = mapepoch;
this->st.op = op;
//void set_pg_role(int r) { st.pg_role = r; }
//void set_rg_nrep(int n) { st.rg_nrep = n; }
+ void set_layout(const ObjectLayout& l) { st.layout = l; }
+
void set_length(size_t l) { st.length = l; }
void set_offset(size_t o) { st.offset = o; }
void set_version(eversion_t v) { st.version = v; }
::_encode(data, payload);
}
- virtual char *get_type_name() { return "oop"; }
-
+ virtual char *get_type_name() { return "osd_op"; }
void print(ostream& out) {
out << "osd_op(" << st.reqid
<< " " << get_opname(st.op)
tid_t rep_tid;
object_t oid;
- pg_t pg;
+ ObjectLayout layout; // pgid, etc.
int op;
long get_tid() { return st.reqid.tid; }
long get_rep_tid() { return st.rep_tid; }
object_t get_oid() { return st.oid; }
- pg_t get_pg() { return st.pg; }
+ pg_t get_pg() { return st.layout.pgid; }
int get_op() { return st.op; }
bool get_commit() { return st.commit; }
this->st.rep_tid = req->st.rep_tid;
this->st.oid = req->st.oid;
- this->st.pg = req->st.pg;
+ this->st.layout = req->st.layout;
this->st.result = result;
this->st.commit = commit;
::_encode(data, payload);
}
- virtual char *get_type_name() { return "oopr"; }
+ virtual char *get_type_name() { return "osd_op_reply"; }
void print(ostream& out) {
out << "osd_op_reply(" << st.reqid
if (m) {
//dout(18) << "got " << m << endl;
- dout(1) << "---- " << m->get_dest()
+ dout(1) << "==== " << m->get_dest()
<< " <- " << m->get_source()
- << " ---- " << *m
+ << " ==== " << *m
<< endl;
if (g_conf.fakemessenger_serialize) {
}
dm->queue_incoming(m);
- dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << endl;
+ dout(1) << "--> " << get_myname() << " -> " << inst.name << " --- " << *m << endl;
}
catch (...) {
}
-// object locks
-PG *OSD::lock_pg(pg_t pgid)
+
+
+// ======================================================
+// PG's
+
+PG *OSD::_create_lock_pg(pg_t pgid, ObjectStore::Transaction& t)
{
- osd_lock.Lock();
- PG *pg = _lock_pg(pgid);
- osd_lock.Unlock();
+ dout(10) << "_create_lock_pg " << pgid << endl;
+
+ if (pg_map.count(pgid))
+ dout(0) << "_create_lock_pg on " << pgid << ", already have " << *pg_map[pgid] << endl;
+
+ // create
+ PG *pg;
+ if (pgid.is_rep())
+ pg = new ReplicatedPG(this, pgid);
+ else if (pgid.is_raid4())
+ pg = new RAID4PG(this, pgid);
+ else
+ assert(0);
+
+ assert(pg_map.count(pgid) == 0);
+ pg_map[pgid] = pg;
+
+ // lock
+ pg->lock();
+ pg_lock.insert(pgid);
+
+ pg->get(); // because it's in pg_map
+ pg->get(); // because we're locking it
+
+ // create collection
+ assert(!store->collection_exists(pgid));
+ t.create_collection(pgid);
+
return pg;
}
+bool OSD::_have_pg(pg_t pgid)
+{
+ return pg_map.count(pgid);
+}
+
PG *OSD::_lock_pg(pg_t pgid)
{
assert(pg_map.count(pgid));
+ // wait?
if (pg_lock.count(pgid)) {
Cond c;
dout(15) << "lock_pg " << pgid << " waiting as " << &c << endl;
dout(15) << "lock_pg " << pgid << endl;
pg_lock.insert(pgid);
- return pg_map[pgid];
-}
-
-void OSD::unlock_pg(pg_t pgid)
-{
- osd_lock.Lock();
- _unlock_pg(pgid);
- osd_lock.Unlock();
+ PG *pg = pg_map[pgid];
+ pg->lock();
+ pg->get(); // because we're "locking" it and returning a pointer copy.
+ return pg;
}
void OSD::_unlock_pg(pg_t pgid)
assert(pg_lock.count(pgid));
pg_lock.erase(pgid);
+ pg_map[pgid]->put_unlock();
+
if (pg_lock_waiters.count(pgid)) {
// someone is in line
Cond *c = pg_lock_waiters[pgid].front();
}
}
-void OSD::_remove_pg(pg_t pgid)
+void OSD::_remove_unlock_pg(PG *pg)
{
- dout(10) << "_remove_pg " << pgid << endl;
+ pg_t pgid = pg->info.pgid;
+
+ dout(10) << "_remove_unlock_pg " << pgid << endl;
+
+ // there shouldn't be any waiters, since we're a stray, and pg is presumably clean0.
+ assert(pg_lock_waiters.count(pgid) == 0);
// remove from store
list<object_t> olist;
ObjectStore::Transaction t;
{
for (list<object_t>::iterator p = olist.begin();
- p != olist.end();
- p++)
+ p != olist.end();
+ p++)
t.remove(*p);
t.remove_collection(pgid);
t.remove(pgid.to_object()); // log too
}
store->apply_transaction(t);
-
- // hose from memory
- delete pg_map[pgid];
+
+ // mark deleted
+ pg->mark_deleted();
+
+ // unlock
+ pg_lock.erase(pgid);
+ pg->put();
+
+ // remove from map
pg_map.erase(pgid);
+ pg->put_unlock(); // will delete, if last reference
+}
+
+
+
+void OSD::load_pgs()
+{
+ dout(10) << "load_pgs" << endl;
+ assert(pg_map.empty());
+
+ list<coll_t> ls;
+ store->list_collections(ls);
+
+ for (list<coll_t>::iterator it = ls.begin();
+ it != ls.end();
+ it++) {
+ pg_t pgid = *it;
+
+ PG *pg = 0;
+ if (pgid.is_rep())
+ new ReplicatedPG(this, pgid);
+ else if (pgid.is_raid4())
+ new RAID4PG(this, pgid);
+ else
+ assert(0);
+ pg_map[pgid] = pg;
+ pg->get();
+
+ // read pg info
+ store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info));
+
+ // read pg log
+ pg->read_log(store);
+
+ // generate state for current mapping
+ int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting);
+ int role = osdmap->calc_pg_role(whoami, pg->acting, nrep);
+ pg->set_role(role);
+
+ dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl;
+ }
}
+
+
+
+/**
+ * check epochs starting from start to verify the pg acting set hasn't changed
+ * up until now
+ */
+void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
+{
+ dout(15) << "project_pg_history " << pgid
+ << " from " << from << " to " << osdmap->get_epoch()
+ << ", start " << h
+ << endl;
+
+ vector<int> last;
+ osdmap->pg_to_acting_osds(pgid, last);
+
+ for (epoch_t e = osdmap->get_epoch()-1;
+ e >= from;
+ e--) {
+ // verify during intermediate epoch
+ OSDMap oldmap;
+ get_map(e, oldmap);
+
+ vector<int> acting;
+ oldmap.pg_to_acting_osds(pgid, acting);
+
+ // acting set change?
+ if (acting != last &&
+ e <= h.same_since) {
+ dout(15) << "project_pg_history " << pgid << " changed in " << e+1
+ << " from " << acting << " -> " << last << endl;
+ h.same_since = e+1;
+ }
+
+ // primary change?
+ if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) &&
+ e <= h.same_primary_since) {
+ dout(15) << "project_pg_history " << pgid << " primary changed in " << e+1 << endl;
+ h.same_primary_since = e+1;
+
+ if (g_conf.osd_rep == OSD_REP_PRIMARY)
+ h.same_acker_since = h.same_primary_since;
+ }
+ // acker change?
+ if (g_conf.osd_rep != OSD_REP_PRIMARY) {
+ if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) &&
+ e <= h.same_acker_since) {
+ dout(15) << "project_pg_history " << pgid << " acker changed in " << e+1 << endl;
+ h.same_acker_since = e+1;
+ }
+ }
+
+ if (h.same_since > e &&
+ h.same_primary_since > e &&
+ h.same_acker_since > e) break;
+ }
+
+ dout(15) << "project_pg_history end " << h << endl;
+}
void OSD::activate_pg(pg_t pgid, epoch_t epoch)
{
int role = osdmap->calc_pg_role(whoami, acting, nrep);
if (role < 0) continue;
- PG *pg = create_pg(pgid, t);
+ PG *pg = _create_lock_pg(pgid, t);
pg->set_role(role);
pg->acting.swap(acting);
pg->last_epoch_started_any =
pg->info.history.same_primary_since =
pg->info.history.same_acker_since = osdmap->get_epoch();
pg->activate(t);
-
+
dout(7) << "created " << *pg << endl;
+ _unlock_pg(pgid);
}
for (ps_t ps = 0; ps < maxlps; ++ps) {
int nrep = osdmap->pg_to_acting_osds(pgid, acting);
int role = osdmap->calc_pg_role(whoami, acting, nrep);
- PG *pg = create_pg(pgid, t);
+ PG *pg = _create_lock_pg(pgid, t);
pg->acting.swap(acting);
pg->set_role(role);
pg->last_epoch_started_any =
pg->activate(t);
dout(7) << "created " << *pg << endl;
+ _unlock_pg(pgid);
}
}
// raided
- /*
for (int size = g_conf.osd_min_raid_width;
size <= g_conf.osd_max_raid_width;
size++) {
int role = osdmap->calc_pg_role(whoami, acting, nrep);
if (role < 0) continue;
- PG *pg = create_pg(pgid, t);
+ PG *pg = _create_lock_pg(pgid, t);
pg->set_role(role);
pg->acting.swap(acting);
pg->last_epoch_started_any =
pg->info.history.same_primary_since =
pg->info.history.same_acker_since = osdmap->get_epoch();
pg->activate(t);
-
+
dout(7) << "created " << *pg << endl;
+ _unlock_pg(pgid);
}
for (ps_t ps = 0; ps < maxlps; ++ps) {
int nrep = osdmap->pg_to_acting_osds(pgid, acting);
int role = osdmap->calc_pg_role(whoami, acting, nrep);
- PG *pg = create_pg(pgid, t);
+ PG *pg = _create_lock_pg(pgid, t);
pg->acting.swap(acting);
pg->set_role(role);
pg->last_epoch_started_any =
pg->activate(t);
dout(7) << "created " << *pg << endl;
+ _unlock_pg(pgid);
}
}
- */
dout(1) << "mkfs done, created " << pg_map.size() << " pgs" << endl;
} else {
-// ======================================================
-// PG's
-
-bool OSD::pg_exists(pg_t pgid)
-{
- return store->collection_exists(pgid);
-}
-
-PG *OSD::create_pg(pg_t pgid, ObjectStore::Transaction& t)
-{
- if (pg_map.count(pgid)) {
- dout(0) << "create_pg on " << pgid << ", already have " << *pg_map[pgid] << endl;
- }
- assert(pg_map.count(pgid) == 0);
- assert(!pg_exists(pgid));
-
- PG *pg;
- if (pgid.is_rep())
- pg = new ReplicatedPG(this, pgid);
- else if (pgid.is_raid4())
- assert(0); //pg = new RAID4PG(this, pgid);
- else
- assert(0);
- pg_map[pgid] = pg;
-
- t.create_collection(pgid);
-
- return pg;
-}
-
-
-PG *OSD::get_pg(pg_t pgid)
-{
- if (pg_map.count(pgid))
- return pg_map[pgid];
- return 0;
-}
-
-void OSD::load_pgs()
-{
- dout(10) << "load_pgs" << endl;
- assert(pg_map.empty());
-
- list<coll_t> ls;
- store->list_collections(ls);
-
- for (list<coll_t>::iterator it = ls.begin();
- it != ls.end();
- it++) {
- pg_t pgid = *it;
-
- PG *pg = 0;
- if (pgid.is_rep())
- new ReplicatedPG(this, pgid);
- else if (pgid.is_raid4())
- assert(0); //new RAID4PG(this, pgid);
- else
- assert(0);
- pg_map[pgid] = pg;
-
- // read pg info
- store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info));
-
- // read pg log
- pg->read_log(store);
-
- // generate state for current mapping
- int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting);
- int role = osdmap->calc_pg_role(whoami, pg->acting, nrep);
- pg->set_role(role);
-
- dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl;
- }
-}
-
-
-
-/**
- * check epochs starting from start to verify the pg acting set hasn't changed
- * up until now
- */
-void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
-{
- dout(15) << "project_pg_history " << pgid
- << " from " << from << " to " << osdmap->get_epoch()
- << ", start " << h
- << endl;
-
- vector<int> last;
- osdmap->pg_to_acting_osds(pgid, last);
-
- for (epoch_t e = osdmap->get_epoch()-1;
- e >= from;
- e--) {
- // verify during intermediate epoch
- OSDMap oldmap;
- get_map(e, oldmap);
-
- vector<int> acting;
- oldmap.pg_to_acting_osds(pgid, acting);
-
- // acting set change?
- if (acting != last &&
- e <= h.same_since) {
- dout(15) << "project_pg_history " << pgid << " changed in " << e+1
- << " from " << acting << " -> " << last << endl;
- h.same_since = e+1;
- }
-
- // primary change?
- if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) &&
- e <= h.same_primary_since) {
- dout(15) << "project_pg_history " << pgid << " primary changed in " << e+1 << endl;
- h.same_primary_since = e+1;
-
- if (g_conf.osd_rep == OSD_REP_PRIMARY)
- h.same_acker_since = h.same_primary_since;
- }
-
- // acker change?
- if (g_conf.osd_rep != OSD_REP_PRIMARY) {
- if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) &&
- e <= h.same_acker_since) {
- dout(15) << "project_pg_history " << pgid << " acker changed in " << e+1 << endl;
- h.same_acker_since = e+1;
- }
- }
-
- if (h.same_since > e &&
- h.same_primary_since > e &&
- h.same_acker_since > e) break;
- }
-
- dout(15) << "project_pg_history end " << h << endl;
-}
-
/** do_notifies
* Send an MOSDPGNotify to a primary, with a list of PGs that I have
}
// ok, create PG!
- pg = create_pg(pgid, t);
+ pg = _create_lock_pg(pgid, t);
osdmap->pg_to_acting_osds(pgid, pg->acting);
pg->set_role(0);
pg->info.history = history;
take_waiters(waiting_for_pg[pgid]);
waiting_for_pg.erase(pgid);
}
-
- _lock_pg(pgid);
} else {
// already had it. am i (still) the primary?
pg = _lock_pg(pgid);
assert(role > 0);
ObjectStore::Transaction t;
- pg = create_pg(pgid, t);
+ pg = _create_lock_pg(pgid, t);
pg->acting.swap( acting );
pg->set_role(role);
pg->info.history = history;
store->apply_transaction(t);
dout(10) << *pg << " dne (before), but i am role " << role << endl;
- _lock_pg(pgid);
} else {
pg = _lock_pg(pgid);
dout(10) << *pg << " removing." << endl;
assert(pg->get_role() == -1);
- _remove_pg(pgid);
-
- // unlock. there shouldn't be any waiters, since we're a stray, and pg is presumably clean0.
- assert(pg_lock_waiters.count(pgid) == 0);
- _unlock_pg(pgid);
+ _remove_unlock_pg(pg);
}
delete m;
void OSD::handle_op(MOSDOp *op)
{
const pg_t pgid = op->get_pg();
- PG *pg = get_pg(pgid);
+ PG *pg = _have_pg(pgid) ? _lock_pg(pgid):0;
logger->set("buf", buffer_total_alloc);
// require same or newer map
- if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
+ if (!require_same_or_newer_map(op, op->get_map_epoch())) {
+ _unlock_pg(pgid);
+ return;
+ }
// share our map with sender, if they're old
_share_map_incoming(op->get_source_inst(), op->get_map_epoch());
<< pgid
<< ", waiting" << endl;
waiting_for_pg[pgid].push_back(op);
+ _unlock_pg(pgid);
return;
}
<< " after " << op->get_map_epoch()
<< ", dropping" << endl;
assert(op->get_map_epoch() < osdmap->get_epoch());
+ _unlock_pg(pgid);
delete op;
return;
}
<< " after " << op->get_map_epoch()
<< ", dropping" << endl;
assert(op->get_map_epoch() < osdmap->get_epoch());
+ _unlock_pg(pgid);
delete op;
return;
}
dout(7) << *pg << " queueing replay at " << op->get_version()
<< " for " << *op << endl;
pg->replay_queue[op->get_version()] = op;
+ _unlock_pg(pgid);
return;
} else {
dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update
dout(7) << *pg << " not active (yet)" << endl;
pg->waiting_for_active.push_back(op);
+ _unlock_pg(pgid);
return;
}
// missing object?
if (pg->is_missing_object(op->get_oid())) {
pg->wait_for_missing_object(op->get_oid(), op);
+ _unlock_pg(pgid);
return;
}
/*
int peer = pg->acting[1];
dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << endl;
messenger->send_message(op, osdmap->get_inst(peer));
+ _unlock_pg(pgid);
return;
}
}
<< " osd" << peer
<< endl;
messenger->send_message(op, osdmap->get_inst(peer));
+ _unlock_pg(pgid);
return;
}
}
dout(10) << "handle_rep_op pg changed " << pg->info.history
<< " after " << op->get_map_epoch()
<< ", dropping" << endl;
+ _unlock_pg(pgid);
delete op;
return;
}
}
if (g_conf.osd_maxthreads < 1) {
- _lock_pg(pgid);
- do_op(op, pg); // do it now
+
+ if (op->get_type() == MSG_OSD_OP)
+ pg->do_op((MOSDOp*)op); // do it now
+ else if (op->get_type() == MSG_OSD_OPREPLY)
+ pg->do_op_reply((MOSDOpReply*)op);
+ else
+ assert(0);
+
_unlock_pg(pgid);
} else {
+ _unlock_pg(pgid);
// queue for worker threads
/*if (read)
enqueue_op(0, op); // no locking needed for reads
// make sure we have the pg
const pg_t pgid = op->get_pg();
- PG *pg = get_pg(pgid);
// require same or newer map
if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
// share our map with sender, if they're old
_share_map_incoming(op->get_source_inst(), op->get_map_epoch());
- if (!pg) {
+ if (!_have_pg(pgid)) {
// hmm.
delete op;
- }
+ return;
+ }
if (g_conf.osd_maxthreads < 1) {
- _lock_pg(pgid);
- do_op(op, pg); // do it now
+ PG *pg = _lock_pg(pgid);
+ pg->do_op_reply(op); // do it now
_unlock_pg(pgid);
} else {
enqueue_op(pgid, op); // queue for worker threads
osd_lock.Unlock();
// do it
- do_op(op, pg);
+ if (op->get_type() == MSG_OSD_OP)
+ pg->do_op((MOSDOp*)op); // do it now
+ else if (op->get_type() == MSG_OSD_OPREPLY)
+ pg->do_op_reply((MOSDOpReply*)op);
+ else
+ assert(0);
// finish
osd_lock.Lock();
-/** do_op - do an op
- * object lock will be held (if multithreaded)
- * osd_lock NOT held.
- */
-void OSD::do_op(Message *m, PG *pg)
-{
- //dout(15) << "do_op " << *m << endl;
-
- if (m->get_type() == MSG_OSD_OP) {
- MOSDOp *op = (MOSDOp*)m;
-
- logger->inc("op");
-
- switch (op->get_op()) {
-
- // reads
- case OSD_OP_READ:
- if (block_if_wrlocked(op))
- return;
- pg->op_read(op);
- break;
- case OSD_OP_STAT:
- if (block_if_wrlocked(op))
- return;
- pg->op_stat(op);
- break;
-
- // rep stuff
- case OSD_OP_PULL:
- pg->op_pull(op);
- break;
- case OSD_OP_PUSH:
- pg->op_push(op);
- break;
-
- // writes
- case OSD_OP_WRNOOP:
- case OSD_OP_WRITE:
- case OSD_OP_ZERO:
- case OSD_OP_DELETE:
- case OSD_OP_TRUNCATE:
- case OSD_OP_WRLOCK:
- case OSD_OP_WRUNLOCK:
- case OSD_OP_RDLOCK:
- case OSD_OP_RDUNLOCK:
- case OSD_OP_UPLOCK:
- case OSD_OP_DNLOCK:
- if (op->get_source().is_osd()) {
- pg->op_rep_modify(op);
- } else {
- // locked by someone else?
- // for _any_ op type -- eg only the locker can unlock!
- if (op->get_op() != OSD_OP_WRNOOP && // except WRNOOP; we just want to flush
- block_if_wrlocked(op))
- return; // op will be handled later, after the object unlocks
-
- // share latest osd map with rest of pg?
- osd_lock.Lock();
- {
- for (unsigned i=1; i<pg->acting.size(); i++) {
- int osd = pg->acting[i];
- _share_map_outgoing( osdmap->get_inst(osd) );
- }
- }
- osd_lock.Unlock();
-
- // go go gadget pg
- pg->op_modify(op);
-
- if (op->get_op() == OSD_OP_WRITE) {
- logger->inc("c_wr");
- logger->inc("c_wrb", op->get_length());
- }
- }
- break;
-
- default:
- assert(0);
- }
- }
- else if (m->get_type() == MSG_OSD_OPREPLY) {
- pg->op_reply((MOSDOpReply*)m);
- } else
- assert(0);
-}
-
-
void OSD::wait_for_no_ops()
{
#include "ObjectStore.h"
#include "PG.h"
+#include "messages/MOSDOp.h"
+
+
#include <map>
using namespace std;
+
#include <ext/hash_map>
#include <ext/hash_set>
using namespace __gnu_cxx;
-#include "messages/MOSDOp.h"
class Messenger;
class Message;
+class Logger;
+class ObjectStore;
+class OSDMap;
-
-
class OSD : public Dispatcher {
public:
+ // -- states --
+ static const int STATE_BOOTING = 1;
+ static const int STATE_ACTIVE = 2;
+ static const int STATE_STOPPING = 3;
- /** superblock
- */
+
+ /** OSD **/
+protected:
+ Mutex osd_lock; // global lock
+ SafeTimer timer; // safe timer
+
+ Messenger *messenger;
+ Logger *logger;
+ ObjectStore *store;
+ MonMap *monmap;
+
+ int whoami;
+ char dev_path[100];
+
+public:
+ int get_nodeid() { return whoami; }
+
+private:
+ /** superblock **/
OSDSuperblock superblock;
epoch_t boot_epoch;
int read_superblock();
- /** OSD **/
- protected:
- Messenger *messenger;
- int whoami;
-
- static const int STATE_BOOTING = 1;
- static const int STATE_ACTIVE = 2;
- static const int STATE_STOPPING = 3;
-
+ // -- state --
int state;
+public:
bool is_booting() { return state == STATE_BOOTING; }
bool is_active() { return state == STATE_ACTIVE; }
bool is_stopping() { return state == STATE_STOPPING; }
-
- MonMap *monmap;
-
- class Logger *logger;
-
- // local store
- char dev_path[100];
- class ObjectStore *store;
+private:
// heartbeat
void heartbeat();
}
};
- // global lock
- Mutex osd_lock;
- SafeTimer timer;
// -- stats --
int hb_stat_ops; // ops since last heartbeat
hash_map<int, float> peer_qlen;
- // per-pg locking (serializing)
- hash_set<pg_t> pg_lock;
- hash_map<pg_t, list<Cond*> > pg_lock_waiters;
- PG *lock_pg(pg_t pgid);
- PG *_lock_pg(pg_t pgid);
- void unlock_pg(pg_t pgid);
- void _unlock_pg(pg_t pgid);
- // finished waiting messages, that will go at tail of dispatch()
+ // -- waiters --
list<class Message*> finished;
+
void take_waiters(list<class Message*>& ls) {
finished.splice(finished.end(), ls);
}
- // object locking
- hash_map<object_t, list<Message*> > waiting_for_wr_unlock; /** list of operations for each object waiting for 'wrunlock' */
-
+ // -- object locking --
+ hash_map<object_t, list<Message*> > waiting_for_wr_unlock;
+
bool block_if_wrlocked(MOSDOp* op);
- // -- ops --
+
+ // -- op queue --
class ThreadPool<class OSD*, pg_t> *threadpool;
hash_map<pg_t, list<Message*> > op_queue;
int pending_ops;
o->dequeue_op(pgid);
};
- void do_op(Message *m, PG *pg); // actually do it
- void prepare_log_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version,
- objectrev_t crev, objectrev_t rev, PG *pg, eversion_t trim_to);
- void prepare_op_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version,
- objectrev_t crev, objectrev_t rev, PG *pg);
-
- bool waitfor_missing_object(MOSDOp *op, PG *pg);
-
-
-
friend class PG;
friend class ReplicatedPG;
- friend class C_OSD_WriteCommit;
+ friend class RAID4PG;
+
protected:
// -- osd map --
- class OSDMap *osdmap;
- list<class Message*> waiting_for_osdmap;
+ OSDMap *osdmap;
+ list<Message*> waiting_for_osdmap;
hash_map<entity_name_t, epoch_t> peer_map_epoch; // FIXME types
bool _share_map_incoming(const entity_inst_t& inst, epoch_t epoch);
- // -- replication --
+ // -- placement groups --
+ hash_map<pg_t, PG*> pg_map;
+ hash_map<pg_t, list<Message*> > waiting_for_pg;
- // PG
- hash_map<pg_t, PG*> pg_map;
- void load_pgs();
- bool pg_exists(pg_t pg);
- PG *create_pg(pg_t pg, ObjectStore::Transaction& t); // create new PG
- PG *get_pg(pg_t pg); // return existing PG, or null
- void _remove_pg(pg_t pg); // remove from store and memory
+ // per-pg locking (serializes AND acquired pg lock)
+ hash_set<pg_t> pg_lock;
+ hash_map<pg_t, list<Cond*> > pg_lock_waiters;
+
+ PG *_lock_pg(pg_t pgid);
+ void _unlock_pg(pg_t pgid);
- void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from);
+ PG *_create_lock_pg(pg_t pg, ObjectStore::Transaction& t); // create new PG
+ bool _have_pg(pg_t pgid);
+ void _remove_unlock_pg(PG *pg); // remove from store and memory
+ void load_pgs();
+ void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from);
void activate_pg(pg_t pgid, epoch_t epoch);
class C_Activate : public Context {
};
+ // -- tids --
+ // for ops i issue
tid_t last_tid;
- int num_pulling;
-
- hash_map<pg_t, list<Message*> > waiting_for_pg;
Mutex tid_lock;
tid_t get_tid() {
return t;
}
-
- //void handle_rep_op_ack(MOSDOpReply *m);
- // recovery
+ // -- generic pg recovery --
+ int num_pulling;
+
void do_notifies(map< int, list<PG::Info> >& notify_list);
void do_queries(map< int, map<pg_t,PG::Query> >& query_map);
void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
- /*
- void pull(PG *pg, object_t oid);
- void push(PG *pg, object_t oid, int dest);
- */
bool require_current_map(Message *m, epoch_t v);
bool require_same_or_newer_map(Message *m, epoch_t e);
void handle_pg_log(class MOSDPGLog *m);
void handle_pg_remove(class MOSDPGRemove *m);
- /*
- void op_pull(class MOSDOp *op, PG *pg);
- void op_push(class MOSDOp *op, PG *pg);
- void op_rep_modify(class MOSDOp *op, PG *pg); // write, trucnate, delete
- void op_rep_modify_commit(class MOSDOp *op, int ackerosd,
- eversion_t last_complete);
- */
-
- friend class C_OSD_RepModifyCommit;
-
public:
OSD(int id, Messenger *m, MonMap *mm, char *dev = 0);
~OSD();
- int get_nodeid() { return whoami; }
-
// startup/shutdown
int init();
int shutdown();
void handle_osd_ping(class MOSDPing *m);
void handle_op(class MOSDOp *m);
-
- void op_read(class MOSDOp *m);//, PG *pg);
- void op_stat(class MOSDOp *m);//, PG *pg);
- void op_modify(class MOSDOp *m, PG *pg);
- void op_modify_commit(pg_t pgid, tid_t rep_tid, eversion_t pg_complete_thru);
-
- // for replication
void handle_op_reply(class MOSDOpReply *m);
void force_remount();
/**** mapping facilities ****/
// oid -> pg
- pg_t object_to_pg(object_t oid, FileLayout& layout) {
+ ObjectLayout file_to_object_layout(object_t oid, FileLayout& layout) {
static crush::Hash H(777);
// calculate ps (placement seed)
assert(0);
}
- // construct pg
- return pg_t(layout.pg_type, layout.pg_size, ps, layout.preferred);
+ // construct object layout
+ return ObjectLayout(pg_t(layout.pg_type, layout.pg_size, ps, layout.preferred),
+ layout.object_stripe_unit);
}
static const int STATE_STRAY = 16; // i must notify the primary i exist.
- protected:
+protected:
OSD *osd;
+ /** locking and reference counting.
+ * I destroy myself when the reference count hits zero.
+ * lock() should be called before doing anything.
+ * get() should be called on pointer copy (to another thread, etc.).
+ * put() should be called on destruction of some previously copied pointer.
+ * put_unlock() when done with the current pointer (_most common_).
+ */
+ Mutex _lock;
+ int ref;
+ bool deleted;
+
+public:
+ void lock() {
+ //cout << info.pgid << " lock" << endl;
+ _lock.Lock();
+ }
+ void get() {
+ //cout << info.pgid << " get " << ref << endl;
+ assert(_lock.is_locked());
+ ++ref;
+ }
+ void put() {
+ //cout << info.pgid << " put " << ref << endl;
+ assert(_lock.is_locked());
+ --ref;
+ assert(ref > 0); // last put must be a put_unlock.
+ }
+ void put_unlock() {
+ //cout << info.pgid << " put_unlock " << ref << endl;
+ assert(_lock.is_locked());
+ --ref;
+ _lock.Unlock();
+ if (ref == 0) delete this;
+ }
+
+ void mark_deleted() { deleted = true; }
+ bool is_deleted() { return deleted; }
+
public:
// pg state
Info info;
return 0;
}
- virtual void op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete) = 0;
friend class C_OSD_RepModify_Commit;
public:
PG(OSD *o, pg_t p) :
osd(o),
+ ref(0), deleted(false),
info(p),
role(0),
state(0),
-
-
// abstract bits
- virtual void op_stat(MOSDOp *op) = 0;
- virtual int op_read(MOSDOp *op) = 0;
- virtual void op_modify(MOSDOp *op) = 0;
- virtual void op_rep_modify(MOSDOp *op) = 0;
- virtual void op_push(MOSDOp *op) = 0;
- virtual void op_pull(MOSDOp *op) = 0;
- virtual void op_reply(MOSDOpReply *op) = 0;
+ virtual void do_op(MOSDOp *op) = 0;
+ virtual void do_op_reply(MOSDOpReply *op) = 0;
virtual bool same_for_read_since(epoch_t e) = 0;
virtual bool same_for_modify_since(epoch_t e) = 0;
+void RAID4PG::do_op(MOSDOp *op)
+{
+
+
+}
+
+
+
+void RAID4PG::do_op_reply(MOSDOpReply *reply)
+{
+
+}
+
+
+
+// -----------------
+// pg changes
+
+bool RAID4PG::same_for_read_since(epoch_t e)
+{
+ return e >= info.history.same_since; // whole pg set same
+}
+
+bool RAID4PG::same_for_modify_since(epoch_t e)
+{
+ return e >= info.history.same_since; // whole pg set same
+}
+
+bool RAID4PG::same_for_rep_modify_since(epoch_t e)
+{
+ return e >= info.history.same_since; // whole pg set same
+}
+
+
+// -----------------
+// RECOVERY
+
+bool RAID4PG::is_missing_object(object_t oid)
+{
+ return false;
+}
+
+void RAID4PG::wait_for_missing_object(object_t oid, MOSDOp *op)
+{
+ assert(0);
+}
+
+void RAID4PG::note_failed_osd(int o)
+{
+ dout(10) << "note_failed_osd osd" << o << endl;
+ assert(0);
+}
+
+void RAID4PG::on_acker_change()
+{
+ dout(10) << "on_acker_change" << endl;
+ assert(0);
+}
+
+
+void RAID4PG::on_role_change()
+{
+ dout(10) << "on_role_change" << endl;
+ assert(0);
+}
+
+
+void RAID4PG::clean_up_local(ObjectStore::Transaction&)
+{
+}
+
+void RAID4PG::cancel_recovery()
+{
+ assert(0);
+}
+
+bool RAID4PG::do_recovery()
+{
+ assert(0);
+ return false;
+}
+
+void RAID4PG::clean_replicas()
+{
+ assert(0);
+}
+
void prepare_op_transaction(ObjectStore::Transaction& t,
MOSDOp *op, eversion_t& version,
objectrev_t crev, objectrev_t rev);
-
-public:
- RAID4PG(OSD *o, pg_t p) : PG(o,p) { }
void op_stat(MOSDOp *op);
int op_read(MOSDOp *op);
void op_push(MOSDOp *op);
void op_pull(MOSDOp *op);
- void op_reply(MOSDOpReply *r);
+
+
+public:
+ RAID4PG(OSD *o, pg_t p) : PG(o,p) { }
+
+ void do_op(MOSDOp *op);
+ void do_op_reply(MOSDOpReply *r);
bool same_for_read_since(epoch_t e);
bool same_for_modify_since(epoch_t e);
void on_acker_change();
void on_role_change();
+ void clean_up_local(ObjectStore::Transaction& t);
+
+ void cancel_recovery();
+ bool do_recovery();
+
+ void clean_replicas();
+
+
};
+
+/** do_op - do an op
+ * pg lock will be held (if multithreaded)
+ * osd_lock NOT held.
+ */
+void ReplicatedPG::do_op(MOSDOp *op)
+{
+ //dout(15) << "do_op " << *op << endl;
+
+ osd->logger->inc("op");
+
+ switch (op->get_op()) {
+
+ // reads
+ case OSD_OP_READ:
+ if (osd->block_if_wrlocked(op))
+ return;
+ op_read(op);
+ break;
+ case OSD_OP_STAT:
+ if (osd->block_if_wrlocked(op))
+ return;
+ op_stat(op);
+ break;
+
+ // rep stuff
+ case OSD_OP_PULL:
+ op_pull(op);
+ break;
+ case OSD_OP_PUSH:
+ op_push(op);
+ break;
+
+ // writes
+ case OSD_OP_WRNOOP:
+ case OSD_OP_WRITE:
+ case OSD_OP_ZERO:
+ case OSD_OP_DELETE:
+ case OSD_OP_TRUNCATE:
+ case OSD_OP_WRLOCK:
+ case OSD_OP_WRUNLOCK:
+ case OSD_OP_RDLOCK:
+ case OSD_OP_RDUNLOCK:
+ case OSD_OP_UPLOCK:
+ case OSD_OP_DNLOCK:
+ if (op->get_source().is_osd()) {
+ op_rep_modify(op);
+ } else {
+ // go go gadget pg
+ op_modify(op);
+
+ if (op->get_op() == OSD_OP_WRITE) {
+ osd->logger->inc("c_wr");
+ osd->logger->inc("c_wrb", op->get_length());
+ }
+ }
+ break;
+
+ default:
+ assert(0);
+ }
+}
+
+void ReplicatedPG::do_op_reply(MOSDOpReply *r)
+{
+ // must be replication.
+ tid_t rep_tid = r->get_rep_tid();
+
+ if (rep_gather.count(rep_tid)) {
+ // oh, good.
+ int fromosd = r->get_source().num();
+ repop_ack(rep_gather[rep_tid],
+ r->get_result(), r->get_commit(),
+ fromosd,
+ r->get_pg_complete_thru());
+ delete r;
+ } else {
+ // early ack.
+ waiting_for_repop[rep_tid].push_back(r);
+ }
+}
+
+
+
+
// ========================================================================
// READS
// ========================================================================
// rep op gather
-class C_OSD_WriteCommit : public Context {
+class C_OSD_ModifyCommit : public Context {
public:
- OSD *osd;
- pg_t pgid;
+ ReplicatedPG *pg;
tid_t rep_tid;
eversion_t pg_last_complete;
- C_OSD_WriteCommit(OSD *o, pg_t p, tid_t rt, eversion_t lc) : osd(o), pgid(p), rep_tid(rt), pg_last_complete(lc) {}
+ C_OSD_ModifyCommit(ReplicatedPG *p, tid_t rt, eversion_t lc) : pg(p), rep_tid(rt), pg_last_complete(lc) {
+ pg->get(); // we're copying the pointer
+ }
void finish(int r) {
- ReplicatedPG *pg = (ReplicatedPG*)(osd->_lock_pg(pgid));
- if (pg) {
+ pg->lock();
+ if (!pg->is_deleted())
pg->op_modify_commit(rep_tid, pg_last_complete);
- osd->_unlock_pg(pg->info.pgid);
- }
+ pg->put_unlock();
}
};
dout(10) << "apply_repop applying update on " << *repop << endl;
assert(!repop->applied);
- Context *oncommit = new C_OSD_WriteCommit(osd, info.pgid, repop->rep_tid, repop->pg_local_last_complete);
+ Context *oncommit = new C_OSD_ModifyCommit(this, repop->rep_tid, repop->pg_local_last_complete);
unsigned r = osd->store->apply_transaction(repop->t, oncommit);
if (r)
dout(-10) << "apply_repop apply transaction return " << r << " on " << *repop << endl;
// forward the write/update/whatever
MOSDOp *wr = new MOSDOp(op->get_client_inst(), op->get_client_inc(), op->get_reqid().tid,
oid,
- info.pgid,
+ ObjectLayout(info.pgid),
osd->osdmap->get_epoch(),
op->get_op());
wr->get_data() = op->get_data(); // _copy_ bufferlist
// commit (to disk) callback
class C_OSD_RepModifyCommit : public Context {
public:
- OSD *osd;
+ ReplicatedPG *pg;
MOSDOp *op;
int destosd;
bool acked;
bool waiting;
- C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, int dosd, eversion_t lc) :
- osd(o), op(oo), destosd(dosd), pg_last_complete(lc),
- acked(false), waiting(false) { }
+ C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDOp *oo, int dosd, eversion_t lc) :
+ pg(p), op(oo), destosd(dosd), pg_last_complete(lc),
+ acked(false), waiting(false) {
+ pg->get(); // we're copying the pointer.
+ }
void finish(int r) {
lock.Lock();
assert(!waiting);
assert(acked);
lock.Unlock();
- PG *pg = osd->lock_pg(op->get_pg());
+ pg->lock();
pg->op_rep_modify_commit(op, destosd, pg_last_complete);
- osd->unlock_pg(op->get_pg());
+ pg->put_unlock();
}
void ack() {
lock.Lock();
object_t oid = op->get_oid();
const char *opname = MOSDOp::get_opname(op->get_op());
+ // locked by someone else?
+ // for _any_ op type -- eg only the locker can unlock!
+ if (op->get_op() != OSD_OP_WRNOOP && // except WRNOOP; we just want to flush
+ osd->block_if_wrlocked(op))
+ return; // op will be handled later, after the object unlocks
+
+ // share latest osd map with rest of pg?
+ osd->osd_lock.Lock();
+ {
+ for (unsigned i=1; i<acting.size(); i++) {
+ osd->_share_map_outgoing( osd->osdmap->get_inst(acting[i]) );
+ }
+ }
+ osd->osd_lock.Unlock();
+
+
// dup op?
if (is_dup(op->get_reqid())) {
dout(-3) << "op_modify " << opname << " dup op " << op->get_reqid()
prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru);
prepare_op_transaction(t, op, nv, crev, op->get_rev());
- C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(osd, op, get_acker(),
+ C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, get_acker(),
info.last_complete);
unsigned r = osd->store->apply_transaction(t, oncommit);
if (r != 0 && // no errors
prepare_op_transaction(t, op, nv, crev, op->get_rev());
}
- oncommit = new C_OSD_RepModifyCommit(osd, op, ackerosd, info.last_complete);
+ oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
// apply log update. and possibly update itself.
unsigned tr = osd->store->apply_transaction(t, oncommit);
-void ReplicatedPG::op_reply(MOSDOpReply *r)
-{
- // must be replication.
- tid_t rep_tid = r->get_rep_tid();
-
- if (rep_gather.count(rep_tid)) {
- // oh, good.
- int fromosd = r->get_source().num();
- repop_ack(rep_gather[rep_tid],
- r->get_result(), r->get_commit(),
- fromosd,
- r->get_pg_complete_thru());
- delete r;
- } else {
- // early ack.
- waiting_for_repop[rep_tid].push_back(r);
- }
-}
-
void ReplicatedPG::note_failed_osd(int o)
MOSDOp *op, eversion_t& version,
objectrev_t crev, objectrev_t rev);
- friend class C_OSD_WriteCommit;
+ friend class C_OSD_ModifyCommit;
+ friend class C_OSD_RepModifyCommit;
// pg on-disk content
void clean_replicas();
+ void op_stat(MOSDOp *op);
+ int op_read(MOSDOp *op);
+ void op_modify(MOSDOp *op);
+ void op_rep_modify(MOSDOp *op);
+ void op_push(MOSDOp *op);
+ void op_pull(MOSDOp *op);
+
{ }
~ReplicatedPG() {}
- void op_stat(MOSDOp *op);
- int op_read(MOSDOp *op);
- void op_modify(MOSDOp *op);
- void op_rep_modify(MOSDOp *op);
- void op_push(MOSDOp *op);
- void op_pull(MOSDOp *op);
- void op_reply(MOSDOpReply *r);
+ void do_op(MOSDOp *op);
+ void do_op_reply(MOSDOpReply *r);
bool same_for_read_since(epoch_t e);
bool same_for_modify_since(epoch_t e);
out << pg.preferred() << 'p';
out << hex << pg.ps() << dec;
- out << "=" << hex << (__uint64_t)pg << dec;
+ //out << "=" << hex << (__uint64_t)pg << dec;
return out;
}
}
+/** ObjectLayout
+ *
+ * describes an object's placement and layout in the storage cluster.
+ * most importatly, which pg it belongs to.
+ * if that pg is raided, it also specifies the object's stripe_unit.
+ */
+struct ObjectLayout {
+ pg_t pgid; // what pg do i belong to
+ int stripe_unit; // for object raid in raid pgs
+
+ ObjectLayout() : pgid(0), stripe_unit(0) { }
+ ObjectLayout(pg_t p, int su=0) : pgid(p), stripe_unit(su) { }
+};
+inline ostream& operator<<(ostream& out, const ObjectLayout &ol)
+{
+ out << "pg" << ol.pgid;
+ if (ol.stripe_unit)
+ out << ".su=" << ol.stripe_unit;
+ return out;
+}
size_t length; // in object
objectrev_t rev; // which revision?
- pg_t pgid; // where to find the object
+
+ ObjectLayout layout; // object layout (pgid, etc.)
map<size_t, size_t> buffer_extents; // off -> len. extents in buffer being mapped (may be fragmented bc of striping!)
- ObjectExtent() : start(0), length(0), rev(0), pgid(0) {}
- ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l), rev(0), pgid(0) { }
+ ObjectExtent() : start(0), length(0), rev(0) {}
+ ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l), rev(0) { }
};
inline ostream& operator<<(ostream& out, ObjectExtent &ex)
{
return out << "extent("
- << ex.oid << " in " << hex << ex.pgid << dec
+ << ex.oid << " in " << ex.layout
<< " " << ex.start << "~" << ex.length
<< ")";
}
p++) {
dout(10) << "_probe probing " << p->oid << endl;
C_Probe *c = new C_Probe(this, probe, p->oid);
- probe->ops[p->oid] = objecter->stat(p->oid, &c->size, c);
+ probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, c);
}
}
ex = &object_extents[oid];
ex->oid = oid;
ex->rev = rev;
- ex->pgid = objecter->osdmap->object_to_pg( oid, inode.layout );
+ ex->layout = objecter->osdmap->file_to_object_layout( oid, inode.layout );
}
// map range into object
}
ex->buffer_extents[cur-offset] = x_len;
- dout(15) << "file_to_extents " << *ex << " in " << ex->pgid << endl;
+ dout(15) << "file_to_extents " << *ex << " in " << ex->layout << endl;
//cout << "map: ino " << ino << " oid " << ex.oid << " osd " << ex.osd << " offset " << ex.offset << " len " << ex.len << " ... left " << left << endl;
left -= x_len;
C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
// go
- objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), &onfinish->bl,
+ objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), &onfinish->bl,
onfinish);
}
C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length());
// go
- tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->bl,
+ tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), bh->bl,
onack, oncommit);
// set bh last_write_tid
dout(10) << "readx " << *ex_it << endl;
// get Object cache
- Object *o = get_object(ex_it->oid, ino);
+ Object *o = get_object(ex_it->oid, ino, ex_it->layout);
// map extent into bufferheads
map<off_t, BufferHead*> hits, missing, rx;
ex_it != wr->extents.end();
ex_it++) {
// get object cache
- Object *o = get_object(ex_it->oid, ino);
+ Object *o = get_object(ex_it->oid, ino, ex_it->layout);
// map it all into a single bufferhead.
BufferHead *bh = o->map_write(wr);
for (map<object_t,ObjectExtent>::iterator i = by_oid.begin();
i != by_oid.end();
i++) {
- Object *o = get_object(i->first, ino);
+ Object *o = get_object(i->first, ino, i->second.layout);
rdlock(o);
}
// make sure we aren't already locking/locked...
object_t oid = wr->extents.front().oid;
Object *o = 0;
- if (objects.count(oid)) o = get_object(oid, ino);
+ if (objects.count(oid)) o = get_object(oid, ino, wr->extents.front().layout);
if (!o ||
(o->lock_state != Object::LOCK_WRLOCK &&
o->lock_state != Object::LOCK_WRLOCKING &&
for (map<object_t,ObjectExtent>::iterator i = by_oid.begin();
i != by_oid.end();
i++) {
- Object *o = get_object(i->first, ino);
+ Object *o = get_object(i->first, ino, i->second.layout);
wrlock(o);
}
commit->tid =
ack->tid =
o->last_write_tid =
- objecter->lock(OSD_OP_RDLOCK, o->get_oid(), ack, commit);
+ objecter->lock(OSD_OP_RDLOCK, o->get_oid(), o->get_layout(), ack, commit);
}
// stake our claim.
commit->tid =
ack->tid =
o->last_write_tid =
- objecter->lock(op, o->get_oid(), ack, commit);
+ objecter->lock(op, o->get_oid(), o->get_layout(), ack, commit);
}
// stake our claim.
commit->tid =
lockack->tid =
o->last_write_tid =
- objecter->lock(OSD_OP_RDUNLOCK, o->get_oid(), lockack, commit);
+ objecter->lock(OSD_OP_RDUNLOCK, o->get_oid(), o->get_layout(), lockack, commit);
}
void ObjectCacher::wrunlock(Object *o)
commit->tid =
lockack->tid =
o->last_write_tid =
- objecter->lock(op, o->get_oid(), lockack, commit);
+ objecter->lock(op, o->get_oid(), o->get_layout(), lockack, commit);
}
object_t oid; // this _always_ is oid.rev=0
inodeno_t ino;
objectrev_t rev; // last rev we're written
+ ObjectLayout layout;
public:
map<off_t, BufferHead*> data;
int rdlock_ref; // how many ppl want or are using a READ lock
public:
- Object(ObjectCacher *_oc, object_t o, inodeno_t i) :
+ Object(ObjectCacher *_oc, object_t o, inodeno_t i, ObjectLayout& l) :
oc(_oc),
- oid(o), ino(i),
+ oid(o), ino(i), layout(l),
last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0)
{}
object_t get_oid() { return oid; }
inodeno_t get_ino() { return ino; }
+ ObjectLayout& get_layout() { return layout; }
+ void set_layout(ObjectLayout& l) { layout = l; }
+
bool can_close() {
return data.empty() && lock_state == LOCK_NONE &&
waitfor_ack.empty() && waitfor_commit.empty() &&
// objects
- Object *get_object(object_t oid, inodeno_t ino) {
+ Object *get_object(object_t oid, inodeno_t ino, ObjectLayout &l) {
// have it?
if (objects.count(oid))
return objects[oid];
// create it.
- Object *o = new Object(this, oid, ino);
+ Object *o = new Object(this, oid, ino, l);
objects[oid] = o;
objects_by_ino[ino].insert(o);
return o;
// stat -----------------------------------
-tid_t Objecter::stat(object_t oid, off_t *size, Context *onfinish,
+tid_t Objecter::stat(object_t oid, off_t *size, ObjectLayout& ol, Context *onfinish,
objectrev_t rev)
{
OSDStat *st = new OSDStat(size);
st->extents.push_back(ObjectExtent(oid, 0, 0));
- st->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+ st->extents.front().layout = ol;
st->extents.front().rev = rev;
st->onfinish = onfinish;
{
// find OSD
ObjectExtent &ex = st->extents.front();
- PG &pg = get_pg( ex.pgid );
+ PG &pg = get_pg( ex.layout.pgid );
// send
last_tid++;
assert(client_inc >= 0);
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
+ ex.oid, ex.layout, osdmap->get_epoch(),
OSD_OP_STAT);
+
dout(10) << "stat_submit " << st << " tid " << last_tid
<< " oid " << ex.oid
- << " pg " << ex.pgid
+ << " " << ex.layout
<< " osd" << pg.acker()
<< endl;
// read -----------------------------------
-tid_t Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl,
+tid_t Objecter::read(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist *bl,
Context *onfinish,
objectrev_t rev)
{
OSDRead *rd = new OSDRead(bl);
rd->extents.push_back(ObjectExtent(oid, off, len));
- rd->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+ rd->extents.front().layout = ol;
rd->extents.front().rev = rev;
readx(rd, onfinish);
return last_tid;
tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex)
{
// find OSD
- PG &pg = get_pg( ex.pgid );
+ PG &pg = get_pg( ex.layout.pgid );
// send
last_tid++;
assert(client_inc >= 0);
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
+ ex.oid, ex.layout, osdmap->get_epoch(),
OSD_OP_READ);
m->set_length(ex.length);
m->set_offset(ex.start);
dout(10) << "readx_submit " << rd << " tid " << last_tid
<< " oid " << ex.oid << " " << ex.start << "~" << ex.length
<< " (" << ex.buffer_extents.size() << " buffer fragments)"
- << " pg " << ex.pgid
+ << " " << ex.layout
<< " osd" << pg.acker()
<< endl;
// write ------------------------------------
-tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl,
+tid_t Objecter::write(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist &bl,
Context *onack, Context *oncommit,
objectrev_t rev)
{
OSDWrite *wr = new OSDWrite(bl);
wr->extents.push_back(ObjectExtent(oid, off, len));
- wr->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+ wr->extents.front().layout = ol;
wr->extents.front().buffer_extents[0] = len;
wr->extents.front().rev = rev;
modifyx(wr, onack, oncommit);
// zero
-tid_t Objecter::zero(object_t oid, off_t off, size_t len,
+tid_t Objecter::zero(object_t oid, off_t off, size_t len, ObjectLayout& ol,
Context *onack, Context *oncommit,
objectrev_t rev)
{
OSDModify *z = new OSDModify(OSD_OP_ZERO);
z->extents.push_back(ObjectExtent(oid, off, len));
- z->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+ z->extents.front().layout = ol;
z->extents.front().rev = rev;
modifyx(z, onack, oncommit);
return last_tid;
// lock ops
-tid_t Objecter::lock(int op, object_t oid,
+tid_t Objecter::lock(int op, object_t oid, ObjectLayout& ol,
Context *onack, Context *oncommit)
{
OSDModify *l = new OSDModify(op);
l->extents.push_back(ObjectExtent(oid, 0, 0));
- l->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+ l->extents.front().layout = ol;
modifyx(l, onack, oncommit);
return last_tid;
}
tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
{
// find
- PG &pg = get_pg( ex.pgid );
+ PG &pg = get_pg( ex.layout.pgid );
// send
tid_t tid;
tid = ++last_tid;
MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
- ex.oid, ex.pgid, osdmap->get_epoch(),
+ ex.oid, ex.layout, osdmap->get_epoch(),
wr->op);
m->set_length(ex.length);
m->set_offset(ex.start);
dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
<< " oid " << ex.oid
<< " " << ex.start << "~" << ex.length
- << " pg " << ex.pgid
+ << " " << ex.layout
<< " osd" << pg.primary()
<< endl;
if (pg.primary() >= 0)
//tid_t lockx(OSDLock *l, Context *onack, Context *oncommit);
// even lazier
- tid_t read(object_t oid, off_t off, size_t len, bufferlist *bl,
+ tid_t read(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist *bl,
Context *onfinish,
objectrev_t rev=0);
- tid_t write(object_t oid, off_t off, size_t len, bufferlist &bl,
+ tid_t write(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist &bl,
Context *onack, Context *oncommit,
objectrev_t rev=0);
- tid_t zero(object_t oid, off_t off, size_t len,
+ tid_t zero(object_t oid, off_t off, size_t len, ObjectLayout& ol,
Context *onack, Context *oncommit,
objectrev_t rev=0);
- tid_t stat(object_t oid, off_t *size, Context *onfinish,
+ tid_t stat(object_t oid, off_t *size, ObjectLayout& ol, Context *onfinish,
objectrev_t rev=0);
- tid_t lock(int op, object_t oid, Context *onack, Context *oncommit);
+ tid_t lock(int op, object_t oid, ObjectLayout& ol, Context *onack, Context *oncommit);
void ms_handle_failure(Message *m, entity_name_t dest, const entity_inst_t& inst);