From 1cdfa1ffea4e319f2c5389bbbcd276c95bb56123 Mon Sep 17 00:00:00 2001 From: sageweil Date: Fri, 2 Mar 2007 20:08:34 +0000 Subject: [PATCH] copiles and runs. RAID4PG instantiation and pg creation commented out in OSD.cc until the virutal methods are implemented... git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1160 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/pgs/Makefile | 1 + branches/sage/pgs/config.cc | 2 +- branches/sage/pgs/mds/IdAllocator.cc | 2 +- branches/sage/pgs/mds/MDLog.cc | 3 +- branches/sage/pgs/mds/MDStore.cc | 4 +- branches/sage/pgs/osd/OSD.cc | 109 ++---- branches/sage/pgs/osd/OSD.h | 18 +- branches/sage/pgs/osd/PG.cc | 195 ---------- branches/sage/pgs/osd/PG.h | 34 +- branches/sage/pgs/osd/RAID4PG.cc | 37 ++ branches/sage/pgs/osd/RAID4PG.h | 63 ++++ branches/sage/pgs/osd/ReplicatedPG.cc | 499 ++++++++++++++++++++------ branches/sage/pgs/osd/ReplicatedPG.h | 42 ++- branches/sage/pgs/osd/osd_types.h | 33 +- 14 files changed, 612 insertions(+), 430 deletions(-) create mode 100644 branches/sage/pgs/osd/RAID4PG.cc create mode 100644 branches/sage/pgs/osd/RAID4PG.h diff --git a/branches/sage/pgs/Makefile b/branches/sage/pgs/Makefile index 7e071393e736b..565ef0aff551a 100644 --- a/branches/sage/pgs/Makefile +++ b/branches/sage/pgs/Makefile @@ -72,6 +72,7 @@ MDS_OBJS= \ OSD_OBJS= \ osd/PG.o\ osd/ReplicatedPG.o\ + osd/RAID4PG.o\ osd/Ager.o\ osd/FakeStore.o\ osd/OSD.o diff --git a/branches/sage/pgs/config.cc b/branches/sage/pgs/config.cc index 5da3e90218163..f197a549792b3 100644 --- a/branches/sage/pgs/config.cc +++ b/branches/sage/pgs/config.cc @@ -31,7 +31,7 @@ long buffer_total_alloc = 0; Mutex bufferlock; - +#include "osd/osd_types.h" FileLayout g_OSD_FileLayout( 1<<20, 1, 1<<20, pg_t::TYPE_REP, 2 ); // stripe over 1M objects, 2x replication //FileLayout g_OSD_FileLayout( 1<<17, 4, 1<<20 ); // 128k stripes over sets of 4 diff --git a/branches/sage/pgs/mds/IdAllocator.cc b/branches/sage/pgs/mds/IdAllocator.cc index 671bd70a77c27..2d17739776419 100644 --- a/branches/sage/pgs/mds/IdAllocator.cc +++ b/branches/sage/pgs/mds/IdAllocator.cc @@ -169,7 +169,7 @@ void IdAllocator::load(Context *onfinish) C_ID_Load *c = new C_ID_Load(this, onfinish); mds->filer->read(inode, - 0, inode.layout.stripe_size, + 0, inode.layout.stripe_unit, &c->bl, c); } diff --git a/branches/sage/pgs/mds/MDLog.cc b/branches/sage/pgs/mds/MDLog.cc index 182bd4d0333e1..f16cefff285c7 100644 --- a/branches/sage/pgs/mds/MDLog.cc +++ b/branches/sage/pgs/mds/MDLog.cc @@ -84,8 +84,7 @@ void MDLog::init_journaler() log_inode.layout = g_OSD_MDLogLayout; if (g_conf.mds_local_osd) { - log_inode.layout.object_layout = OBJECT_LAYOUT_STARTOSD; - log_inode.layout.osd = mds->get_nodeid() + 10000; // hack + log_inode.layout.preferred = mds->get_nodeid() + 10000; // hack } // log streamer diff --git a/branches/sage/pgs/mds/MDStore.cc b/branches/sage/pgs/mds/MDStore.cc index 13aa270a2ee6c..a8891b67322d7 100644 --- a/branches/sage/pgs/mds/MDStore.cc +++ b/branches/sage/pgs/mds/MDStore.cc @@ -199,9 +199,9 @@ void MDStore::fetch_dir_hash( CDir *dir, C_MDS_FetchHash *fin = new C_MDS_FetchHash( mds, dir->get_inode()->inode, c, hashcode ); // grab first stripe bit (which had better be more than 16 bytes!) - assert(dir->get_inode()->inode.layout.stripe_size >= 16); + assert(dir->get_inode()->inode.layout.stripe_unit >= 16); mds->filer->read(dir->get_inode()->inode, - get_hash_offset(hashcode), dir->get_inode()->inode.layout.stripe_size, + get_hash_offset(hashcode), dir->get_inode()->inode.layout.stripe_unit, &fin->bl, fin ); } diff --git a/branches/sage/pgs/osd/OSD.cc b/branches/sage/pgs/osd/OSD.cc index e64a55f84f01f..1d4392748f207 100644 --- a/branches/sage/pgs/osd/OSD.cc +++ b/branches/sage/pgs/osd/OSD.cc @@ -919,21 +919,7 @@ void OSD::handle_osd_map(MOSDMap *m) PG *pg = it->second; _lock_pg(pg->info.pgid); - { - list ls; // do async; repop_ack() may modify pg->repop_gather - for (map::iterator p = pg->repop_gather.begin(); - p != pg->repop_gather.end(); - p++) { - //dout(-1) << "checking repop tid " << p->first << endl; - if (p->second->waitfor_ack.count(osd) || - p->second->waitfor_commit.count(osd)) - ls.push_back(p->second); - } - for (list::iterator p = ls.begin(); - p != ls.end(); - p++) - repop_ack(pg, *p, -1, true, osd); - } + pg->note_failed_osd(osd); _unlock_pg(pg->info.pgid); } } @@ -1065,6 +1051,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) } // raided + /* for (int size = g_conf.osd_min_raid_width; size <= g_conf.osd_max_raid_width; size++) { @@ -1107,9 +1094,8 @@ void OSD::advance_map(ObjectStore::Transaction& t) dout(7) << "created " << *pg << endl; } - } - + */ dout(1) << "mkfs done, created " << pg_map.size() << " pgs" << endl; } else { @@ -1165,26 +1151,7 @@ void OSD::advance_map(ObjectStore::Transaction& t) // apply any repops in progress. if (oldacker == whoami) { - // apply repops - for (map::iterator p = pg->repop_gather.begin(); - p != pg->repop_gather.end(); - p++) { - if (!p->second->applied) - apply_repop(pg, p->second); - delete p->second->op; - delete p->second; - } - pg->repop_gather.clear(); - - // and repop waiters - for (map >::iterator p = pg->waiting_for_repop.begin(); - p != pg->waiting_for_repop.end(); - p++) - for (list::iterator pm = p->second.begin(); - pm != p->second.end(); - pm++) - delete *pm; - pg->waiting_for_repop.clear(); + pg->on_acker_change(); } if (role != oldrole) { @@ -1192,24 +1159,19 @@ void OSD::advance_map(ObjectStore::Transaction& t) if (oldrole == 0) { pg->state_clear(PG::STATE_CLEAN); - // take replay queue waiters - list ls; - for (map::iterator it = pg->replay_queue.begin(); - it != pg->replay_queue.end(); - it++) - ls.push_back(it->second); - pg->replay_queue.clear(); - take_waiters(ls); - - // take active waiters - take_waiters(pg->waiting_for_active); - - // take object waiters - for (hash_map >::iterator it = pg->waiting_for_missing_object.begin(); - it != pg->waiting_for_missing_object.end(); - it++) - take_waiters(it->second); - pg->waiting_for_missing_object.clear(); + // take replay queue waiters + list ls; + for (map::iterator it = pg->replay_queue.begin(); + it != pg->replay_queue.end(); + it++) + ls.push_back(it->second); + pg->replay_queue.clear(); + take_waiters(ls); + + // take active waiters + take_waiters(pg->waiting_for_active); + + pg->on_role_change(); } // new primary? @@ -1446,7 +1408,13 @@ PG *OSD::create_pg(pg_t pgid, ObjectStore::Transaction& t) assert(pg_map.count(pgid) == 0); assert(!pg_exists(pgid)); - PG *pg = new PG(this, pgid); + PG *pg; + if (pgid.is_rep()) + pg = new ReplicatedPG(this, pgid); + else if (pgid.is_raid4()) + assert(0); //pg = new RAID4PG(this, pgid); + else + assert(0); pg_map[pgid] = pg; t.create_collection(pgid); @@ -1476,10 +1444,10 @@ void OSD::load_pgs() pg_t pgid = *it; PG *pg = 0; - if (pgid->is_rep()) + if (pgid.is_rep()) new ReplicatedPG(this, pgid); - else if (pgid->is_raid()) - new RAID4PG(this, pgid); + else if (pgid.is_raid4()) + assert(0); //new RAID4PG(this, pgid); else assert(0); pg_map[pgid] = pg; @@ -1955,24 +1923,6 @@ void OSD::handle_pg_remove(MOSDPGRemove *m) -/*** RECOVERY ***/ - - -// op_rep_modify - - -// process a modification operation - - -/** op_rep_modify - * process a replicated modify. - * NOTE: called from opqueue. - */ -void OSD::op_rep_modify(MOSDOp *op, PG *pg) -{ -} - - // ========================================================= // OPS @@ -2165,9 +2115,10 @@ void OSD::handle_op(MOSDOp *op) _unlock_pg(pgid); } else { // queue for worker threads - if (read) + /*if (read) enqueue_op(0, op); // no locking needed for reads else + */ enqueue_op(pgid, op); } } @@ -2334,7 +2285,7 @@ void OSD::do_op(Message *m, PG *pg) case OSD_OP_UPLOCK: case OSD_OP_DNLOCK: if (op->get_source().is_osd()) { - pg->op_rep_modify(op, pg); + pg->op_rep_modify(op); } else { // locked by someone else? // for _any_ op type -- eg only the locker can unlock! diff --git a/branches/sage/pgs/osd/OSD.h b/branches/sage/pgs/osd/OSD.h index 6174997ba9ee4..1fa0752712a32 100644 --- a/branches/sage/pgs/osd/OSD.h +++ b/branches/sage/pgs/osd/OSD.h @@ -207,17 +207,27 @@ public: hash_map > waiting_for_pg; + Mutex tid_lock; + tid_t get_tid() { + tid_t t; + tid_lock.Lock(); + t = ++last_tid; + tid_lock.Unlock(); + return t; + } + - void handle_rep_op_ack(MOSDOpReply *m); + //void handle_rep_op_ack(MOSDOpReply *m); // recovery void do_notifies(map< int, list >& notify_list); void do_queries(map< int, map >& query_map); void repeer(PG *pg, map< int, map >& query_map); + /* void pull(PG *pg, object_t oid); void push(PG *pg, object_t oid, int dest); - + */ bool require_current_map(Message *m, epoch_t v); bool require_same_or_newer_map(Message *m, epoch_t e); @@ -226,12 +236,14 @@ public: void handle_pg_log(class MOSDPGLog *m); void handle_pg_remove(class MOSDPGRemove *m); + /* void op_pull(class MOSDOp *op, PG *pg); void op_push(class MOSDOp *op, PG *pg); - void op_rep_modify(class MOSDOp *op, PG *pg); // write, trucnate, delete void op_rep_modify_commit(class MOSDOp *op, int ackerosd, eversion_t last_complete); + */ + friend class C_OSD_RepModifyCommit; diff --git a/branches/sage/pgs/osd/PG.cc b/branches/sage/pgs/osd/PG.cc index 4c757ab4d0f33..1d27afee541ca 100644 --- a/branches/sage/pgs/osd/PG.cc +++ b/branches/sage/pgs/osd/PG.cc @@ -973,202 +973,7 @@ void PG::activate(ObjectStore::Transaction& t) osd->take_waiters(waiting_for_active); } -/** clean_up_local - * remove any objects that we're storing but shouldn't. - * as determined by log. - */ -void PG::clean_up_local(ObjectStore::Transaction& t) -{ - dout(10) << "clean_up_local" << endl; - - assert(info.last_update >= log.bottom); // otherwise we need some help! - - if (log.backlog) { - // be thorough. - list ls; - osd->store->collection_list(info.pgid, ls); - set s; - - for (list::iterator i = ls.begin(); - i != ls.end(); - i++) - s.insert(*i); - - set did; - for (list::reverse_iterator p = log.log.rbegin(); - p != log.log.rend(); - p++) { - if (did.count(p->oid)) continue; - did.insert(p->oid); - - if (p->is_delete()) { - if (s.count(p->oid)) { - dout(10) << " deleting " << p->oid - << " when " << p->version << endl; - t.remove(p->oid); - } - s.erase(p->oid); - } else { - // just leave old objects.. they're missing or whatever - s.erase(p->oid); - } - } - - for (set::iterator i = s.begin(); - i != s.end(); - i++) { - dout(10) << " deleting stray " << *i << endl; - t.remove(*i); - } - - } else { - // just scan the log. - set did; - for (list::reverse_iterator p = log.log.rbegin(); - p != log.log.rend(); - p++) { - if (did.count(p->oid)) continue; - did.insert(p->oid); - - if (p->is_delete()) { - dout(10) << " deleting " << p->oid - << " when " << p->version << endl; - t.remove(p->oid); - } else { - // keep old(+missing) objects, just for kicks. - } - } - } -} - - - -void PG::cancel_recovery() -{ - // forget about where missing items are, or anything we're pulling - missing.loc.clear(); - osd->num_pulling -= objects_pulling.size(); - objects_pulling.clear(); -} - -/** - * do one recovery op. - * return true if done, false if nothing left to do. - */ -bool PG::do_recovery() -{ - dout(-10) << "do_recovery pulling " << objects_pulling.size() << " in pg, " - << osd->num_pulling << "/" << g_conf.osd_max_pull << " total" - << endl; - dout(10) << "do_recovery " << missing << endl; - - // can we slow down on this PG? - if (osd->num_pulling >= g_conf.osd_max_pull && !objects_pulling.empty()) { - dout(-10) << "do_recovery already pulling max, waiting" << endl; - return true; - } - - // look at log! - Log::Entry *latest = 0; - - while (log.requested_to != log.log.end()) { - assert(log.objects.count(log.requested_to->oid)); - latest = log.objects[log.requested_to->oid]; - assert(latest); - dout(10) << "do_recovery " - << *log.requested_to - << (objects_pulling.count(latest->oid) ? " (pulling)":"") - << endl; - - if (latest->is_update() && - !objects_pulling.count(latest->oid) && - missing.is_missing(latest->oid)) { - osd->pull(this, latest->oid); - return true; - } - - log.requested_to++; - } - - if (!objects_pulling.empty()) { - dout(7) << "do_recovery requested everything, still waiting" << endl; - return false; - } - - // done? - assert(missing.num_missing() == 0); - assert(info.last_complete == info.last_update); - - if (is_primary()) { - // i am primary - dout(7) << "do_recovery complete, cleaning strays" << endl; - clean_set.insert(osd->whoami); - if (is_all_clean()) { - state_set(PG::STATE_CLEAN); - clean_replicas(); - } - } else { - // tell primary - dout(7) << "do_recovery complete, telling primary" << endl; - list ls; - ls.push_back(info); - osd->messenger->send_message(new MOSDPGNotify(osd->osdmap->get_epoch(), - ls), - osd->osdmap->get_inst(get_primary())); - } - - return false; -} - -void PG::do_peer_recovery() -{ - dout(10) << "do_peer_recovery" << endl; - - for (unsigned i=0; isecond; - eversion_t v = peer_missing[peer].rmissing.begin()->first; - - osd->push(this, oid, peer); - - // do other peers need it too? - for (i++; ipush(this, oid, peer); - } - - return; - } - - // nothing to do! -} - - - -void PG::clean_replicas() -{ - dout(10) << "clean_replicas. strays are " << stray_set << endl; - - for (set::iterator p = stray_set.begin(); - p != stray_set.end(); - p++) { - dout(10) << "sending PGRemove to osd" << *p << endl; - set ls; - ls.insert(info.pgid); - MOSDPGRemove *m = new MOSDPGRemove(osd->osdmap->get_epoch(), ls); - osd->messenger->send_message(m, osd->osdmap->get_inst(*p)); - } - - stray_set.clear(); -} diff --git a/branches/sage/pgs/osd/PG.h b/branches/sage/pgs/osd/PG.h index 331b664e030b5..32e511fdb73b7 100644 --- a/branches/sage/pgs/osd/PG.h +++ b/branches/sage/pgs/osd/PG.h @@ -471,16 +471,19 @@ public: void activate(ObjectStore::Transaction& t); - void cancel_recovery(); - bool do_recovery(); - void do_peer_recovery(); + virtual void clean_up_local(ObjectStore::Transaction& t) = 0; - void clean_replicas(); + virtual void cancel_recovery() = 0; + virtual bool do_recovery() = 0; + virtual void clean_replicas() = 0; off_t get_log_write_pos() { return 0; } + virtual void op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete) = 0; + friend class C_OSD_RepModify_Commit; + public: PG(OSD *o, pg_t p) : osd(o), @@ -492,7 +495,7 @@ public: peers_complete_thru(0), have_master_log(true) { } - virtual ~PG() {} + virtual ~PG() { } pg_t get_pgid() const { return info.pgid; } int get_nrep() const { return acting.size(); } @@ -536,10 +539,6 @@ public: return objects_pulling.size(); } - - // pg on-disk content - void clean_up_local(ObjectStore::Transaction& t); - // pg on-disk state void write_log(ObjectStore::Transaction& t); void append_log(ObjectStore::Transaction& t, @@ -559,21 +558,28 @@ public: + + // abstract bits virtual void op_stat(MOSDOp *op) = 0; virtual int op_read(MOSDOp *op) = 0; virtual void op_modify(MOSDOp *op) = 0; + virtual void op_rep_modify(MOSDOp *op) = 0; virtual void op_push(MOSDOp *op) = 0; virtual void op_pull(MOSDOp *op) = 0; virtual void op_reply(MOSDOpReply *op) = 0; - virtual bool same_for_read_since(epoch_t e); - virtual bool same_for_modify_since(epoch_t e); - virtual bool same_for_rep_modify_since(epoch_t e); + virtual bool same_for_read_since(epoch_t e) = 0; + virtual bool same_for_modify_since(epoch_t e) = 0; + virtual bool same_for_rep_modify_since(epoch_t e) = 0; + + virtual bool is_missing_object(object_t oid) = 0; + virtual void wait_for_missing_object(object_t oid, MOSDOp *op) = 0; - virtual bool is_missing_object(object_t oid); - virtual void wait_for_missing_object(object_t oid, MOSDOp *op); + virtual void note_failed_osd(int osd) = 0; + virtual void on_acker_change() = 0; + virtual void on_role_change() = 0; }; diff --git a/branches/sage/pgs/osd/RAID4PG.cc b/branches/sage/pgs/osd/RAID4PG.cc new file mode 100644 index 0000000000000..d1ad6c93fa8f9 --- /dev/null +++ b/branches/sage/pgs/osd/RAID4PG.cc @@ -0,0 +1,37 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include "RAID4PG.h" +#include "OSD.h" + +#include "common/Logger.h" + +#include "messages/MOSDOp.h" +#include "messages/MOSDOpReply.h" + +#include "messages/MOSDPGNotify.h" +#include "messages/MOSDPGRemove.h" + +#include "config.h" + +#undef dout +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << g_clock.now() << " osd" << osd->get_nodeid() << " " << (osd->osdmap ? osd->osdmap->get_epoch():0) << " " << *this << " " + +#include +#include + + + + + + diff --git a/branches/sage/pgs/osd/RAID4PG.h b/branches/sage/pgs/osd/RAID4PG.h new file mode 100644 index 0000000000000..2a6f3a8a14889 --- /dev/null +++ b/branches/sage/pgs/osd/RAID4PG.h @@ -0,0 +1,63 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __RAID4PG_H +#define __RAID4PG_H + + +#include "PG.h" + +#include "messages/MOSDOp.h" + + +class RAID4PG : public PG { +public: + +protected: + + void prepare_log_transaction(ObjectStore::Transaction& t, + MOSDOp *op, eversion_t& version, + objectrev_t crev, objectrev_t rev, + eversion_t trim_to); + void prepare_op_transaction(ObjectStore::Transaction& t, + MOSDOp *op, eversion_t& version, + objectrev_t crev, objectrev_t rev); + +public: + RAID4PG(OSD *o, pg_t p) : PG(o,p) { } + + void op_stat(MOSDOp *op); + int op_read(MOSDOp *op); + void op_modify(MOSDOp *op); + void op_rep_modify(MOSDOp *op); + void op_push(MOSDOp *op); + void op_pull(MOSDOp *op); + + void op_reply(MOSDOpReply *r); + + bool same_for_read_since(epoch_t e); + bool same_for_modify_since(epoch_t e); + bool same_for_rep_modify_since(epoch_t e); + + bool is_missing_object(object_t oid); + void wait_for_missing_object(object_t oid, MOSDOp *op); + + void note_failed_osd(int osd); + + void on_acker_change(); + void on_role_change(); + +}; + + +#endif diff --git a/branches/sage/pgs/osd/ReplicatedPG.cc b/branches/sage/pgs/osd/ReplicatedPG.cc index bb5629cd34100..967b80f7353b3 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.cc +++ b/branches/sage/pgs/osd/ReplicatedPG.cc @@ -20,6 +20,9 @@ #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" +#include "messages/MOSDPGNotify.h" +#include "messages/MOSDPGRemove.h" + #include "config.h" #undef dout @@ -192,9 +195,9 @@ void ReplicatedPG::prepare_log_transaction(ObjectStore::Transaction& t, } // actual op - int opcode = PG::Log::Entry::MODIFY; - if (op->get_op() == OSD_OP_DELETE) opcode = PG::Log::Entry::DELETE; - PG::Log::Entry logentry(opcode, oid, version, op->get_reqid()); + int opcode = Log::Entry::MODIFY; + if (op->get_op() == OSD_OP_DELETE) opcode = Log::Entry::DELETE; + Log::Entry logentry(opcode, oid, version, op->get_reqid()); dout(10) << "prepare_log_transaction " << op->get_op() << " " << logentry @@ -488,6 +491,7 @@ void ReplicatedPG::issue_repop(MOSDOp *op, int dest) ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op) { dout(10) << "new_rep_gather rep_tid " << op->get_rep_tid() << " on " << *op << endl; + int whoami = osd->get_nodeid(); RepGather *repop = new RepGather(op, op->get_rep_tid(), op->get_version(), @@ -509,7 +513,7 @@ ReplicatedPG::RepGather *ReplicatedPG::new_rep_gather(MOSDOp *op) repop->waitfor_commit.insert(whoami); // also, the previous guy will ack to me - int myrank = osdmap->calc_pg_rank(whoami, acting); + int myrank = osd->osdmap->calc_pg_rank(whoami, acting); if (myrank > 0) { int osd = acting[ myrank-1 ]; repop->osds.insert(osd); @@ -610,8 +614,10 @@ void ReplicatedPG::op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru) -void ReplicatedPG::assign_version(MOSDOp *op) +objectrev_t ReplicatedPG::assign_version(MOSDOp *op) { + object_t oid = op->get_oid(); + // check crev objectrev_t crev = 0; osd->store->getattr(oid, "crev", (char*)&crev, sizeof(crev)); @@ -648,12 +654,59 @@ void ReplicatedPG::assign_version(MOSDOp *op) // set version in op, for benefit of client and our eventual reply op->set_version(nv); + + return crev; } +// commit (to disk) callback +class C_OSD_RepModifyCommit : public Context { +public: + OSD *osd; + MOSDOp *op; + int destosd; + + eversion_t pg_last_complete; + + Mutex lock; + Cond cond; + bool acked; + bool waiting; + + C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, int dosd, eversion_t lc) : + osd(o), op(oo), destosd(dosd), pg_last_complete(lc), + acked(false), waiting(false) { } + void finish(int r) { + lock.Lock(); + assert(!waiting); + while (!acked) { + waiting = true; + cond.Wait(lock); + } + assert(acked); + lock.Unlock(); + + PG *pg = osd->lock_pg(op->get_pg()); + pg->op_rep_modify_commit(op, destosd, pg_last_complete); + osd->unlock_pg(op->get_pg()); + } + void ack() { + lock.Lock(); + assert(!acked); + acked = true; + if (waiting) cond.Signal(); + + // discard my reference to buffer + op->get_data().clear(); + + lock.Unlock(); + } +}; + void ReplicatedPG::op_modify(MOSDOp *op) { + int whoami = osd->get_nodeid(); object_t oid = op->get_oid(); const char *opname = MOSDOp::get_opname(op->get_op()); @@ -666,62 +719,63 @@ void ReplicatedPG::op_modify(MOSDOp *op) } // assign the op a version - assign_version(op); + objectrev_t crev = assign_version(op); + eversion_t nv = op->get_version(); // are any peers missing this? - for (unsigned i=1; iacting.size(); i++) { - int peer = pg->acting[i]; - if (pg->peer_missing.count(peer) && - pg->peer_missing[peer].is_missing(oid)) { + for (unsigned i=1; ipeer_missing[peer].got(oid); - push(pg, oid, peer); + peer_missing[peer].got(oid); + push(oid, peer); } } dout(10) << "op_modify " << opname << " " << oid << " v " << nv - << " crev " << crev - << " rev " << op->get_rev() + //<< " crev " << crev + << " rev " << op->get_rev() << " " << op->get_offset() << "~" << op->get_length() << endl; // issue replica writes RepGather *repop = 0; - bool alone = (pg->acting.size() == 1); - tid_t rep_tid = ++last_tid; + bool alone = (acting.size() == 1); + tid_t rep_tid = osd->get_tid(); op->set_rep_tid(rep_tid); if (g_conf.osd_rep == OSD_REP_CHAIN && !alone) { // chain rep. send to #2 only. - int next = pg->acting[1]; - if (pg->acting.size() > 2) - next = pg->acting[2]; - issue_repop(pg, op, next); + int next = acting[1]; + if (acting.size() > 2) + next = acting[2]; + issue_repop(op, next); } else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) { // splay rep. send to rest. - for (unsigned i=1; iacting.size(); ++i) - //for (unsigned i=pg->acting.size()-1; i>=1; --i) - issue_repop(pg, op, pg->acting[i]); + for (unsigned i=1; i=1; --i) + issue_repop(op, acting[i]); } else { // primary rep, or alone. - repop = new_rep_gather(pg, op); + repop = new_rep_gather(op); // send to rest. if (!alone) - for (unsigned i=1; iacting.size(); i++) - issue_repop(pg, op, pg->acting[i]); + for (unsigned i=1; iget_op() != OSD_OP_WRNOOP) { // log and update later. - prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), pg, pg->peers_complete_thru); - prepare_op_transaction(repop->t, op, nv, crev, op->get_rev(), pg); + prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), peers_complete_thru); + prepare_op_transaction(repop->t, op, nv, crev, op->get_rev()); } // (logical) local ack. @@ -731,17 +785,17 @@ void ReplicatedPG::op_modify(MOSDOp *op) assert(repop->waitfor_ack.count(whoami)); repop->waitfor_ack.erase(whoami); } - put_rep_gather(pg, repop); + put_rep_gather(repop); } else { // chain or splay. apply. ObjectStore::Transaction t; - prepare_log_transaction(t, op, nv, crev, op->get_rev(), pg, pg->peers_complete_thru); - prepare_op_transaction(t, op, nv, crev, op->get_rev(), pg); + prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru); + prepare_op_transaction(t, op, nv, crev, op->get_rev()); - C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, pg->get_acker(), - pg->info.last_complete); - unsigned r = store->apply_transaction(t, oncommit); + C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(osd, op, get_acker(), + info.last_complete); + unsigned r = osd->store->apply_transaction(t, oncommit); if (r != 0 && // no errors r != 2) { // or error on collection_add cerr << "error applying transaction: r = " << r << endl; @@ -759,47 +813,6 @@ void ReplicatedPG::op_modify(MOSDOp *op) // replicated -// commit (to disk) callback -class C_OSD_RepModifyCommit : public Context { -public: - OSD *osd; - MOSDOp *op; - int destosd; - - eversion_t pg_last_complete; - - Mutex lock; - Cond cond; - bool acked; - bool waiting; - - C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, int dosd, eversion_t lc) : - osd(o), op(oo), destosd(dosd), pg_last_complete(lc), - acked(false), waiting(false) { } - void finish(int r) { - lock.Lock(); - assert(!waiting); - while (!acked) { - waiting = true; - cond.Wait(lock); - } - assert(acked); - lock.Unlock(); - osd->op_rep_modify_commit(op, destosd, pg_last_complete); - } - void ack() { - lock.Lock(); - assert(!acked); - acked = true; - if (waiting) cond.Signal(); - - // discard my reference to buffer - op->get_data().clear(); - - lock.Unlock(); - } -}; - @@ -812,34 +825,33 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) // check crev objectrev_t crev = 0; - store->getattr(oid, "crev", (char*)&crev, sizeof(crev)); + osd->store->getattr(oid, "crev", (char*)&crev, sizeof(crev)); dout(10) << "op_rep_modify " << opname << " " << oid << " v " << nv << " " << op->get_offset() << "~" << op->get_length() - << " in " << *pg << endl; // we better not be missing this. - assert(!pg->missing.is_missing(oid)); + assert(!missing.is_missing(oid)); // prepare our transaction ObjectStore::Transaction t; // am i acker? RepGather *repop = 0; - int ackerosd = pg->acting[0]; + int ackerosd = acting[0]; if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) { - ackerosd = pg->get_acker(); + ackerosd = get_acker(); - if (pg->is_acker()) { + if (is_acker()) { // i am tail acker. - if (pg->rep_gather.count(op->get_rep_tid())) { - repop = pg->rep_gather[ op->get_rep_tid() ]; + if (rep_gather.count(op->get_rep_tid())) { + repop = rep_gather[ op->get_rep_tid() ]; } else { - repop = new_rep_gather(pg, op); + repop = new_rep_gather(op); } // infer ack from source @@ -849,46 +861,46 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) //assert(repop->waitfor_ack.count(fromosd)); // no, we may come thru here twice. repop->waitfor_ack.erase(fromosd); } - put_rep_gather(pg, repop); + put_rep_gather(repop); // prepare dest socket //messenger->prepare_send_message(op->get_client()); } // chain? forward? - if (g_conf.osd_rep == OSD_REP_CHAIN && !pg->is_acker()) { + if (g_conf.osd_rep == OSD_REP_CHAIN && !is_acker()) { // chain rep, not at the tail yet. - int myrank = osdmap->calc_pg_rank(whoami, pg->acting); + int myrank = osd->osdmap->calc_pg_rank(osd->get_nodeid(), acting); int next = myrank+1; - if (next == (int)pg->acting.size()) + if (next == (int)acting.size()) next = 1; - issue_repop(pg, op, pg->acting[next]); + issue_repop(op, acting[next]); } } // do op? C_OSD_RepModifyCommit *oncommit = 0; - logger->inc("r_wr"); - logger->inc("r_wrb", op->get_length()); + osd->logger->inc("r_wr"); + osd->logger->inc("r_wrb", op->get_length()); if (repop) { // acker. we'll apply later. if (op->get_op() != OSD_OP_WRNOOP) { - prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), pg, op->get_pg_trim_to()); - prepare_op_transaction(repop->t, op, nv, crev, op->get_rev(), pg); + prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), op->get_pg_trim_to()); + prepare_op_transaction(repop->t, op, nv, crev, op->get_rev()); } } else { // middle|replica. if (op->get_op() != OSD_OP_WRNOOP) { - prepare_log_transaction(t, op, nv, crev, op->get_rev(), pg, op->get_pg_trim_to()); - prepare_op_transaction(t, op, nv, crev, op->get_rev(), pg); + prepare_log_transaction(t, op, nv, crev, op->get_rev(), op->get_pg_trim_to()); + prepare_op_transaction(t, op, nv, crev, op->get_rev()); } - oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, pg->info.last_complete); + oncommit = new C_OSD_RepModifyCommit(osd, op, ackerosd, info.last_complete); // apply log update. and possibly update itself. - unsigned tr = store->apply_transaction(t, oncommit); + unsigned tr = osd->store->apply_transaction(t, oncommit); if (tr != 0 && // no errors tr != 2) { // or error on collection_add cerr << "error applying transaction: r = " << tr << endl; @@ -901,16 +913,16 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op) // (logical) local ack. this may induce the actual update. get_rep_gather(repop); { - assert(repop->waitfor_ack.count(whoami)); - repop->waitfor_ack.erase(whoami); + assert(repop->waitfor_ack.count(osd->get_nodeid())); + repop->waitfor_ack.erase(osd->get_nodeid()); } - put_rep_gather(pg, repop); + put_rep_gather(repop); } else { // send ack to acker? if (g_conf.osd_rep != OSD_REP_CHAIN) { - MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap->get_epoch(), false); - messenger->send_message(ack, osdmap->get_inst(ackerosd)); + MOSDOpReply *ack = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), false); + osd->messenger->send_message(ack, osd->osdmap->get_inst(ackerosd)); } // ack myself. @@ -927,9 +939,9 @@ void ReplicatedPG::op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t las dout(10) << "rep_modify_commit on op " << *op << ", sending commit to osd" << ackerosd << endl; - MOSDOpReply *commit = new MOSDOpReply(op, 0, osdmap->get_epoch(), true); + MOSDOpReply *commit = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); commit->set_pg_complete_thru(last_complete); - messenger->send_message(commit, osdmap->get_inst(ackerosd)); + osd->messenger->send_message(commit, osd->osdmap->get_inst(ackerosd)); delete op; } @@ -950,21 +962,21 @@ void ReplicatedPG::pull(object_t oid) { assert(missing.loc.count(oid)); eversion_t v = missing.missing[oid]; - int osd = missing.loc[oid]; + int fromosd = missing.loc[oid]; dout(7) << "pull " << oid << " v " << v - << " from osd" << osd + << " from osd" << fromosd << endl; // send op - tid_t tid = ++last_tid; + tid_t tid = osd->get_tid(); MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, tid, oid, info.pgid, osd->osdmap->get_epoch(), OSD_OP_PULL); op->set_version(v); - osd->messenger->send_message(op, osdmap->get_inst(osd)); + osd->messenger->send_message(op, osd->osdmap->get_inst(fromosd)); // take note assert(objects_pulling.count(oid) == 0); @@ -1001,8 +1013,8 @@ void ReplicatedPG::push(object_t oid, int dest) osd->logger->inc("r_pushb", bl.length()); // send - MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, ++last_tid, - oid, info.pgid, osdmap->get_epoch(), + MOSDOp *op = new MOSDOp(osd->messenger->get_myinst(), 0, osd->get_tid(), + oid, info.pgid, osd->osdmap->get_epoch(), OSD_OP_PUSH); op->set_offset(0); op->set_length(bl.length()); @@ -1152,9 +1164,274 @@ void ReplicatedPG::op_reply(MOSDOpReply *r) r->get_result(), r->get_commit(), fromosd, r->get_pg_complete_thru()); - delete m; + delete r; } else { // early ack. waiting_for_repop[rep_tid].push_back(r); } } + + + +void ReplicatedPG::note_failed_osd(int o) +{ + dout(10) << "note_failed_osd " << o << endl; + // do async; repop_ack() may modify pg->repop_gather + list ls; + for (map::iterator p = rep_gather.begin(); + p != rep_gather.end(); + p++) { + //dout(-1) << "checking repop tid " << p->first << endl; + if (p->second->waitfor_ack.count(o) || + p->second->waitfor_commit.count(o)) + ls.push_back(p->second); + } + for (list::iterator p = ls.begin(); + p != ls.end(); + p++) + repop_ack(*p, -1, true, o); +} + + +void ReplicatedPG::on_acker_change() +{ + dout(10) << "on_acker_change" << endl; + + // apply repops + for (map::iterator p = rep_gather.begin(); + p != rep_gather.end(); + p++) { + if (!p->second->applied) + apply_repop(p->second); + delete p->second->op; + delete p->second; + } + rep_gather.clear(); + + // and repop waiters + for (map >::iterator p = waiting_for_repop.begin(); + p != waiting_for_repop.end(); + p++) + for (list::iterator pm = p->second.begin(); + pm != p->second.end(); + pm++) + delete *pm; + waiting_for_repop.clear(); +} + + +void ReplicatedPG::on_role_change() +{ + dout(10) << "on_role_change" << endl; + + // take object waiters + for (hash_map >::iterator it = waiting_for_missing_object.begin(); + it != waiting_for_missing_object.end(); + it++) + osd->take_waiters(it->second); + waiting_for_missing_object.clear(); +} + + + + + + + + + +/** clean_up_local + * remove any objects that we're storing but shouldn't. + * as determined by log. + */ +void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t) +{ + dout(10) << "clean_up_local" << endl; + + assert(info.last_update >= log.bottom); // otherwise we need some help! + + if (log.backlog) { + // be thorough. + list ls; + osd->store->collection_list(info.pgid, ls); + set s; + + for (list::iterator i = ls.begin(); + i != ls.end(); + i++) + s.insert(*i); + + set did; + for (list::reverse_iterator p = log.log.rbegin(); + p != log.log.rend(); + p++) { + if (did.count(p->oid)) continue; + did.insert(p->oid); + + if (p->is_delete()) { + if (s.count(p->oid)) { + dout(10) << " deleting " << p->oid + << " when " << p->version << endl; + t.remove(p->oid); + } + s.erase(p->oid); + } else { + // just leave old objects.. they're missing or whatever + s.erase(p->oid); + } + } + + for (set::iterator i = s.begin(); + i != s.end(); + i++) { + dout(10) << " deleting stray " << *i << endl; + t.remove(*i); + } + + } else { + // just scan the log. + set did; + for (list::reverse_iterator p = log.log.rbegin(); + p != log.log.rend(); + p++) { + if (did.count(p->oid)) continue; + did.insert(p->oid); + + if (p->is_delete()) { + dout(10) << " deleting " << p->oid + << " when " << p->version << endl; + t.remove(p->oid); + } else { + // keep old(+missing) objects, just for kicks. + } + } + } +} + + + +void ReplicatedPG::cancel_recovery() +{ + // forget about where missing items are, or anything we're pulling + missing.loc.clear(); + osd->num_pulling -= objects_pulling.size(); + objects_pulling.clear(); +} + +/** + * do one recovery op. + * return true if done, false if nothing left to do. + */ +bool ReplicatedPG::do_recovery() +{ + dout(-10) << "do_recovery pulling " << objects_pulling.size() << " in pg, " + << osd->num_pulling << "/" << g_conf.osd_max_pull << " total" + << endl; + dout(10) << "do_recovery " << missing << endl; + + // can we slow down on this PG? + if (osd->num_pulling >= g_conf.osd_max_pull && !objects_pulling.empty()) { + dout(-10) << "do_recovery already pulling max, waiting" << endl; + return true; + } + + // look at log! + Log::Entry *latest = 0; + + while (log.requested_to != log.log.end()) { + assert(log.objects.count(log.requested_to->oid)); + latest = log.objects[log.requested_to->oid]; + assert(latest); + + dout(10) << "do_recovery " + << *log.requested_to + << (objects_pulling.count(latest->oid) ? " (pulling)":"") + << endl; + + if (latest->is_update() && + !objects_pulling.count(latest->oid) && + missing.is_missing(latest->oid)) { + pull(latest->oid); + return true; + } + + log.requested_to++; + } + + if (!objects_pulling.empty()) { + dout(7) << "do_recovery requested everything, still waiting" << endl; + return false; + } + + // done? + assert(missing.num_missing() == 0); + assert(info.last_complete == info.last_update); + + if (is_primary()) { + // i am primary + dout(7) << "do_recovery complete, cleaning strays" << endl; + clean_set.insert(osd->whoami); + if (is_all_clean()) { + state_set(PG::STATE_CLEAN); + clean_replicas(); + } + } else { + // tell primary + dout(7) << "do_recovery complete, telling primary" << endl; + list ls; + ls.push_back(info); + osd->messenger->send_message(new MOSDPGNotify(osd->osdmap->get_epoch(), + ls), + osd->osdmap->get_inst(get_primary())); + } + + return false; +} + +void ReplicatedPG::do_peer_recovery() +{ + dout(10) << "do_peer_recovery" << endl; + + for (unsigned i=0; isecond; + eversion_t v = peer_missing[peer].rmissing.begin()->first; + + push(oid, peer); + + // do other peers need it too? + for (i++; i::iterator p = stray_set.begin(); + p != stray_set.end(); + p++) { + dout(10) << "sending PGRemove to osd" << *p << endl; + set ls; + ls.insert(info.pgid); + MOSDPGRemove *m = new MOSDPGRemove(osd->osdmap->get_epoch(), ls); + osd->messenger->send_message(m, osd->osdmap->get_inst(*p)); + } + + stray_set.clear(); +} + diff --git a/branches/sage/pgs/osd/ReplicatedPG.h b/branches/sage/pgs/osd/ReplicatedPG.h index 0d6019c5b08d5..555bf0580fcce 100644 --- a/branches/sage/pgs/osd/ReplicatedPG.h +++ b/branches/sage/pgs/osd/ReplicatedPG.h @@ -82,18 +82,46 @@ protected: int fromosd, eversion_t pg_complete_thru=0); // push/pull + int num_pulling; + void push(object_t oid, int dest); void pull(object_t oid); // modify - void assign_version(MOSDOp *op); + objectrev_t assign_version(MOSDOp *op); void op_modify_commit(tid_t rep_tid, eversion_t pg_complete_thru); void op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete); + void prepare_log_transaction(ObjectStore::Transaction& t, + MOSDOp *op, eversion_t& version, + objectrev_t crev, objectrev_t rev, + eversion_t trim_to); + void prepare_op_transaction(ObjectStore::Transaction& t, + MOSDOp *op, eversion_t& version, + objectrev_t crev, objectrev_t rev); + friend class C_OSD_WriteCommit; + + // pg on-disk content + void clean_up_local(ObjectStore::Transaction& t); + + void cancel_recovery(); + bool do_recovery(); + void do_peer_recovery(); + + void clean_replicas(); + + + + + public: - ReplicatedPG(OSD *o, pg_t p) : PG(o,p) {} + ReplicatedPG(OSD *o, pg_t p) : + PG(o,p), + num_pulling(0) + { } + ~ReplicatedPG() {} void op_stat(MOSDOp *op); int op_read(MOSDOp *op); @@ -111,13 +139,9 @@ public: bool is_missing_object(object_t oid); void wait_for_missing_object(object_t oid, MOSDOp *op); - void prepare_log_transaction(ObjectStore::Transaction& t, - MOSDOp *op, eversion_t& version, - objectrev_t crev, objectrev_t rev, - eversion_t trim_to); - void prepare_op_transaction(ObjectStore::Transaction& t, - MOSDOp *op, eversion_t& version, - objectrev_t crev, objectrev_t rev); + void note_failed_osd(int o); + void on_acker_change(); + void on_role_change(); }; diff --git a/branches/sage/pgs/osd/osd_types.h b/branches/sage/pgs/osd/osd_types.h index fe69a17e9c072..6ca4fe5839822 100644 --- a/branches/sage/pgs/osd/osd_types.h +++ b/branches/sage/pgs/osd/osd_types.h @@ -43,11 +43,17 @@ public: private: union { struct { - __uint8_t type:3; // 2 - __uint8_t size:5; // 5 - ps_t ps:16; // 16 - pruleset_t ruleset:8; // 8 - __uint32_t preferred:32; // 32 + /* + int preferred:32; // 32 + unsigned type:3; // 3 + unsigned size:5; // 5 + unsigned ps:16; // 16 + unsigned ruleset:8; // 8 + */ + __int32_t preferred; + __uint8_t type; + __uint8_t size; + __uint16_t ps; } fields; __uint64_t val; // 64 } u; @@ -55,12 +61,13 @@ private: public: pg_t() { u.val = 0; } pg_t(const pg_t& o) { u.val = o.u.val; } - pg_t(int type, int size, ps_t seed, int pref, pruleset_t r=0) { + pg_t(int type, int size, ps_t seed, int pref) {//, pruleset_t r=0) { u.fields.type = type; u.fields.size = size; u.fields.ps = seed; - u.fields.preferred = pref+1; // hack: avoid negative. - u.fields.ruleset = r; + u.fields.preferred = pref; // hack: avoid negative. + //u.fields.ruleset = r; + assert(sizeof(u.fields) == sizeof(u.val)); } pg_t(__uint64_t v) { u.val = v; } @@ -70,8 +77,8 @@ public: int size() { return u.fields.size; } ps_t ps() { return u.fields.ps; } - pruleset_t ruleset() { return u.fields.ruleset; } - __uint32_t preferred() { return u.fields.preferred-1; } // hack: avoid negative. + //pruleset_t ruleset() { return u.fields.ruleset; } + int preferred() { return u.fields.preferred; } // hack: avoid negative. /* pg_t operator=(__uint64_t v) { u.val = v; return *this; } @@ -96,10 +103,10 @@ inline ostream& operator<<(ostream& out, pg_t pg) else out << pg.size() << '?'; - if (pg.ruleset()) - out << (int)pg.ruleset() << 's'; + //if (pg.ruleset()) + //out << (int)pg.ruleset() << 's'; - if (pg.preferred()) + if (pg.preferred() >= 0) out << pg.preferred() << 'p'; out << hex << pg.ps() << dec; -- 2.39.5