From e339fe08d1e92870e5dcabb2408ff7af875cf509 Mon Sep 17 00:00:00 2001 From: sage Date: Tue, 13 Sep 2005 19:54:33 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@493 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/TODO | 71 ++++++++- ceph/messages/MOSDOp.h | 2 + ceph/osd/OSD.cc | 324 ++++++++++++++++++++++++----------------- ceph/osd/OSD.h | 21 ++- ceph/osd/PG.cc | 97 ++++++------ ceph/osd/PG.h | 14 +- 6 files changed, 336 insertions(+), 193 deletions(-) diff --git a/ceph/TODO b/ceph/TODO index 7f7161811cdd9..f3fce49a633b9 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -1,6 +1,40 @@ +cluster issues +- general problem: how to do posix ordering on object boundaries using an object store + +- osd states: + data placement vs liveness: active, inactive, down, failed +- failure model: + anything more than that? "temporarily unavailable" type state (for, say, fsck)? + +- replication latency. flush on replicas? +- what does 'complete' mean on new primary? + - apparently _need_ to examine objects lists always? would be nice if we didn't! +- "lazy" flush mode, for (just) doing read/write or write/write sharing? + + +- deleting objects + - osd's that rejoin + - must keep stray replicas clean + - + +- communications failure model.. is it appropriate? + - reliable, ordered, buffered and flushed on 'down' boundaries? + - ordered, unreliable? +- what about large messages? :( + + +distribution + +- osd imbalance? + + + + + osd fun -- op_delete +//- generalize rep stuff (op_delete, op_truncate, etc.) + - pull plan items need to be removed when they're obsolete. or, figure out what's old later. - osdmap history distribution - double commit? what does 'complete' mean on a replica? @@ -11,6 +45,17 @@ osd fun eg maphistory->same_pg_primary_since(pg, epoch) + +- osd states + + up placing data + active y y + inactive y n + down n y + failed n n + + + RUSH_P : prime numbers draw K from expanding set of clusters @@ -20,6 +65,30 @@ RUSH_R : removal RUSH_T : tree +two types of buckets: + + edge bucket .. list of identical disks + bucket .. list of whatever, structured as a tree. + +struct disk_set { + int id; + float disk_weight; + int ndisks; + vector disks; +}; + +struct bucket { + int id; + vector contents; + map weight; +}; + + + + + + + p 0 disk t 1 shelf diff --git a/ceph/messages/MOSDOp.h b/ceph/messages/MOSDOp.h index bbf791393f265..1428ec1206497 100644 --- a/ceph/messages/MOSDOp.h +++ b/ceph/messages/MOSDOp.h @@ -21,6 +21,8 @@ #define OSD_OP_TRUNCATE 12 #define OSD_OP_ZERORANGE 13 +#define OSD_OP_IS_REP(x) ((x) >= 30) + // replication/recovery -- these ops are relative to a specific object version # #define OSD_OP_REP_PULL 30 // whole object read #define OSD_OP_REP_PUSH 31 // whole object write diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 871a94a1d69a9..c697557ce8acc 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -1068,7 +1068,7 @@ void OSD::handle_pg_peer_ack(MOSDPGPeerAck *m) PG *pg = open_pg(it->first); assert(pg); - dout(10) << " " << *pg << " remote state " << it->second.state + dout(10) << " " << *pg << " osd" << from << " remote state " << it->second.state << " w/ " << it->second.objects.size() << " objects, " << it->second.deleted.size() << " deleted" << endl; @@ -1083,6 +1083,7 @@ void OSD::handle_pg_peer_ack(MOSDPGPeerAck *m) for (map::iterator pit = pg->get_peers().begin(); pit != pg->get_peers().end(); pit++) { + dout(10) << " " << *pg << " peer osd" << pit->first << " state " << pit->second->get_state() << endl; if (!pit->second->is_active()) fully = false; } @@ -1636,43 +1637,63 @@ void OSD::do_op(MOSDOp *op) logger->inc("op"); - // do the op - switch (op->get_op()) { + // replication ops? + if (OSD_OP_IS_REP(op->get_op())) { + // replication/recovery + switch (op->get_op()) { + case OSD_OP_REP_PULL: + op_rep_pull(op); + break; + case OSD_OP_REP_PUSH: + op_rep_push(op); + break; + case OSD_OP_REP_REMOVE: + op_rep_remove(op); + break; + case OSD_OP_REP_WRITE: + op_rep_write(op); + break; + default: + assert(0); + } + } else { + // regular op - // normal - - case OSD_OP_READ: - op_read(op); - break; - case OSD_OP_WRITE: - op_write(op); - break; - case OSD_OP_DELETE: - op_delete(op); - break; - case OSD_OP_TRUNCATE: - op_truncate(op); - break; - case OSD_OP_STAT: - op_stat(op); - break; + pg_t pgid = op->get_pg(); + PG *pg = open_pg(pgid); - // replication/recovery - case OSD_OP_REP_PULL: - op_rep_pull(op); - break; - case OSD_OP_REP_PUSH: - op_rep_push(op); - break; - case OSD_OP_REP_REMOVE: - op_rep_remove(op); - break; - case OSD_OP_REP_WRITE: - op_rep_write(op); - break; - - default: - assert(0); + // PG must be peered for all client ops. + if (!pg) { + dout(7) << "op_write pg " << hex << pgid << dec << " dne (yet)" << endl; + waiting_for_pg[pgid].push_back(op); + return; + } + if (!pg->is_peered()) { + dout(7) << "op_write " << *pg << " not peered (yet)" << endl; + waiting_for_pg_peered[pgid].push_back(op); + return; + } + + // do op + switch (op->get_op()) { + case OSD_OP_READ: + op_read(op, pg); + break; + case OSD_OP_WRITE: + op_write(op, pg); + break; + case OSD_OP_DELETE: + op_delete(op, pg); + break; + case OSD_OP_TRUNCATE: + op_truncate(op, pg); + break; + case OSD_OP_STAT: + op_stat(op, pg); + break; + default: + assert(0); + } } // finish @@ -1698,15 +1719,56 @@ void OSD::wait_for_no_ops() osd_lock.Unlock(); } -void OSD::op_read(MOSDOp *r) + + +// READ OPS + +bool OSD::object_complete(PG *pg, object_t oid, Message *op) { + //v = 0; + + if (pg->is_complete()) { + /* + if (store->exists(oid)) { + store->getattr(oid, "version", &v, sizeof(v)); + assert(v>0); + } + */ + } else { + if (pg->objects.count(oid)) { + //v = pg->objects[oid]; + + if (pg->objects_loc.count(oid)) { + // proxying, wait. + dout(7) << "object " << hex << oid << dec << /*" v " << v << */" in " << *pg + << " exists but not local (yet)" << endl; + waiting_for_object[oid].push_back(op); + return false; + } + } + } + + return true; +} + +void OSD::op_read(MOSDOp *op, PG *pg) +{ + object_t oid = op->get_oid(); + lock_object(oid); + + // version? clean? + if (!object_complete(pg, oid, op)) { + unlock_object(oid); + return; + } + // read into a buffer - bufferptr bptr = new buffer(r->get_length()); // prealloc space for entire read - long got = store->read(r->get_oid(), - r->get_length(), r->get_offset(), + bufferptr bptr = new buffer(op->get_length()); // prealloc space for entire read + long got = store->read(oid, + op->get_length(), op->get_offset(), bptr.c_str()); // set up reply - MOSDOpReply *reply = new MOSDOpReply(r, 0, osdmap); + MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap); if (got >= 0) { bptr.set_length(got); // properly size the buffer @@ -1723,20 +1785,49 @@ void OSD::op_read(MOSDOp *r) reply->set_length(0); } - dout(12) << "read got " << got << " / " << r->get_length() << " bytes from obj " << hex << r->get_oid() << dec << endl; + dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl; logger->inc("rd"); if (got >= 0) logger->inc("rdb", got); // send it - messenger->send_message(reply, r->get_asker()); + messenger->send_message(reply, op->get_asker()); + + delete op; + + unlock_object(oid); +} + +void OSD::op_stat(MOSDOp *op, PG *pg) +{ + object_t oid = op->get_oid(); + lock_object(oid); + + // version? clean? + if (!object_complete(pg, oid, op)) { + unlock_object(oid); + return; + } + + struct stat st; + memset(&st, sizeof(st), 0); + int r = store->stat(oid, &st); + + dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl; + + MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap); + reply->set_object_size(st.st_size); + messenger->send_message(reply, op->get_asker()); + + logger->inc("stat"); + delete op; - delete r; + unlock_object(oid); } -// -- osd_write +// WRITE OPS void OSD::apply_write(MOSDOp *op, bool write_sync, version_t v) { @@ -1765,35 +1856,15 @@ void OSD::apply_write(MOSDOp *op, bool write_sync, version_t v) store->setattr(op->get_oid(), "version", &v, sizeof(v)); } - -void OSD::op_write(MOSDOp *op) +bool OSD::object_clean(PG *pg, object_t oid, version_t& v, Message *op) { - // PG - pg_t pgid = op->get_pg(); - PG *pg = open_pg(pgid); - if (!pg) { - dout(7) << "op_write pg " << hex << pgid << dec << " dne (yet)" << endl; - waiting_for_pg[pgid].push_back(op); - return; - } - if (!pg->is_peered()) { - dout(7) << "op_write " << *pg << " not peered (yet)" << endl; - waiting_for_pg_peered[pgid].push_back(op); - return; - } - - object_t oid = op->get_oid(); - - lock_object(oid); - - // version - version_t v = 0; // 0 == dne (yet) - + v = 0; + if (pg->is_complete() && pg->is_clean()) { // PG is complete+clean, easy shmeasy! if (store->exists(oid)) { - // inc version store->getattr(oid, "version", &v, sizeof(v)); + assert(v>0); } } else { // PG is recovering|replicating, blech. @@ -1813,25 +1884,38 @@ void OSD::op_write(MOSDOp *op) dout(10) << " pg not clean, checking if " << hex << oid << dec << " v " << v << " is specifically clean yet!" << endl; // object (logically) exists if (!pg->existant_object_is_clean(oid, v)) { - dout(7) << "op_write " << hex << oid << dec << " v " << v << " in " << *pg + dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg << " exists but is not clean" << endl; waiting_for_clean_object[oid].push_back(op); - unlock_object(oid); - return; + return false; } } else { // object (logically) dne if (store->exists(oid) || !pg->nonexistant_object_is_clean(oid)) { - dout(7) << "op_write " << hex << oid << dec << " v " << v << " in " << *pg + dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg << " dne but is not clean" << endl; waiting_for_clean_object[oid].push_back(op); - unlock_object(oid); - return; + return false; } } } + return true; +} + +void OSD::op_write(MOSDOp *op, PG *pg) +{ + object_t oid = op->get_oid(); + + lock_object(oid); + + // version? clean? + version_t v = 0; // 0 == dne (yet) + if (!object_clean(pg, oid, v, op)) { + unlock_object(oid); + return; + } v++; // we're good! dout(12) << "op_write " << hex << oid << dec << " v " << v << endl; @@ -1860,17 +1944,16 @@ void OSD::op_write(MOSDOp *op) replica_write_tids[op].insert(tid); replica_writes[tid] = op; - replica_pg_osd_tids[pgid][osd].insert(tid); + replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid); } replica_write_lock.Unlock(); // write apply_write(op, true, v); - PG *r = open_pg(pgid); if (v == 1) { // put new object in proper collection - r->add_object(store, oid); + pg->add_object(store, oid); } // reply? @@ -1891,52 +1974,23 @@ void OSD::op_write(MOSDOp *op) unlock_object(oid); } -/* -void OSD::handle_mkfs(MOSDMkfs *op) -{ - dout(3) << "MKFS" << endl; - - // wipe store - int r = store->mkfs(); - - // create PGs - list pg_list; - for (int nrep = 2; nrep < 4; nrep++) { - ps_t maxps = 1LL << osdmap->get_pg_bits(); - for (pg_t ps = 0; ps < maxps; ps++) { - pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep); - vector acting; - osdmap->pg_to_acting_osds(pgid, acting); - - if (acting[0] == whoami) { - PG *pg = create_pg(pgid); - pg->acting = acting; - pg->calc_role(whoami); - pg->state_set(PG_STATE_COMPLETE); +void OSD::op_delete(MOSDOp *op, PG *pg) +{ + object_t oid = op->get_oid(); - dout(7) << "created " << *pg << endl; + lock_object(oid); - pg_list.push_back(pgid); - } - } + // version? clean? + version_t v = 0; // 0 == dne (yet) + if (!object_clean(pg, oid, v, op)) { + unlock_object(oid); + return; } + v++; // we're good! - // activate! - if (osdmap) - activate_map(pg_list); - - // reply! - messenger->send_message(new MOSDMkfsAck(op), op->get_asker()); - - delete op; -} -*/ - -void OSD::op_delete(MOSDOp *op) -{ - int r = store->remove(op->get_oid()); + int r = store->remove(oid); dout(12) << "delete on " << hex << op->get_oid() << dec << " r = " << r << endl; // "ack" @@ -1944,11 +1998,25 @@ void OSD::op_delete(MOSDOp *op) logger->inc("rm"); delete op; + + unlock_object(oid); } -void OSD::op_truncate(MOSDOp *op) +void OSD::op_truncate(MOSDOp *op, PG *pg) { - int r = store->truncate(op->get_oid(), op->get_offset()); + object_t oid = op->get_oid(); + + lock_object(oid); + + // version? clean? + version_t v = 0; // 0 == dne (yet) + if (!object_clean(pg, oid, v, op)) { + unlock_object(oid); + return; + } + v++; // we're good! + + int r = store->truncate(oid, op->get_offset()); dout(3) << "truncate on " << hex << op->get_oid() << dec << " at " << op->get_offset() << " r = " << r << endl; // "ack" @@ -1957,22 +2025,8 @@ void OSD::op_truncate(MOSDOp *op) logger->inc("trunc"); delete op; -} -void OSD::op_stat(MOSDOp *op) -{ - struct stat st; - memset(&st, sizeof(st), 0); - int r = store->stat(op->get_oid(), &st); - - dout(3) << "stat on " << hex << op->get_oid() << dec << " r = " << r << " size = " << st.st_size << endl; - - MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap); - reply->set_object_size(st.st_size); - messenger->send_message(reply, op->get_asker()); - - logger->inc("stat"); - delete op; + unlock_object(oid); } void doop(OSD *u, MOSDOp *p) { diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index e6de3d3f0fbfc..3b0893b2118c3 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -51,8 +51,11 @@ class OSD : public Dispatcher { } // -- objects -- - int read_onode(onode_t& onode); - int write_onode(onode_t& onode); + //int read_onode(onode_t& onode); + //int write_onode(onode_t& onode); + + bool object_complete(PG *pg, object_t oid, Message *op); + bool object_clean(PG *pg, object_t oid, version_t& v, Message *op); // -- ops -- @@ -172,12 +175,14 @@ class OSD : public Dispatcher { void handle_ping(class MPing *m); void handle_op(class MOSDOp *m); - void op_read(class MOSDOp *m); - void op_write(class MOSDOp *m); - void op_mkfs(class MOSDOp *m); - void op_delete(class MOSDOp *m); - void op_truncate(class MOSDOp *m); - void op_stat(class MOSDOp *m); + + void op_read(class MOSDOp *m, PG *pg); + void op_stat(class MOSDOp *m, PG *pg); + void op_write(class MOSDOp *m, PG *pg); + void op_delete(class MOSDOp *m, PG *pg); + void op_truncate(class MOSDOp *m, PG *pg); + + //void op_mkfs(class MOSDOp *m); // for replication void handle_op_reply(class MOSDOpReply *m); diff --git a/ceph/osd/PG.cc b/ceph/osd/PG.cc index abbb820b6a426..200f3e5d01cf6 100644 --- a/ceph/osd/PG.cc +++ b/ceph/osd/PG.cc @@ -96,7 +96,7 @@ void PG::removed(object_t oid, version_t v, PGPeer *p) bool PG::existant_object_is_clean(object_t o, version_t v) { assert(is_peered() && !is_clean()); - + return objects_unrep.count(o) ? false:true; /* @@ -122,6 +122,7 @@ bool PG::nonexistant_object_is_clean(object_t o) it != peers.end(); it++) { //if (!it->second->is_active()) continue; + if (it->second->is_complete()) continue; if (it->second->peer_state.objects.count(o)) { return false; } @@ -142,60 +143,65 @@ void PG::plan_recovery(ObjectStore *store) map local_objects; scan_local_objects(local_objects, store); + dout(10) << " " << local_objects.size() << " local objects" << endl; + objects = local_objects; // start w/ local object set. - // newest objects -> objects - for (map::iterator pit = peers.begin(); - pit != peers.end(); - pit++) { - for (map::iterator oit = pit->second->peer_state.objects.begin(); - oit != pit->second->peer_state.objects.end(); - oit++) { - // know this object? - if (objects.count(oit->first)) { - object_t v = objects[oit->first]; - if (oit->second < v) // older? - continue; // useless - else if (oit->second == v) // same? - objects_nrep[oit->first]++; // not quite accurate bc local_objects isn't included in nrep - else { // newer! + if (!is_complete()) { + // newest objects -> objects + for (map::iterator pit = peers.begin(); + pit != peers.end(); + pit++) { + for (map::iterator oit = pit->second->peer_state.objects.begin(); + oit != pit->second->peer_state.objects.end(); + oit++) { + // know this object? + if (objects.count(oit->first)) { + object_t v = objects[oit->first]; + if (oit->second < v) // older? + continue; // useless + else if (oit->second == v) // same? + objects_nrep[oit->first]++; // not quite accurate bc local_objects isn't included in nrep + else { // newer! + objects[oit->first] = oit->second; + objects_nrep[oit->first] = 0; + objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame. + } + } else { + // newly seen object! objects[oit->first] = oit->second; objects_nrep[oit->first] = 0; objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame. } - } else { - // newly seen object! - objects[oit->first] = oit->second; - objects_nrep[oit->first] = 0; - objects_loc[oit->first] = pit->first; // note location. this will overwrite and be lame. } } - } - /* - // remove deleted objects - assim_deleted_objects(deleted_objects); // locally + /* + // remove deleted objects + assim_deleted_objects(deleted_objects); // locally for (map::iterator pit = peers.begin(); - pit != peers.end(); + pit != peers.end(); pit++) assim_deleted_objects(pit->second->peer_state.deleted); // on peers */ - // just cleanup old local objects - // FIXME: do this async? - for (map::iterator it = local_objects.begin(); - it != local_objects.end(); - it++) { - if (objects.count(it->first) && objects[it->first] == it->second) continue; // same! - - dout(10) << " local o " << hex << it->first << dec << " v " << it->second << " old, removing" << endl; - store->remove(it->first); - local_objects.erase(it->first); - } + // just cleanup old local objects + // FIXME: do this async? + for (map::iterator it = local_objects.begin(); + it != local_objects.end(); + it++) { + if (objects.count(it->first) && objects[it->first] == it->second) continue; // same! + + dout(10) << " local o " << hex << it->first << dec << " v " << it->second << " old, removing" << endl; + store->remove(it->first); + local_objects.erase(it->first); + } + // get complete PG + plan_pull(); + } - // make remote action plans! - plan_pull(); + // sync up replicas plan_push_cleanup(); } @@ -225,7 +231,7 @@ void PG::plan_pull() } if (pull_plan.empty()) { - dout(10) << "nothing to pull, marking complete" << endl; + dout(10) << " nothing to pull, marking complete" << endl; mark_complete(); } } @@ -248,6 +254,8 @@ void PG::plan_push_cleanup() PGPeer *pgp = peers[acting[r]]; assert(pgp); + if (pgp->is_complete()) continue; + if (pgp->peer_state.objects.count(oit->first) == 0 || oit->second < pgp->peer_state.objects[oit->first]) { dout(10) << " o " << hex << oit->first << dec << " v " << oit->second << " old|dne on osd" << pgp->get_peer() << ", pushing" << endl; @@ -270,6 +278,13 @@ void PG::plan_push_cleanup() if (role == 0) continue; // skip primary PGPeer *pgp = pit->second; + assert(pgp->is_active()); + if (pgp->is_complete()) { + dout(12) << " peer osd" << pit->first << " is complete" << endl; + continue; + } + dout(12) << " peer osd" << pit->first << " is !complete" << endl; + for (map::iterator oit = pit->second->peer_state.objects.begin(); oit != pit->second->peer_state.objects.end(); oit++) { @@ -286,7 +301,7 @@ void PG::plan_push_cleanup() } if (push_plan.empty() && clean_plan.empty()) { - dout(10) << "nothing to push|clean, marking clean" << endl; + dout(10) << " nothing to push|clean, marking clean" << endl; mark_clean(); } } diff --git a/ceph/osd/PG.h b/ceph/osd/PG.h index 1c46158698040..356d0abdadd42 100644 --- a/ceph/osd/PG.h +++ b/ceph/osd/PG.h @@ -38,12 +38,11 @@ class PGPeer { class PG *pg; private: int peer; - //int role; // 0 primary, 1+ replica, -1 residual int state; // peer state public: - PGReplicaInfo peer_state; + PGReplicaInfo peer_state; // only defined if active && !complete protected: // recovery: for pulling content from (old) replicas @@ -59,20 +58,18 @@ class PGPeer { PGPeer(class PG *pg, int p/*, int ro*/) : pg(pg), peer(p), - //role(ro), state(0) { } - //int get_role() { return role; } int get_peer() { return peer; } - bool state_test(int m) { return state & m != 0; } + + int get_state() { return state; } + bool state_test(int m) { return (state & m) != 0; } void state_set(int m) { state |= m; } void state_clear(int m) { state &= ~m; } bool is_active() { return state_test(PG_PEER_STATE_ACTIVE); } bool is_complete() { return state_test(PG_PEER_STATE_COMPLETE); } - - //bool is_residual() { return role < 0; } - bool is_empty() { return is_active() && peer_state.objects.empty(); } // *** && peer_state & COMPLETE + bool is_recovering() { return is_active() && !is_complete(); } bool has_latest(object_t o, version_t v) { if (is_complete()) return true; @@ -80,6 +77,7 @@ class PGPeer { return peer_state.objects[o] == v; } + // actors void pull(object_t o, version_t v) { pulling[o] = v; } bool is_pulling(object_t o) { return pulling.count(o); } version_t pulling_version(object_t o) { return pulling[o]; } -- 2.39.5