From 81de51359bde9f98d1e4c37a15dbcd055b6ffc65 Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 1 Mar 2007 16:58:37 +0000 Subject: [PATCH] more transplanted old guts. blood everywhere. git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1149 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/pgs/osd/OSD.cc | 439 +++------------------- branches/sage/pgs/osd/OSD.h | 8 +- branches/sage/pgs/osd/PG.cc | 57 ++- branches/sage/pgs/osd/PG.h | 36 +- branches/sage/pgs/osd/ReplicatedPG.cc | 520 +++++++++++++++++++------- branches/sage/pgs/osd/ReplicatedPG.h | 55 ++- 6 files changed, 566 insertions(+), 549 deletions(-) diff --git a/branches/sage/pgs/osd/OSD.cc b/branches/sage/pgs/osd/OSD.cc index beed4e69f504c..e64a55f84f01f 100644 --- a/branches/sage/pgs/osd/OSD.cc +++ b/branches/sage/pgs/osd/OSD.cc @@ -1957,196 +1957,6 @@ void OSD::handle_pg_remove(MOSDPGRemove *m) /*** RECOVERY ***/ -/** pull - request object from a peer - */ -void OSD::pull(PG *pg, object_t oid) -{ - assert(pg->missing.loc.count(oid)); - eversion_t v = pg->missing.missing[oid]; - int osd = pg->missing.loc[oid]; - - dout(7) << *pg << " pull " << oid - << " v " << v - << " from osd" << osd - << endl; - - // send op - tid_t tid = ++last_tid; - MOSDOp *op = new MOSDOp(messenger->get_myinst(), 0, tid, - oid, pg->get_pgid(), - osdmap->get_epoch(), - OSD_OP_PULL); - op->set_version(v); - messenger->send_message(op, osdmap->get_inst(osd)); - - // take note - assert(pg->objects_pulling.count(oid) == 0); - num_pulling++; - pg->objects_pulling[oid] = v; -} - - -/** push - send object to a peer - */ -void OSD::push(PG *pg, object_t oid, int dest) -{ - // read data+attrs - bufferlist bl; - eversion_t v; - int vlen = sizeof(v); - map attrset; - - ObjectStore::Transaction t; - t.read(oid, 0, 0, &bl); - t.getattr(oid, "version", &v, &vlen); - t.getattrs(oid, attrset); - unsigned tr = store->apply_transaction(t); - - assert(tr == 0); // !!! - - // ok - dout(7) << *pg << " push " << oid << " v " << v - << " size " << bl.length() - << " to osd" << dest - << endl; - - logger->inc("r_push"); - logger->inc("r_pushb", bl.length()); - - // send - MOSDOp *op = new MOSDOp(messenger->get_myinst(), 0, ++last_tid, - oid, pg->info.pgid, osdmap->get_epoch(), - OSD_OP_PUSH); - op->set_offset(0); - op->set_length(bl.length()); - op->set_data(bl); // note: claims bl, set length above here! - op->set_version(v); - op->set_attrset(attrset); - - messenger->send_message(op, osdmap->get_inst(dest)); -} - - -/** op_pull - * process request to pull an entire object. - * NOTE: called from opqueue. - */ -void OSD::op_pull(MOSDOp *op, PG *pg) -{ - const object_t oid = op->get_oid(); - const eversion_t v = op->get_version(); - int from = op->get_source().num(); - - dout(7) << *pg << " op_pull " << oid << " v " << op->get_version() - << " from " << op->get_source() - << endl; - - // is a replica asking? are they missing it? - if (pg->is_primary()) { - // primary - assert(pg->peer_missing.count(from)); // we had better know this, from the peering process. - - if (!pg->peer_missing[from].is_missing(oid)) { - dout(7) << *pg << " op_pull replica isn't actually missing it, we must have already pushed to them" << endl; - delete op; - return; - } - - // do we have it yet? - if (waitfor_missing_object(op, pg)) - return; - } else { - // non-primary - if (pg->missing.is_missing(oid)) { - dout(7) << *pg << " op_pull not primary, and missing " << oid << ", ignoring" << endl; - delete op; - return; - } - } - - // push it back! - push(pg, oid, op->get_source().num()); -} - - -/** op_push - * NOTE: called from opqueue. - */ -void OSD::op_push(MOSDOp *op, PG *pg) -{ - object_t oid = op->get_oid(); - eversion_t v = op->get_version(); - - if (!pg->missing.is_missing(oid)) { - dout(7) << *pg << " op_push not missing " << oid << endl; - return; - } - - dout(7) << *pg << " op_push " - << oid - << " v " << v - << " size " << op->get_length() << " " << op->get_data().length() - << endl; - - assert(op->get_data().length() == op->get_length()); - - // write object and add it to the PG - ObjectStore::Transaction t; - t.remove(oid); // in case old version exists - t.write(oid, 0, op->get_length(), op->get_data()); - t.setattrs(oid, op->get_attrset()); - t.collection_add(pg->info.pgid, oid); - - // close out pull op? - num_pulling--; - if (pg->objects_pulling.count(oid)) - pg->objects_pulling.erase(oid); - pg->missing.got(oid, v); - - - // raise last_complete? - assert(pg->log.complete_to != pg->log.log.end()); - while (pg->log.complete_to != pg->log.log.end()) { - if (pg->missing.missing.count(pg->log.complete_to->oid)) break; - if (pg->info.last_complete < pg->log.complete_to->version) - pg->info.last_complete = pg->log.complete_to->version; - pg->log.complete_to++; - } - dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl; - - - // apply to disk! - t.collection_setattr(pg->info.pgid, "info", &pg->info, sizeof(pg->info)); - unsigned r = store->apply_transaction(t); - assert(r == 0); - - - - // am i primary? are others missing this too? - if (pg->is_primary()) { - for (unsigned i=1; iacting.size(); i++) { - int peer = pg->acting[i]; - assert(pg->peer_missing.count(peer)); - if (pg->peer_missing[peer].is_missing(oid)) { - // ok, push it, and they (will) have it now. - pg->peer_missing[peer].got(oid, v); - push(pg, oid, peer); - } - } - } - - // continue recovery - pg->do_recovery(); - - // kick waiters - if (pg->waiting_for_missing_object.count(oid)) - take_waiters(pg->waiting_for_missing_object[oid]); - - delete op; -} - - - // op_rep_modify @@ -2213,7 +2023,8 @@ void OSD::handle_op(MOSDOp *op) delete op; return; } - if (!read && !pg->same_for_modify_since(op->get_map_epoch())) { + if (!read && (pg->get_primary() != whoami || + !pg->same_for_modify_since(op->get_map_epoch()))) { dout(7) << "handle_rep_op pg changed " << pg->info.history << " after " << op->get_map_epoch() << ", dropping" << endl; @@ -2244,6 +2055,11 @@ void OSD::handle_op(MOSDOp *op) } // missing object? + if (pg->is_missing_object(op->get_oid())) { + pg->wait_for_missing_object(op->get_oid(), op); + return; + } + /* if (read && op->get_oid().rev > 0) { // versioned read. hrm. // are we missing a revision that we might need? @@ -2273,6 +2089,7 @@ void OSD::handle_op(MOSDOp *op) if (op->get_op() != OSD_OP_PUSH && waitfor_missing_object(op, pg)) return; } + */ dout(7) << "handle_op " << *op << " in " << *pg << endl; @@ -2486,18 +2303,22 @@ void OSD::do_op(Message *m, PG *pg) // reads case OSD_OP_READ: - op_read(op, pg); + if (block_if_wrlocked(op)) + return; + pg->op_read(op); break; case OSD_OP_STAT: - op_stat(op, pg); + if (block_if_wrlocked(op)) + return; + pg->op_stat(op); break; // rep stuff case OSD_OP_PULL: - op_pull(op, pg); + pg->op_pull(op); break; case OSD_OP_PUSH: - op_push(op, pg); + pg->op_push(op); break; // writes @@ -2512,10 +2333,33 @@ void OSD::do_op(Message *m, PG *pg) case OSD_OP_RDUNLOCK: case OSD_OP_UPLOCK: case OSD_OP_DNLOCK: - if (op->get_source().is_osd()) - op_rep_modify(op, pg); - else - op_modify(op, pg); + if (op->get_source().is_osd()) { + pg->op_rep_modify(op, pg); + } else { + // locked by someone else? + // for _any_ op type -- eg only the locker can unlock! + if (op->get_op() != OSD_OP_WRNOOP && // except WRNOOP; we just want to flush + block_if_wrlocked(op)) + return; // op will be handled later, after the object unlocks + + // share latest osd map with rest of pg? + osd_lock.Lock(); + { + for (unsigned i=1; iacting.size(); i++) { + int osd = pg->acting[i]; + _share_map_outgoing( osdmap->get_inst(osd) ); + } + } + osd_lock.Unlock(); + + // go go gadget pg + pg->op_modify(op); + + if (op->get_op() == OSD_OP_WRITE) { + logger->inc("c_wr"); + logger->inc("c_wrb", op->get_length()); + } + } break; default: @@ -2523,23 +2367,7 @@ void OSD::do_op(Message *m, PG *pg) } } else if (m->get_type() == MSG_OSD_OPREPLY) { - // must be replication. - MOSDOpReply *r = (MOSDOpReply*)m; - tid_t rep_tid = r->get_rep_tid(); - - if (pg->repop_gather.count(rep_tid)) { - // oh, good. - int fromosd = r->get_source().num(); - repop_ack(pg, pg->repop_gather[rep_tid], - r->get_result(), r->get_commit(), - fromosd, - r->get_pg_complete_thru()); - delete m; - } else { - // early ack. - pg->waiting_for_repop[rep_tid].push_back(r); - } - + pg->op_reply((MOSDOpReply*)m); } else assert(0); } @@ -2560,6 +2388,8 @@ void OSD::wait_for_no_ops() } + + // ============================== // Object locking @@ -2589,180 +2419,3 @@ bool OSD::block_if_wrlocked(MOSDOp* op) // =============================== // OPS - -/* -int OSD::list_missing_revs(object_t oid, set& revs, PG *pg) -{ - int c = 0; - oid.rev = 0; - - map::iterator p = pg->missing.missing.lower_bound(oid); - if (p == pg->missing.missing.end()) - return 0; // clearly not - - while (p->first.ino == oid.ino && - p->first.bno == oid.bno) { - revs.insert(p->first); - c++; - } - return c; -}*/ - -bool OSD::pick_missing_object_rev(object_t& oid, PG *pg) -{ - map::iterator p = pg->missing.missing.upper_bound(oid); - if (p == pg->missing.missing.end()) - return false; // clearly no candidate - - if (p->first.ino == oid.ino && p->first.bno == oid.bno) { - oid = p->first; // yes! it's an upper bound revision for me. - return true; - } - return false; -} - -bool OSD::pick_object_rev(object_t& oid) -{ - object_t t = oid; - - if (!store->pick_object_revision_lt(t)) - return false; // we have no revisions of this object! - - objectrev_t crev; - int r = store->getattr(t, "crev", &crev, sizeof(crev)); - assert(r >= 0); - if (crev <= oid.rev) { - dout(10) << "pick_object_rev choosing " << t << " crev " << crev << " for " << oid << endl; - oid = t; - return true; - } - - return false; -} - -bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg) -{ - const object_t oid = op->get_oid(); - - // are we missing the object? - if (pg->missing.missing.count(oid)) { - // we don't have it (yet). - eversion_t v = pg->missing.missing[oid]; - if (pg->objects_pulling.count(oid)) { - dout(7) << "missing " - << oid - << " v " << v - << " in " << *pg - << ", already pulling" - << endl; - } else { - dout(7) << "missing " - << oid - << " v " << v - << " in " << *pg - << ", pulling" - << endl; - pull(pg, oid); - } - pg->waiting_for_missing_object[oid].push_back(op); - return true; - } - - return false; -} - - - - -// READ OPS - -/** op_read - * client read op - * NOTE: called from opqueue. - */ -void OSD::op_read(MOSDOp *op, PG *pg) -{ - object_t oid = op->get_oid(); - - // if the target object is locked for writing by another client, put 'op' to the waiting queue - // for _any_ op type -- eg only the locker can unlock! - if (block_if_wrlocked(op)) return; // op will be handled later, after the object unlocks - - int r = pg->op_read(op); - - logger->inc("rd"); - if (r >= 0) { - logger->inc("c_rd"); - logger->inc("c_rdb", r); - logger->inc("rdb", r); - } - -} - - -/** op_stat - * client stat - * NOTE: called from opqueue - */ -void OSD::op_stat(MOSDOp *op)//, PG *pg) -{ - object_t oid = op->get_oid(); - - // if the target object is locked for writing by another client, put 'op' to the waiting queue - if (block_if_wrlocked(op)) return; //read will be handled later, after the object unlocks - - pg->op_stat(op); - - logger->inc("stat"); -} - - - - - - -/** op_modify - * process client modify op - * NOTE: called from opqueue. - */ -void OSD::op_modify(MOSDOp *op, PG *pg) -{ - object_t oid = op->get_oid(); - const char *opname = MOSDOp::get_opname(op->get_op()); - - // locked by someone else? - // for _any_ op type -- eg only the locker can unlock! - if (op->get_op() != OSD_OP_WRNOOP && // except WRNOOP; we just want to flush - block_if_wrlocked(op)) - return; // op will be handled later, after the object unlocks - - - // dup op? - if (pg->is_dup(op->get_reqid())) { - dout(-3) << "op_modify " << opname << " dup op " << op->get_reqid() - << ", doing WRNOOP" << endl; - op->set_op(OSD_OP_WRNOOP); - opname = MOSDOp::get_opname(op->get_op()); - } - - // share latest osd map with rest of pg? - osd_lock.Lock(); - { - for (unsigned i=1; iacting.size(); i++) { - int osd = pg->acting[i]; - _share_map_outgoing( osdmap->get_inst(osd) ); - } - } - osd_lock.Unlock(); - - if (op->get_op() == OSD_OP_WRITE) { - logger->inc("c_wr"); - logger->inc("c_wrb", op->get_length()); - } - - // go - pg->assign_version(op); - pg->op_modify(op); -} - - diff --git a/branches/sage/pgs/osd/OSD.h b/branches/sage/pgs/osd/OSD.h index 082e3465e333c..6174997ba9ee4 100644 --- a/branches/sage/pgs/osd/OSD.h +++ b/branches/sage/pgs/osd/OSD.h @@ -144,12 +144,12 @@ public: objectrev_t crev, objectrev_t rev, PG *pg); bool waitfor_missing_object(MOSDOp *op, PG *pg); - bool pick_missing_object_rev(object_t& oid, PG *pg); - bool pick_object_rev(object_t& oid); - friend class PG; + friend class PG; + friend class ReplicatedPG; + friend class C_OSD_WriteCommit; protected: @@ -238,6 +238,8 @@ public: public: OSD(int id, Messenger *m, MonMap *mm, char *dev = 0); ~OSD(); + + int get_nodeid() { return whoami; } // startup/shutdown int init(); diff --git a/branches/sage/pgs/osd/PG.cc b/branches/sage/pgs/osd/PG.cc index f1a630428e49d..4c757ab4d0f33 100644 --- a/branches/sage/pgs/osd/PG.cc +++ b/branches/sage/pgs/osd/PG.cc @@ -1334,7 +1334,60 @@ void PG::read_log(ObjectStore *store) -bool PG::is_dup(reqid_t rid) + + + +// ======================= +// revisions + + +/* +int OSD::list_missing_revs(object_t oid, set& revs, PG *pg) { - return pg->log.logged_req(rid); + int c = 0; + oid.rev = 0; + + map::iterator p = pg->missing.missing.lower_bound(oid); + if (p == pg->missing.missing.end()) + return 0; // clearly not + + while (p->first.ino == oid.ino && + p->first.bno == oid.bno) { + revs.insert(p->first); + c++; + } + return c; +}*/ + +bool PG::pick_missing_object_rev(object_t& oid) +{ + map::iterator p = missing.missing.upper_bound(oid); + if (p == missing.missing.end()) + return false; // clearly no candidate + + if (p->first.ino == oid.ino && p->first.bno == oid.bno) { + oid = p->first; // yes! it's an upper bound revision for me. + return true; + } + return false; } + +bool PG::pick_object_rev(object_t& oid) +{ + object_t t = oid; + + if (!osd->store->pick_object_revision_lt(t)) + return false; // we have no revisions of this object! + + objectrev_t crev; + int r = osd->store->getattr(t, "crev", &crev, sizeof(crev)); + assert(r >= 0); + if (crev <= oid.rev) { + dout(10) << "pick_object_rev choosing " << t << " crev " << crev << " for " << oid << endl; + oid = t; + return true; + } + + return false; +} + diff --git a/branches/sage/pgs/osd/PG.h b/branches/sage/pgs/osd/PG.h index b5a245ca96c17..331b664e030b5 100644 --- a/branches/sage/pgs/osd/PG.h +++ b/branches/sage/pgs/osd/PG.h @@ -16,7 +16,7 @@ #include "include/types.h" -#include "include/osd_types.h" +#include "osd_types.h" #include "include/buffer.h" #include "OSDMap.h" @@ -31,6 +31,8 @@ using namespace __gnu_cxx; class OSD; +class MOSDOp; +class MOSDOpReply; /** PG - Replica Placement Group @@ -490,6 +492,7 @@ public: peers_complete_thru(0), have_master_log(true) { } + virtual ~PG() {} pg_t get_pgid() const { return info.pgid; } int get_nrep() const { return acting.size(); } @@ -546,19 +549,30 @@ public: void trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v); - bool is_dup(reqid_t rid); + bool is_dup(reqid_t rid) { + return log.logged_req(rid); + } - // abstract bits - void op_stat(MOSDOp *op) = 0; - int op_read(MOSDOp *op) = 0; - void op_modify(MOSDOp *op) = 0; - bool same_for_read_since(epoch_t e); - bool same_for_modify_since(epoch_t e); - bool same_for_rep_modify_since(epoch_t e); + bool pick_missing_object_rev(object_t& oid); + bool pick_object_rev(object_t& oid); + - bool is_missing_object(object_t oid); - void wait_for_missing_object(object_t oid, op); + + // abstract bits + virtual void op_stat(MOSDOp *op) = 0; + virtual int op_read(MOSDOp *op) = 0; + virtual void op_modify(MOSDOp *op) = 0; + virtual void op_push(MOSDOp *op) = 0; + virtual void op_pull(MOSDOp *op) = 0; + virtual void op_reply(MOSDOpReply *op) = 0; + + virtual bool same_for_read_since(epoch_t e); + virtual bool same_for_modify_since(epoch_t e); + virtual bool same_for_rep_modify_since(epoch_t e); + + virtual bool is_missing_object(object_t oid); + virtual void wait_for_missing_object(object_t oid, MOSDOp *op); }; diff --git a/branches/sage/pgs/osd/ReplicatedPG.cc b/branches/sage/pgs/osd/ReplicatedPG.cc index 931f4d67698ac..bb5629cd34100 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.cc +++ b/branches/sage/pgs/osd/ReplicatedPG.cc @@ -13,14 +13,25 @@ #include "ReplicatedPG.h" +#include "OSD.h" +#include "common/Logger.h" + +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" #include "config.h" #undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << osd->whoami << " " << (osd->osdmap ? osd->osdmap->get_epoch():0) << " " << *this << " " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << osd->get_nodeid() << " " << (osd->osdmap ? osd->osdmap->get_epoch():0) << " " << *this << " " + +#include +#include + +// ======================= +// pg changes bool ReplicatedPG::same_for_read_since(epoch_t e) { @@ -29,8 +40,7 @@ bool ReplicatedPG::same_for_read_since(epoch_t e) bool ReplicatedPG::same_for_modify_since(epoch_t e) { - return (get_primary() == whoami && - e >= info.history.same_primary_since); + return (e >= info.history.same_primary_since); } bool ReplicatedPG::same_for_rep_modify_since(epoch_t e) @@ -41,11 +51,13 @@ bool ReplicatedPG::same_for_rep_modify_since(epoch_t e) return e >= info.history.same_since; // whole pg set same } else { // primary, splay - return (e >= info.history.same_primary_since &&| + return (e >= info.history.same_primary_since && e >= info.history.same_acker_since); } } +// ==================== +// missing objects bool ReplicatedPG::is_missing_object(object_t oid) { @@ -53,10 +65,27 @@ bool ReplicatedPG::is_missing_object(object_t oid) } - -void ReplicatedPG::wait_for_missing_object(object_t oid, op) +void ReplicatedPG::wait_for_missing_object(object_t oid, MOSDOp *op) { - + assert(is_missing_object(oid)); + + // we don't have it (yet). + eversion_t v = missing.missing[oid]; + if (objects_pulling.count(oid)) { + dout(7) << "missing " + << oid + << " v " << v + << ", already pulling" + << endl; + } else { + dout(7) << "missing " + << oid + << " v " << v + << ", pulling" + << endl; + pull(oid); + } + waiting_for_missing_object[oid].push_back(op); } @@ -67,6 +96,8 @@ void ReplicatedPG::wait_for_missing_object(object_t oid, op) int ReplicatedPG::op_read(MOSDOp *op) { + object_t oid = op->get_oid(); + dout(10) << "op_read " << oid << " " << op->get_offset() << "~" << op->get_length() //<< " in " << *pg @@ -74,19 +105,19 @@ int ReplicatedPG::op_read(MOSDOp *op) long r = 0; bufferlist bl; - + if (oid.rev && !pick_object_rev(oid)) { // we have no revision for this request. r = -EEXIST; } else { // read into a buffer - r = store->read(oid, - op->get_offset(), op->get_length(), - bl); + r = osd->store->read(oid, + op->get_offset(), op->get_length(), + bl); } // set up reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap->get_epoch(), true); + MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); if (r >= 0) { reply->set_result(0); reply->set_data(bl); @@ -109,6 +140,8 @@ int ReplicatedPG::op_read(MOSDOp *op) void ReplicatedPG::op_stat(MOSDOp *op) { + object_t oid = op->get_oid(); + struct stat st; memset(&st, sizeof(st), 0); int r = 0; @@ -117,7 +150,7 @@ void ReplicatedPG::op_stat(MOSDOp *op) // we have no revision for this request. r = -EEXIST; } else { - r = store->stat(oid, &st); + r = osd->store->stat(oid, &st); } dout(3) << "op_stat on " << oid @@ -126,7 +159,7 @@ void ReplicatedPG::op_stat(MOSDOp *op) //<< " in " << *pg << endl; - MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap->get_epoch(), true); + MOSDOpReply *reply = new MOSDOpReply(op, r, osd->osdmap->get_epoch(), true); reply->set_object_size(st.st_size); osd->messenger->send_message(reply, op->get_client_inst()); @@ -139,11 +172,10 @@ void ReplicatedPG::op_stat(MOSDOp *op) // ======================================================================== // MODIFY -void OSD::prepare_log_transaction(ObjectStore::Transaction& t, - MOSDOp *op, eversion_t& version, - objectrev_t crev, objectrev_t rev, - PG *pg, - eversion_t trim_to) +void ReplicatedPG::prepare_log_transaction(ObjectStore::Transaction& t, + MOSDOp *op, eversion_t& version, + objectrev_t crev, objectrev_t rev, + eversion_t trim_to) { const object_t oid = op->get_oid(); @@ -151,12 +183,12 @@ void OSD::prepare_log_transaction(ObjectStore::Transaction& t, if (crev && rev && rev > crev) { eversion_t cv = version; cv.version--; - PG::Log::Entry cloneentry(PG::Log::Entry::CLONE, oid, cv, op->get_reqid()); - pg->log.add(cloneentry); + Log::Entry cloneentry(PG::Log::Entry::CLONE, oid, cv, op->get_reqid()); + log.add(cloneentry); dout(10) << "prepare_log_transaction " << op->get_op() << " " << cloneentry - << " in " << *pg << endl; + << endl; } // actual op @@ -166,26 +198,25 @@ void OSD::prepare_log_transaction(ObjectStore::Transaction& t, dout(10) << "prepare_log_transaction " << op->get_op() << " " << logentry - << " in " << *pg << endl; + << endl; // append to log - assert(version > pg->log.top); - pg->log.add(logentry); - assert(pg->log.top == version); - dout(10) << "prepare_log_transaction appended to " << *pg << endl; + assert(version > log.top); + log.add(logentry); + assert(log.top == version); + dout(10) << "prepare_log_transaction appended" << endl; // write to pg log on disk - pg->append_log(t, logentry, trim_to); + append_log(t, logentry, trim_to); } /** prepare_op_transaction * apply an op to the store wrapped in a transaction. */ -void OSD::prepare_op_transaction(ObjectStore::Transaction& t, - MOSDOp *op, eversion_t& version, - objectrev_t crev, objectrev_t rev, - PG *pg) +void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t, + MOSDOp *op, eversion_t& version, + objectrev_t crev, objectrev_t rev) { const object_t oid = op->get_oid(); const pg_t pgid = op->get_pg(); @@ -197,22 +228,22 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t, << " v " << version << " crev " << crev << " rev " << rev - << " in " << *pg << endl; + << endl; // WRNOOP does nothing. if (op->get_op() == OSD_OP_WRNOOP) return; // raise last_complete? - if (pg->info.last_complete == pg->info.last_update) - pg->info.last_complete = version; + if (info.last_complete == info.last_update) + info.last_complete = version; // raise last_update. - assert(version > pg->info.last_update); - pg->info.last_update = version; + assert(version > info.last_update); + info.last_update = version; // write pg info - t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info)); + t.collection_setattr(pgid, "info", &info, sizeof(info)); // clone? if (crev && rev && rev > crev) { @@ -238,9 +269,9 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t, t.rmattr(oid, "wrlock"); // unblock all operations that were waiting for this object to become unlocked - if (waiting_for_wr_unlock.count(oid)) { - take_waiters(waiting_for_wr_unlock[oid]); - waiting_for_wr_unlock.erase(oid); + if (osd->waiting_for_wr_unlock.count(oid)) { + osd->take_waiters(osd->waiting_for_wr_unlock[oid]); + osd->waiting_for_wr_unlock.erase(oid); } } break; @@ -261,7 +292,7 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t, assert(0); // are you sure this is what you want? // zero, remove, or truncate? struct stat st; - int r = store->stat(oid, &st); + int r = osd->store->stat(oid, &st); if (r >= 0) { if (op->get_offset() + op->get_length() >= st.st_size) { if (op->get_offset()) @@ -324,13 +355,30 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t, // ======================================================================== // rep op gather -void ReplicatedPG::get_repop_gather(Gather *repop) +class C_OSD_WriteCommit : public Context { +public: + OSD *osd; + pg_t pgid; + tid_t rep_tid; + eversion_t pg_last_complete; + C_OSD_WriteCommit(OSD *o, pg_t p, tid_t rt, eversion_t lc) : osd(o), pgid(p), rep_tid(rt), pg_last_complete(lc) {} + void finish(int r) { + ReplicatedPG *pg = (ReplicatedPG*)(osd->_lock_pg(pgid)); + if (pg) { + pg->op_modify_commit(rep_tid, pg_last_complete); + osd->_unlock_pg(pg->info.pgid); + } + } +}; + + +void ReplicatedPG::get_rep_gather(RepGather *repop) { //repop->lock.Lock(); dout(10) << "get_repop " << *repop << endl; } -void ReplicatedPG::apply_repop(Gather *repop) +void ReplicatedPG::apply_repop(RepGather *repop) { dout(10) << "apply_repop applying update on " << *repop << endl; assert(!repop->applied); @@ -346,7 +394,7 @@ void ReplicatedPG::apply_repop(Gather *repop) repop->applied = true; } -void ReplicatedPG::put_repop_gather(Gather *repop) +void ReplicatedPG::put_rep_gather(RepGather *repop) { dout(10) << "put_repop " << *repop << endl; @@ -399,8 +447,8 @@ void ReplicatedPG::put_repop_gather(Gather *repop) dout(10) << "put_repop deleting " << *repop << endl; //repop->lock.Unlock(); - assert(repop_gather.count(repop->rep_tid)); - repop_gather.erase(repop->rep_tid); + assert(rep_gather.count(repop->rep_tid)); + rep_gather.erase(repop->rep_tid); delete repop->op; delete repop; @@ -411,13 +459,13 @@ void ReplicatedPG::put_repop_gather(Gather *repop) } -void ReplicatedPG::issue_repop(MOSDOp *op, int osd) +void ReplicatedPG::issue_repop(MOSDOp *op, int dest) { object_t oid = op->get_oid(); dout(7) << " issue_repop rep_tid " << op->get_rep_tid() << " o " << oid - << " to osd" << osd + << " to osd" << dest << endl; // forward the write/update/whatever @@ -432,16 +480,16 @@ void ReplicatedPG::issue_repop(MOSDOp *op, int osd) wr->set_version(op->get_version()); wr->set_rep_tid(op->get_rep_tid()); - wr->set_pg_trim_to(pg->peers_complete_thru); + wr->set_pg_trim_to(peers_complete_thru); - osd->messenger->send_message(wr, osdmap->get_inst(osd)); + osd->messenger->send_message(wr, osd->osdmap->get_inst(dest)); } -Gather *ReplicatedPG::new_repop_gather(MOSDOp *op) +ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op) { - dout(10) << "new_repop_gather rep_tid " << op->get_rep_tid() << " on " << *op << endl; + dout(10) << "new_rep_gather rep_tid " << op->get_rep_tid() << " on " << *op << endl; - Gather *repop = new Gather(op, op->get_rep_tid(), + RepGather *repop = new RepGather(op, op->get_rep_tid(), op->get_version(), info.last_complete); @@ -478,11 +526,11 @@ Gather *ReplicatedPG::new_repop_gather(MOSDOp *op) repop->start = g_clock.now(); - repop_gather[ repop->rep_tid ] = repop; + rep_gather[ repop->rep_tid ] = repop; // anyone waiting? (acks that got here before the op did) if (waiting_for_repop.count(repop->rep_tid)) { - take_waiters(waiting_for_repop[repop->rep_tid]); + osd->take_waiters(waiting_for_repop[repop->rep_tid]); waiting_for_repop.erase(repop->rep_tid); } @@ -490,7 +538,7 @@ Gather *ReplicatedPG::new_repop_gather(MOSDOp *op) } -void ReplicatedPG::repop_ack(Gather *repop, +void ReplicatedPG::repop_ack(RepGather *repop, int result, bool commit, int fromosd, eversion_t pg_complete_thru) { @@ -500,7 +548,7 @@ void ReplicatedPG::repop_ack(Gather *repop, << " result " << result << " commit " << commit << " from osd" << fromosd << endl; - get_repop_gather(repop); + get_rep_gather(repop); { if (commit) { // commit @@ -513,7 +561,7 @@ void ReplicatedPG::repop_ack(Gather *repop, repop->waitfor_ack.erase(fromosd); } } - put_repop_gather(repop); + put_rep_gather(repop); } @@ -529,20 +577,53 @@ void ReplicatedPG::repop_ack(Gather *repop, + + + + + + + + + +/** op_modify_commit + * transaction commit on the acker. + */ +void ReplicatedPG::op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru) +{ + if (rep_gather.count(rep_tid)) { + RepGather *repop = rep_gather[rep_tid]; + + dout(10) << "op_modify_commit " << *repop->op << endl; + get_rep_gather(repop); + { + assert(repop->waitfor_commit.count(osd->get_nodeid())); + repop->waitfor_commit.erase(osd->get_nodeid()); + repop->pg_complete_thru[osd->get_nodeid()] = pg_complete_thru; + } + put_rep_gather(repop); + dout(10) << "op_modify_commit done on " << repop << endl; + } else { + dout(10) << "op_modify_commit rep_tid " << rep_tid << " dne" << endl; + } +} + + + void ReplicatedPG::assign_version(MOSDOp *op) { // check crev objectrev_t crev = 0; - store->getattr(oid, "crev", (char*)&crev, sizeof(crev)); + osd->store->getattr(oid, "crev", (char*)&crev, sizeof(crev)); // assign version eversion_t clone_version; - eversion_t nv = pg->log.top; + eversion_t nv = log.top; if (op->get_op() != OSD_OP_WRNOOP) { - nv.epoch = osdmap->get_epoch(); + nv.epoch = osd->osdmap->get_epoch(); nv.version++; - assert(nv > pg->info.last_update); - assert(nv > pg->log.top); + assert(nv > info.last_update); + assert(nv > log.top); // will clone? if (crev && op->get_rev() && op->get_rev() > crev) { @@ -567,66 +648,26 @@ void ReplicatedPG::assign_version(MOSDOp *op) // set version in op, for benefit of client and our eventual reply op->set_version(nv); - -} - - - - - - - - -class C_OSD_WriteCommit : public Context { -public: - OSD *osd; - pg_t pgid; - tid_t rep_tid; - eversion_t pg_last_complete; - C_OSD_WriteCommit(OSD *o, pg_t p, tid_t rt, eversion_t lc) : osd(o), pgid(p), rep_tid(rt), pg_last_complete(lc) {} - void finish(int r) { - ReplicatedPG *pg = (ReplicatedPG*)osd->_lock_pg(pgid); - if (pg) { - pg->op_modify_commit(rep_tid, pg_last_complete); - osd->_unlock_pg(pg); - } - } -}; - - -/** op_modify_commit - * transaction commit on the acker. - */ -void ReplicatedPG::op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru) -{ - lock(); - - if (repop_gather.count(rep_tid)) { - Gather *repop = repop_gather[rep_tid]; - - dout(10) << "op_modify_commit " << *repop->op << endl; - get_repop_gather(repop); - { - assert(repop->waitfor_commit.count(whoami)); - repop->waitfor_commit.erase(whoami); - repop->pg_complete_thru[whoami] = pg_complete_thru; - } - put_repop_gather(repop); - dout(10) << "op_modify_commit done on " << repop << endl; - } else { - dout(10) << "op_modify_commit pg " << pgid << " rep_tid " << rep_tid << " dne" << endl; - } } - - void ReplicatedPG::op_modify(MOSDOp *op) { object_t oid = op->get_oid(); const char *opname = MOSDOp::get_opname(op->get_op()); + // dup op? + if (is_dup(op->get_reqid())) { + dout(-3) << "op_modify " << opname << " dup op " << op->get_reqid() + << ", doing WRNOOP" << endl; + op->set_op(OSD_OP_WRNOOP); + opname = MOSDOp::get_opname(op->get_op()); + } + + // assign the op a version + assign_version(op); + // are any peers missing this? for (unsigned i=1; iacting.size(); i++) { int peer = pg->acting[i]; @@ -648,7 +689,7 @@ void ReplicatedPG::op_modify(MOSDOp *op) << endl; // issue replica writes - Gather *repop = 0; + RepGather *repop = 0; bool alone = (pg->acting.size() == 1); tid_t rep_tid = ++last_tid; op->set_rep_tid(rep_tid); @@ -667,7 +708,7 @@ void ReplicatedPG::op_modify(MOSDOp *op) issue_repop(pg, op, pg->acting[i]); } else { // primary rep, or alone. - repop = new_repop_gather(pg, op); + repop = new_rep_gather(pg, op); // send to rest. if (!alone) @@ -685,12 +726,12 @@ void ReplicatedPG::op_modify(MOSDOp *op) // (logical) local ack. // (if alone, this will apply the update.) - get_repop_gather(repop); + get_rep_gather(repop); { assert(repop->waitfor_ack.count(whoami)); repop->waitfor_ack.erase(whoami); } - put_repop_gather(pg, repop); + put_rep_gather(pg, repop); } else { // chain or splay. apply. @@ -787,7 +828,7 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) ObjectStore::Transaction t; // am i acker? - Gather *repop = 0; + RepGather *repop = 0; int ackerosd = pg->acting[0]; if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) { @@ -795,20 +836,20 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) if (pg->is_acker()) { // i am tail acker. - if (pg->repop_gather.count(op->get_rep_tid())) { - repop = pg->repop_gather[ op->get_rep_tid() ]; + if (pg->rep_gather.count(op->get_rep_tid())) { + repop = pg->rep_gather[ op->get_rep_tid() ]; } else { - repop = new_repop_gather(pg, op); + repop = new_rep_gather(pg, op); } // infer ack from source int fromosd = op->get_source().num(); - get_repop_gather(repop); + get_rep_gather(repop); { //assert(repop->waitfor_ack.count(fromosd)); // no, we may come thru here twice. repop->waitfor_ack.erase(fromosd); } - put_repop_gather(pg, repop); + put_rep_gather(pg, repop); // prepare dest socket //messenger->prepare_send_message(op->get_client()); @@ -858,12 +899,12 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) // ack? if (repop) { // (logical) local ack. this may induce the actual update. - get_repop_gather(repop); + get_rep_gather(repop); { assert(repop->waitfor_ack.count(whoami)); repop->waitfor_ack.erase(whoami); } - put_repop_gather(pg, repop); + put_rep_gather(pg, repop); } else { // send ack to acker? @@ -892,3 +933,228 @@ void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t las delete op; } + + + + + + + + + +// =========================================================== + +/** pull - request object from a peer + */ +void ReplicatedPG::pull(object_t oid) +{ + assert(missing.loc.count(oid)); + eversion_t v = missing.missing[oid]; + int osd = missing.loc[oid]; + + dout(7) << "pull " << oid + << " v " << v + << " from osd" << osd + << endl; + + // send op + tid_t tid = ++last_tid; + MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, tid, + oid, info.pgid, + osd->osdmap->get_epoch(), + OSD_OP_PULL); + op->set_version(v); + osd->messenger->send_message(op, osdmap->get_inst(osd)); + + // take note + assert(objects_pulling.count(oid) == 0); + num_pulling++; + objects_pulling[oid] = v; +} + + +/** push - send object to a peer + */ +void ReplicatedPG::push(object_t oid, int dest) +{ + // read data+attrs + bufferlist bl; + eversion_t v; + int vlen = sizeof(v); + map attrset; + + ObjectStore::Transaction t; + t.read(oid, 0, 0, &bl); + t.getattr(oid, "version", &v, &vlen); + t.getattrs(oid, attrset); + unsigned tr = osd->store->apply_transaction(t); + + assert(tr == 0); // !!! + + // ok + dout(7) << "push " << oid << " v " << v + << " size " << bl.length() + << " to osd" << dest + << endl; + + osd->logger->inc("r_push"); + osd->logger->inc("r_pushb", bl.length()); + + // send + MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, ++last_tid, + oid, info.pgid, osdmap->get_epoch(), + OSD_OP_PUSH); + op->set_offset(0); + op->set_length(bl.length()); + op->set_data(bl); // note: claims bl, set length above here! + op->set_version(v); + op->set_attrset(attrset); + + osd->messenger->send_message(op, osd->osdmap->get_inst(dest)); +} + + + +/** op_pull + * process request to pull an entire object. + * NOTE: called from opqueue. + */ +void ReplicatedPG::op_pull(MOSDOp *op) +{ + const object_t oid = op->get_oid(); + const eversion_t v = op->get_version(); + int from = op->get_source().num(); + + dout(7) << "op_pull " << oid << " v " << op->get_version() + << " from " << op->get_source() + << endl; + + // is a replica asking? are they missing it? + if (is_primary()) { + // primary + assert(peer_missing.count(from)); // we had better know this, from the peering process. + + if (!peer_missing[from].is_missing(oid)) { + dout(7) << "op_pull replica isn't actually missing it, we must have already pushed to them" << endl; + delete op; + return; + } + + // do we have it yet? + if (is_missing_object(oid)) { + wait_for_missing_object(oid, op); + return; + } + } else { + // non-primary + if (missing.is_missing(oid)) { + dout(7) << "op_pull not primary, and missing " << oid << ", ignoring" << endl; + delete op; + return; + } + } + + // push it back! + push(oid, op->get_source().num()); +} + + +/** op_push + * NOTE: called from opqueue. + */ +void ReplicatedPG::op_push(MOSDOp *op) +{ + object_t oid = op->get_oid(); + eversion_t v = op->get_version(); + + if (!is_missing_object(oid)) { + dout(7) << "op_push not missing " << oid << endl; + return; + } + + dout(7) << "op_push " + << oid + << " v " << v + << " size " << op->get_length() << " " << op->get_data().length() + << endl; + + assert(op->get_data().length() == op->get_length()); + + // write object and add it to the PG + ObjectStore::Transaction t; + t.remove(oid); // in case old version exists + t.write(oid, 0, op->get_length(), op->get_data()); + t.setattrs(oid, op->get_attrset()); + t.collection_add(info.pgid, oid); + + // close out pull op? + num_pulling--; + if (objects_pulling.count(oid)) + objects_pulling.erase(oid); + missing.got(oid, v); + + + // raise last_complete? + assert(log.complete_to != log.log.end()); + while (log.complete_to != log.log.end()) { + if (missing.missing.count(log.complete_to->oid)) break; + if (info.last_complete < log.complete_to->version) + info.last_complete = log.complete_to->version; + log.complete_to++; + } + dout(10) << "last_complete now " << info.last_complete << endl; + + + // apply to disk! + t.collection_setattr(info.pgid, "info", &info, sizeof(info)); + unsigned r = osd->store->apply_transaction(t); + assert(r == 0); + + + + // am i primary? are others missing this too? + if (is_primary()) { + for (unsigned i=1; itake_waiters(waiting_for_missing_object[oid]); + waiting_for_missing_object.erase(oid); + } + + delete op; +} + + + + +void ReplicatedPG::op_reply(MOSDOpReply *r) +{ + // must be replication. + tid_t rep_tid = r->get_rep_tid(); + + if (rep_gather.count(rep_tid)) { + // oh, good. + int fromosd = r->get_source().num(); + repop_ack(rep_gather[rep_tid], + r->get_result(), r->get_commit(), + fromosd, + r->get_pg_complete_thru()); + delete m; + } else { + // early ack. + waiting_for_repop[rep_tid].push_back(r); + } +} diff --git a/branches/sage/pgs/osd/ReplicatedPG.h b/branches/sage/pgs/osd/ReplicatedPG.h index dfb52a6cb556d..0d6019c5b08d5 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.h +++ b/branches/sage/pgs/osd/ReplicatedPG.h @@ -17,13 +17,15 @@ #include "PG.h" +#include "messages/MOSDOp.h" + class ReplicatedPG : public PG { - +public: /* * gather state on the primary/head while replicating an osd op. */ - class Gather { + class RepGather { public: class MOSDOp *op; tid_t rep_tid; @@ -44,7 +46,7 @@ class ReplicatedPG : public PG { eversion_t pg_local_last_complete; map pg_complete_thru; - Gather(MOSDOp *o, tid_t rt, eversion_t nv, eversion_t lc) : + RepGather(MOSDOp *o, tid_t rt, eversion_t nv, eversion_t lc) : op(o), rep_tid(rt), applied(false), sent_ack(false), sent_commit(false), @@ -64,38 +66,65 @@ class ReplicatedPG : public PG { } }; +protected: // replica ops // [primary|tail] - map repop_gather; + map rep_gather; map > waiting_for_repop; - void get_repop_gather(Gather*); - void apply_repop(Gather *repop); - void put_repop_gather(Gather*); + void get_rep_gather(RepGather*); + void apply_repop(RepGather *repop); + void put_rep_gather(RepGather*); void issue_repop(MOSDOp *op, int osd); - Gather *new_repop_gather(MOSDOp *op); - void repop_ack(Gather *repop, + RepGather *new_rep_gather(MOSDOp *op); + void repop_ack(RepGather *repop, int result, bool commit, int fromosd, eversion_t pg_complete_thru=0); + + // push/pull + void push(object_t oid, int dest); + void pull(object_t oid); + + // modify + void assign_version(MOSDOp *op); + void op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru); + void op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete); + + friend class C_OSD_WriteCommit; public: + ReplicatedPG(OSD *o, pg_t p) : PG(o,p) {} + void op_stat(MOSDOp *op); int op_read(MOSDOp *op); void op_modify(MOSDOp *op); - + void op_rep_modify(MOSDOp *op); + void op_push(MOSDOp *op); + void op_pull(MOSDOp *op); + + void op_reply(MOSDOpReply *r); + bool same_for_read_since(epoch_t e); bool same_for_modify_since(epoch_t e); bool same_for_rep_modify_since(epoch_t e); bool is_missing_object(object_t oid); - void wait_for_missing_object(object_t oid, op); + void wait_for_missing_object(object_t oid, MOSDOp *op); + + void prepare_log_transaction(ObjectStore::Transaction& t, + MOSDOp *op, eversion_t& version, + objectrev_t crev, objectrev_t rev, + eversion_t trim_to); + void prepare_op_transaction(ObjectStore::Transaction& t, + MOSDOp *op, eversion_t& version, + objectrev_t crev, objectrev_t rev); }; -inline ostream& operator<<(ostream& out, PG::Gather& repop) +inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop) { - out << "repop(" << &repop << " rep_tid=" << repop.rep_tid + out << "repgather(" << &repop << " rep_tid=" << repop.rep_tid << " wfack=" << repop.waitfor_ack << " wfcommit=" << repop.waitfor_commit; out << " pct=" << repop.pg_complete_thru; -- 2.39.5