From fa1664b1291c9393dc86ac68996bae94f69a225f Mon Sep 17 00:00:00 2001 From: sage Date: Sat, 15 Oct 2005 07:09:19 +0000 Subject: [PATCH] osd stuff working pretty well git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@503 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/TODO | 142 +---------------------------- ceph/config.cc | 2 +- ceph/crush/BinaryTree.h | 31 +++++-- ceph/crush/Bucket.h | 166 ++++++++++++++++++++++++++++----- ceph/crush/Hash.h | 3 + ceph/crush/crush.h | 103 +++++++++++++++++---- ceph/crush/test/sizes.cc | 17 +++- ceph/include/bufferlist.h | 61 +++++++------ ceph/mds/MDS.cc | 22 ++++- ceph/mds/OSDMonitor.cc | 16 +++- ceph/msg/FakeMessenger.cc | 3 +- ceph/msg/Message.h | 8 ++ ceph/msg/Messenger.cc | 2 + ceph/msg/Messenger.h | 10 +- ceph/msg/TCPMessenger.cc | 1 + ceph/osd/OSD.cc | 187 ++++++++++++++++++++++++-------------- ceph/osd/OSD.h | 13 +-- ceph/osd/OSDMap.cc | 25 ++--- ceph/osd/OSDMap.h | 129 ++++++++------------------ ceph/osd/PG.cc | 9 +- ceph/osd/PG.h | 34 ++++--- 21 files changed, 556 insertions(+), 428 deletions(-) diff --git a/ceph/TODO b/ceph/TODO index 7aa3f39092408..2639d344d4504 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -59,148 +59,16 @@ osd fun failed n n +a utonomic +r eliable +d istributed +o bject +s tore -mirror(2): -choose(1, room, root) -> [ room1 ] -choose(2, cabinet, _) -> [ cab3 cab2 ] -choose(1, disk, _) -> [ somediskincab3or2, someotherdiskincab3or2 ] - -mirror(4): -choose(2, room, root) -> [ room1 room2 ] -vert(_) -> [ room1; room2 ] -choose(2, cabinet, _) -> [ cab11 cab12; cab21 cab22 ] -vert(_) -> [ cab11; cab12; cab21; cab22 ] -choose(1, disk, _) -> [ disk; disk; disk; disk ] -horiz(_) -> [ disk disk disk disk ] - -two goals: - - separate replicas into failure domains... "floor" in hierarchy - - limit replica set to a single domain... "ceiling" in hierarchy - -additional tricks: - - explicitly choose disks from different storage pools - -mirror(3) -choose(2, disk, primary) -> [ d1 d2 ] // fast scsi (or use more complex chooser as above.) -choose(1, disk, tertiary) -> [ d1 d2 d3 ] // ide - -results are appended to _, unless _ is used as an input, in which case it is cleared. - - - - - -R(5, 0, [ s1 s2 s3 s4 ... ]) -> [ d1 d2 d3 d4 ] -= 5 disks - -R(3, 2, [ c1 c2 c3 .. ])' -> [ c8; c1; c3 ] -R(3, 0, _)' -> [ d81 d11 d33 ; d82 d14 d31 ; d84 d15 d38 ] -= 9 disks - -R(3, 2, cabs)' -> 3 cabs -R(3, 1, _)' -> 9 shelves -R(1, 0, _) -> 9 disks - - - -- weighting? -- how to choose from sets (of sets (...))? - - - - -R : choose K from set (K<=|set|, draw w/o relacement) -T : choose K from set (K<=|set|, prime thing) -P : choose K from set (prime thing, spillover) -R : choose set from sequence (via hypergeometric, weights) -T : choose set from tree (via hash, node weights+labels) - - - - - - - - -T(3, 3, _)' -> [ r1; r2; r3 ] -R(3, 1, _) -> [ s1 s2 s3; s4 s5 s6; s7 s8 s9 ] -R(1, 0, _) -> [ d1; d4; d7 ] - - - - - - -s1 = [ 1 2 3 4 5 6 ] -s2 = [ 7 8 9 10 11 12 13 ] -s3 = -s4 = - -c1 = [ s1 s2 ] -c2 = [ s3 s4 ] -c3 = [ s5 s6 ] -c4 = [ s7 s8 ] - -row1 = [ c1 c2 c3 c4 ] -row2 = [ ... ] - -room1 = [ row1 row2 ] - -choose(1, choose(4, rows)) - - -C(1, room1) -> row2 -C(2, _) -> [ r2 ; r3 ] -C(1, _) -> [ C(1, expand(r2)) ; C(1, expand(r3)) ] - - - -C(4, C(1, room1) ) - - - - -choose -C(4, expand(rows)) - - - - - -C(3, rows) - - - - - - - - - -IPDPS - -map/reduce refs -12 huston: diamond - - early discard .. distributed filters to reduce data... ultra-parallel grep -13 fischer: parallel prefix coputation.. N records in log N time? - -15 riedel:fast01 active disks for large scale data processing - - nearest neighbor .. sort of a map only, no distributed reduce - - association rule .. successive passives over map -16 condor - cluster mgmt -17 bulk synchronous - - - -o locality aware scheduling -o fine partitioning -> balanced scheduling -o redundant execution and re-execution - - REPLICATION diff --git a/ceph/config.cc b/ceph/config.cc index a337a3edd5624..92144965ce233 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -107,7 +107,7 @@ md_config_t g_conf = { osd_max_rep: 4, osd_fsync: true, osd_writesync: false, - osd_maxthreads: 10, + osd_maxthreads: 0, // 0 == no threading! diff --git a/ceph/crush/BinaryTree.h b/ceph/crush/BinaryTree.h index 600ef6374f583..f7d18a415fad1 100644 --- a/ceph/crush/BinaryTree.h +++ b/ceph/crush/BinaryTree.h @@ -8,6 +8,8 @@ //#include using namespace std; +#include "include/bufferlist.h" + namespace crush { class BinaryTree { @@ -22,6 +24,23 @@ namespace crush { public: BinaryTree() : root_node(0), alloc(0) {} + void _encode(bufferlist& bl) { + bl.append((char*)&root_node, sizeof(root_node)); + bl.append((char*)&alloc, sizeof(alloc)); + ::_encode(node_nested, bl); + ::_encode(node_weight, bl); + ::_encode(node_complete, bl); + } + void _decode(bufferlist& bl, int& off) { + bl.copy(off, sizeof(root_node), (char*)&root_node); + off += sizeof(root_node); + bl.copy(off, sizeof(alloc), (char*)&alloc); + off += sizeof(alloc); + ::_decode(node_nested, bl, off); + ::_decode(node_weight, bl, off); + ::_decode(node_complete, bl, off); + } + // accessors bool empty() const { return root_node == 0; } bool exists(int n) const { return n < alloc && node_nested[n]; } @@ -31,7 +50,7 @@ namespace crush { int root() const { return root_node; } - int realloc(int n) { + void realloc(int n) { /* while (alloc <= n) { node_nested.push_back(0); @@ -219,7 +238,7 @@ namespace crush { // print it out - void print_node(ostream& out, const BinaryTree& tree, int n, int i) { + inline void print_binary_tree_node(ostream& out, const BinaryTree& tree, int n, int i) { for (int t=i; t>0; t--) out << " "; if (tree.root() == n) out << "root "; @@ -234,16 +253,16 @@ namespace crush { out << endl; if (!tree.terminal(n)) { if (tree.exists(tree.left(n))) - print_node(out, tree, tree.left(n), i+2); + print_binary_tree_node(out, tree, tree.left(n), i+2); if (tree.exists(tree.right(n))) - print_node(out, tree, tree.right(n), i+2); + print_binary_tree_node(out, tree, tree.right(n), i+2); } } - ostream& operator<<(ostream& out, const BinaryTree& tree) { + inline ostream& operator<<(ostream& out, const BinaryTree& tree) { if (tree.empty()) return out << "tree is empty"; - print_node(out, tree, tree.root(), 0); + print_binary_tree_node(out, tree, tree.root(), 0); return out; } diff --git a/ceph/crush/Bucket.h b/ceph/crush/Bucket.h index e019d7a20dcbe..054ea4d832ba2 100644 --- a/ceph/crush/Bucket.h +++ b/ceph/crush/Bucket.h @@ -10,8 +10,16 @@ #include using namespace std; +#include "include/bufferlist.h" + namespace crush { + + const int CRUSH_BUCKET_UNIFORM = 1; + const int CRUSH_BUCKET_TREE = 2; + const int CRUSH_BUCKET_LIST = 3; + const int CRUSH_BUCKET_STRAW = 4; + /** abstract bucket **/ class Bucket { protected: @@ -26,6 +34,19 @@ namespace crush { id(0), parent(0), type(_type), weight(_weight) { } + + Bucket(bufferlist& bl, int& off) { + bl.copy(off, sizeof(id), (char*)&id); + off += sizeof(id); + bl.copy(off, sizeof(parent), (char*)&parent); + off += sizeof(parent); + bl.copy(off, sizeof(type), (char*)&type); + off += sizeof(type); + bl.copy(off, sizeof(weight), (char*)&weight); + off += sizeof(weight); + } + + virtual ~Bucket() { } virtual const char *get_bucket_type() const = 0; virtual bool is_uniform() const = 0; @@ -49,6 +70,8 @@ namespace crush { } virtual int choose_r(int x, int r, Hash& h) const = 0; + + virtual void _encode(bufferlist& bl) = 0; }; @@ -91,6 +114,9 @@ namespace crush { } public: + UniformBucket(int _type, int _item_type) : + Bucket(_type, 0), + item_type(_item_type) { } UniformBucket(int _type, int _item_type, float _item_weight, vector& _items) : Bucket(_type, _item_weight*_items.size()), @@ -100,12 +126,34 @@ namespace crush { make_primes(); } + UniformBucket(bufferlist& bl, int& off) : Bucket(bl, off) { + bl.copy(off, sizeof(item_type), (char*)&item_type); + off += sizeof(item_type); + bl.copy(off, sizeof(item_weight), (char*)&item_weight); + off += sizeof(item_weight); + ::_decode(items, bl, off); + make_primes(); + } + + void _encode(bufferlist& bl) { + char t = CRUSH_BUCKET_UNIFORM; + bl.append((char*)&t, sizeof(t)); + bl.append((char*)&id, sizeof(id)); + bl.append((char*)&parent, sizeof(parent)); + bl.append((char*)&type, sizeof(type)); + bl.append((char*)&weight, sizeof(weight)); + + bl.append((char*)&item_type, sizeof(item_type)); + bl.append((char*)&item_weight, sizeof(item_weight)); + + ::_encode(items, bl); + } + const char *get_bucket_type() const { return "uniform"; } bool is_uniform() const { return true; } int get_size() const { return items.size(); } - // items void get_items(vector& i) const { i = items; @@ -133,6 +181,7 @@ namespace crush { int s = (x + v + (r+1)*p) % get_size(); return items[s]; } + }; @@ -150,6 +199,25 @@ namespace crush { public: ListBucket(int _type) : Bucket(_type, 0) { } + ListBucket(bufferlist& bl, int& off) : Bucket(bl, off) { + ::_decode(items, bl, off); + ::_decode(item_weight, bl, off); + ::_decode(sum_weight, bl, off); + } + + void _encode(bufferlist& bl) { + char t = CRUSH_BUCKET_LIST; + bl.append((char*)&t, sizeof(t)); + bl.append((char*)&id, sizeof(id)); + bl.append((char*)&parent, sizeof(parent)); + bl.append((char*)&type, sizeof(type)); + bl.append((char*)&weight, sizeof(weight)); + + ::_encode(items, bl); + ::_encode(item_weight, bl); + ::_encode(sum_weight, bl); + } + const char *get_bucket_type() const { return "list"; } bool is_uniform() const { return false; } @@ -168,6 +236,8 @@ namespace crush { if (*i == item) return *w; i++; w++; } + assert(0); + return 0; } void add_item(int item, float w) { @@ -216,6 +286,8 @@ namespace crush { } assert(0); } + + }; @@ -235,7 +307,31 @@ namespace crush { map item_weight; public: - TreeBucket(int _type) : Bucket(_type, 0) { + TreeBucket(int _type) : Bucket(_type, 0) { } + + TreeBucket(bufferlist& bl, int& off) : Bucket(bl, off) { + tree._decode(bl, off); + + ::_decode(node_item, bl, off); + ::_decode(node_item_vec, bl, off); + ::_decode(item_node, bl, off); + ::_decode(item_weight, bl, off); + } + + void _encode(bufferlist& bl) { + char t = CRUSH_BUCKET_TREE; + bl.append((char*)&t, sizeof(t)); + bl.append((char*)&id, sizeof(id)); + bl.append((char*)&parent, sizeof(parent)); + bl.append((char*)&type, sizeof(type)); + bl.append((char*)&weight, sizeof(weight)); + + tree._encode(bl); + + ::_encode(node_item, bl); + ::_encode(node_item_vec, bl); + ::_encode(item_node, bl); + ::_encode(item_weight, bl); } const char *get_bucket_type() const { return "tree"; } @@ -260,11 +356,12 @@ namespace crush { item_weight[item] = w; weight += w; - int n = tree.add_node(w); + unsigned n = tree.add_node(w); node_item[n] = item; item_node[item] = n; - while (node_item_vec.size() <= n) node_item_vec.push_back(0); + while (node_item_vec.size() <= n) + node_item_vec.push_back(0); node_item_vec[n] = item; } @@ -316,6 +413,22 @@ namespace crush { public: StrawBucket(int _type) : Bucket(_type, 0) { } + StrawBucket(bufferlist& bl, int& off) : Bucket(bl, off) { + ::_decode(item_weight, bl, off); + calc_straws(); + } + + void _encode(bufferlist& bl) { + char t = CRUSH_BUCKET_TREE; + bl.append((char*)&t, sizeof(t)); + bl.append((char*)&id, sizeof(id)); + bl.append((char*)&parent, sizeof(parent)); + bl.append((char*)&type, sizeof(type)); + bl.append((char*)&weight, sizeof(weight)); + + ::_encode(item_weight, bl); + } + const char *get_bucket_type() const { return "straw"; } bool is_uniform() const { return false; } @@ -407,7 +520,6 @@ namespace crush { numleft -= 1.0 * (float)cur->second.size(); //cout << "numleft now " << numleft << endl; - float wnext = numleft * (next->first - cur->first); //cout << "wnext " << wnext << endl; @@ -444,28 +556,34 @@ namespace crush { ps++; } return high; - - /* - for (map::const_iterator p = item_weight.begin(); - p != item_weight.end(); - p++) { - if (p->second == 0) continue; // skip zero weights - - const int item = p->first; - const float rnd = (float)(h(x, item, r) % 1000000) / 1000000.0; - float s = rnd * (((map)item_straw)[item]); - - if (high_draw < 0 || - s > high_draw) { - high = p->first; - high_draw = s; - } - } - return high; - */ } }; + + + + + inline Bucket* decode_bucket(bufferlist& bl, int& off) { + char t; + bl.copy(off, sizeof(t), (char*)&t); + off += sizeof(t); + + switch (t) { + case CRUSH_BUCKET_UNIFORM: + return new UniformBucket(bl, off); + case CRUSH_BUCKET_LIST: + return new ListBucket(bl, off); + case CRUSH_BUCKET_TREE: + return new TreeBucket(bl, off); + case CRUSH_BUCKET_STRAW: + return new StrawBucket(bl, off); + default: + assert(0); + } + } + + + } diff --git a/ceph/crush/Hash.h b/ceph/crush/Hash.h index 72ebe27cfaee5..03146f9eef1c9 100644 --- a/ceph/crush/Hash.h +++ b/ceph/crush/Hash.h @@ -19,6 +19,9 @@ namespace crush { int seed; public: + int get_seed() { return seed; } + void set_seed(int s) { seed = s; } + Hash(int s) { unsigned int hash = 1315423911; int x = 231232; diff --git a/ceph/crush/crush.h b/ceph/crush/crush.h index e7fe87cab68bf..5975e5c16a28d 100644 --- a/ceph/crush/crush.h +++ b/ceph/crush/crush.h @@ -92,12 +92,79 @@ namespace crush { Hash h; public: - set failed; + set out; map overload; + map rules; //map collisions; //map bumps; + void _encode(bufferlist& bl) { + // buckets + int n = buckets.size(); + bl.append((char*)&n, sizeof(n)); + for (map::const_iterator it = buckets.begin(); + it != buckets.end(); + it++) { + bl.append((char*)&it->first, sizeof(it->first)); + it->second->_encode(bl); + } + bl.append((char*)&bucketno, sizeof(bucketno)); + + // hash + int s = h.get_seed(); + bl.append((char*)&s, sizeof(s)); + + ::_encode(out, bl); + ::_encode(overload, bl); + + // rules + n = rules.size(); + bl.append((char*)&n, sizeof(n)); + for(map::iterator it = rules.begin(); + it != rules.end(); + it++) { + bl.append((char*)&it->first, sizeof(it->first)); + it->second._encode(bl); + } + + } + + void _decode(bufferlist& bl, int& off) { + int n; + bl.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + for (int i=0; ichoose_r(x, r, h); + outv = in->choose_r(x, r, h); // did we get the type we want? int itemtype = 0; // 0 is terminal type @@ -233,8 +300,8 @@ namespace crush { if (in->is_uniform()) { itemtype = ((UniformBucket*)in)->get_item_type(); } else { - if (buckets.count(out)) { // another bucket - newin = buckets[out]; + if (buckets.count(outv)) { // another bucket + newin = buckets[outv]; itemtype = newin->get_type(); } } @@ -242,7 +309,7 @@ namespace crush { // collision? bool collide = false; for (int prep=0; prep overload[out]) + if (overload.count(outv)) { + float f = (float)(h(x, outv) % 1000) / 1000.0; + if (f > overload[outv]) bad = true; } @@ -276,14 +343,15 @@ namespace crush { } // output this value - outvec.push_back(out); + outvec.push_back(outv); } // for rep } void do_rule(Rule& rule, int x, vector& result) { - int numresult = 0; - + //int numresult = 0; + result.clear(); + // working vector vector w; // working variable @@ -334,8 +402,9 @@ namespace crush { case CRUSH_RULE_EMIT: { - for (int i=0; i object_queue; int max_object_size = 1024*1024*100; //kb +off_t no; + int get_object() //kb { if (object_queue.empty()) { int max = file_size_distn.sample(); + no++; int filesize = max/2 + (rand() % 100) * max/200 + 1; //cout << "file " << filesize << endl; while (filesize > max_object_size) { @@ -45,13 +48,15 @@ void getdist(vector& v, float& avg, float& var) void testpgs(int n, // numpg off_t pggb, float& avg, - float& var + float& var, + off_t& numo ) { off_t dist = (off_t)n * 1024LL*1024LL * (off_t)pggb; //kb vector pgs(n); off_t did = 0; + no = 0; while (did < dist) { off_t s = get_object(); pgs[rand()%n] += s; @@ -60,6 +65,7 @@ void testpgs(int n, // numpg while (!object_queue.empty()) pgs[rand()%n] += get_object(); + numo = no; //cout << did/n << endl; //for (int i=0; i -inline void _encode(set& s, bufferlist& bl) +// set +template +inline void _encode(set& s, bufferlist& bl) { int n = s.size(); bl.append((char*)&n, sizeof(n)); - for (set::iterator it = s.begin(); + for (typename set::iterator it = s.begin(); it != s.end(); it++) { - int v = *it; + T v = *it; bl.append((char*)&v, sizeof(v)); n--; } assert(n==0); } -inline void _decode(set& s, bufferlist& bl, int& off) +template +inline void _decode(set& s, bufferlist& bl, int& off) { s.clear(); int n; bl.copy(off, sizeof(n), (char*)&n); off += sizeof(n); for (int i=0; i& s, bufferlist& bl, int& off) assert(s.size() == (unsigned)n); } -// vector -inline void _encode(vector& s, bufferlist& bl) +// vector +template +inline void _encode(vector& s, bufferlist& bl) { int n = s.size(); bl.append((char*)&n, sizeof(n)); - for (vector::iterator it = s.begin(); + for (typename vector::iterator it = s.begin(); it != s.end(); it++) { - int v = *it; + T v = *it; bl.append((char*)&v, sizeof(v)); n--; } assert(n==0); } -inline void _decode(vector& s, bufferlist& bl, int& off) +template +inline void _decode(vector& s, bufferlist& bl, int& off) { s.clear(); int n; bl.copy(off, sizeof(n), (char*)&n); off += sizeof(n); - s = vector(n); + s = vector(n); for (int i=0; i& s, bufferlist& bl, int& off) assert(s.size() == (unsigned)n); } -// list<__uint64_t> -inline void _encode(list<__uint64_t>& s, bufferlist& bl) +// list +template +inline void _encode(list& s, bufferlist& bl) { int n = s.size(); bl.append((char*)&n, sizeof(n)); - for (list<__uint64_t>::iterator it = s.begin(); + for (typename list::iterator it = s.begin(); it != s.end(); it++) { - __uint64_t v = *it; + T v = *it; bl.append((char*)&v, sizeof(v)); n--; } assert(n==0); } -inline void _decode(list<__uint64_t>& s, bufferlist& bl, int& off) +template +inline void _decode(list& s, bufferlist& bl, int& off) { s.clear(); int n; bl.copy(off, sizeof(n), (char*)&n); off += sizeof(n); for (int i=0; i& s, bufferlist& bl, int& off) assert(s.size() == (unsigned)n); } -// map<__uint64_t, __uint64_t> -inline void _encode(map<__uint64_t,__uint64_t>& s, bufferlist& bl) +// map +template +inline void _encode(map& s, bufferlist& bl) { int n = s.size(); bl.append((char*)&n, sizeof(n)); - for (map<__uint64_t,__uint64_t>::iterator it = s.begin(); + for (typename map::iterator it = s.begin(); it != s.end(); it++) { - __uint64_t k = it->first; - __uint64_t v = it->second; + T k = it->first; + U v = it->second; bl.append((char*)&k, sizeof(k)); bl.append((char*)&v, sizeof(v)); n--; } assert(n==0); } -inline void _decode(map<__uint64_t,__uint64_t>& s, bufferlist& bl, int& off) +template +inline void _decode(map& s, bufferlist& bl, int& off) { s.clear(); int n; bl.copy(off, sizeof(n), (char*)&n); off += sizeof(n); for (int i=0; i osdmap = new OSDMap(); - OSDGroup osdg; + osdmap->set_pg_bits(g_conf.osd_pg_bits); + + Bucket *b = new UniformBucket(1, 0); + for (int i=0; iosds.insert(i); + b->add_item(i, 1); + } + int root = osdmap->crush.add_bucket(b); + + for (int i=2; i<5; i++) { + osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_TAKE, root)); + osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_CHOOSE, i, 0)); + osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT)); + } + + /*OSDGroup osdg; osdg.num_osds = g_conf.num_osd; for (int i=0; iadd_group(osdg); - osdmap->set_pg_bits(g_conf.osd_pg_bits); + */ + // filer = new Filer(messenger, osdmap); @@ -507,7 +523,7 @@ void MDS::my_dispatch(Message *m) finish_contexts(ls); } - // periodic crap (second resolution) + // periodic crap (1-second resolution) static utime_t last_log = g_clock.recent_now(); utime_t now = g_clock.recent_now(); if (last_log.sec() != now.sec()) { diff --git a/ceph/mds/OSDMonitor.cc b/ceph/mds/OSDMonitor.cc index 75bd4fd201587..c3af228bc149f 100644 --- a/ceph/mds/OSDMonitor.cc +++ b/ceph/mds/OSDMonitor.cc @@ -36,15 +36,21 @@ void OSDMonitor::fake_reorg() { // HACK osd map change - dout(1) << "changing OSD map, removing one OSD" << endl; - mds->osdmap->get_group(0).num_osds--; - mds->osdmap->init_rush(); + static int d = 0; + dout(1) << "changing OSD map, marking osd" << d << " out" << endl; + //mds->osdmap->get_group(0).num_osds--; + //mds->osdmap->init_rush(); + mds->osdmap->out_osds.insert(d++); mds->osdmap->inc_version(); // bcast mds->bcast_osd_map(); - + // do it again? + if (g_conf.num_osd - d > 4 && + g_conf.num_osd - d > g_conf.num_osd/2) + g_timer.add_event_after(g_conf.fake_osdmap_expand, + new C_OM_Faker(this)); } @@ -52,7 +58,7 @@ void OSDMonitor::init() { if (mds->get_nodeid() == 0 && - mds->osdmap->get_group(0).num_osds > 4 && + g_conf.num_osd > 4 && g_conf.fake_osdmap_expand) { dout(1) << "scheduling OSD map reorg at " << g_conf.fake_osdmap_expand << endl; g_timer.add_event_after(g_conf.fake_osdmap_expand, diff --git a/ceph/msg/FakeMessenger.cc b/ceph/msg/FakeMessenger.cc index 20c954e95a3ae..33d0c07fd02e6 100644 --- a/ceph/msg/FakeMessenger.cc +++ b/ceph/msg/FakeMessenger.cc @@ -255,6 +255,7 @@ int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromp { m->set_source(whoami, fromport); m->set_dest(dest, port); + m->set_lamport_stamp( get_lamport() ); lock.Lock(); @@ -277,7 +278,7 @@ int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromp assert(dm); dm->queue_incoming(m); - dout(10) << "sending " << m << " to " << dest << " m " << dm << " has " << dm->num_incoming() << " queued" << endl; + dout(5) << "--> sending " << m << " to " << MSG_ADDR_NICE(dest) << endl;//" m " << dm << " has " << dm->num_incoming() << " queued" << endl; } catch (...) { diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index 4b7dd52688acd..3b3f430dbbc7d 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -171,6 +171,7 @@ typedef struct { msg_addr_t source, dest; int source_port, dest_port; int nchunks; + __uint64_t lamport_stamp; } msg_envelope_t; #define MSG_ENVELOPE_LEN sizeof(msg_envelope_t) @@ -190,16 +191,23 @@ class Message { env.source_port = env.dest_port = -1; env.source = env.dest = -1; env.nchunks = 0; + env.lamport_stamp = 0; }; Message(int t) { env.source_port = env.dest_port = -1; env.source = env.dest = -1; env.nchunks = 0; env.type = t; + env.lamport_stamp = 0; } virtual ~Message() { } + void set_lamport_stamp(__uint64_t t) { + env.lamport_stamp = t; + } + __uint64_t get_lamport_stamp() { return env.lamport_stamp; } + // for rpc-type procedural messages (pcid = procedure call id) virtual long get_pcid() { return 0; } diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index ca56865f91d5b..a753830cb35d4 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -119,6 +119,8 @@ void Messenger::dispatch(Message *m) { assert(dispatcher); + bump_lamport(m->get_lamport_stamp()); + long pcid = m->get_pcid(); _lock.Lock(); diff --git a/ceph/msg/Messenger.h b/ceph/msg/Messenger.h index c1895693d2f14..3b35b7929e1a4 100644 --- a/ceph/msg/Messenger.h +++ b/ceph/msg/Messenger.h @@ -19,6 +19,7 @@ class Messenger { private: Dispatcher *dispatcher; msg_addr_t _myaddr; + __uint64_t lamport_clock; // procedure call fun long _last_pcid; @@ -27,12 +28,19 @@ class Messenger { map call_reply; public: - Messenger(msg_addr_t w) : dispatcher(0), _myaddr(w), _last_pcid(1) { } + Messenger(msg_addr_t w) : dispatcher(0), _myaddr(w), lamport_clock(0), _last_pcid(1) { } virtual ~Messenger() { } void set_myaddr(msg_addr_t m) { _myaddr = m; } msg_addr_t get_myaddr() { return _myaddr; } + __uint64_t get_lamport() { return lamport_clock++; } + __uint64_t peek_lamport() { return lamport_clock; } + void bump_lamport(__uint64_t other) { + if (other >= lamport_clock) + lamport_clock = other+1; + } + virtual int shutdown() = 0; // setup diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index d72dd141d083e..abc307f63cf57 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -1052,6 +1052,7 @@ int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int frompo // set envelope m->set_source(get_myaddr(), fromport); m->set_dest(dest, port); + m->set_lamport_stamp( get_lamport() ); if (1) { // serialize all output diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index c697557ce8acc..bb4d2421f4d7f 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -332,10 +332,13 @@ void OSD::handle_op_reply(MOSDOpReply *m) MOSDOp *op = replica_writes[m->get_tid()]; dout(7) << "rep_write_reply ack tid " << m->get_tid() << " orig op " << op << endl; + int result = m->get_result(); + replica_writes.erase(m->get_tid()); replica_write_tids[op].erase(m->get_tid()); pg_t pgid = op->get_pg(); + PG *pg = open_pg(pgid); int osd = MSG_ADDR_NUM(m->get_source()); replica_pg_osd_tids[pgid][osd].erase(m->get_tid()); if (replica_pg_osd_tids[pgid][osd].empty()) replica_pg_osd_tids[pgid].erase(osd); @@ -344,19 +347,28 @@ void OSD::handle_op_reply(MOSDOpReply *m) if (replica_write_tids[op].empty()) { // reply? if (replica_write_local.count(op)) { - dout(7) << "last one, replying to write op" << endl; replica_write_local.erase(op); - - // written locally too, reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap); - messenger->send_message(reply, op->get_asker()); - delete op; + + if (result >= 0) { + dout(7) << "last one, replying to write op" << endl; + + // written locally too, reply + MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap); + messenger->send_message(reply, op->get_asker()); + delete op; + } else { + dout(7) << "last one, but replica write failed, fw to new primary" << endl; + messenger->send_message(op, MSG_ADDR_OSD(pg->get_primary())); + } + replica_write_result.erase(op); } else { // not yet written locally. dout(9) << "not yet written locally, still waiting for that" << endl; + replica_write_result[op] = -1; } replica_write_tids.erase(op); } + } else { dout(7) << "not waiting for tid " << m->get_tid() << " rep_write reply, map must have changed, dropping." << endl; } @@ -650,6 +662,7 @@ void OSD::advance_map(list& ls) { dout(7) << "advance_map version " << osdmap->get_version() << endl; + // scan pg's for (list::iterator it = ls.begin(); it != ls.end(); it++) { @@ -681,23 +694,27 @@ void OSD::advance_map(list& ls) pg->state_clear(PG_STATE_CLEAN); pg->discard_recovery_plan(); + + // keep crown. } // new primary? if (role == 0) { pg->set_primary_since(osdmap->get_version()); + pg->state_clear(PG_STATE_PEERED); + } else { + // we need to announce + pg->state_set(PG_STATE_STRAY); } - // we need to re-peer - pg->state_clear(PG_STATE_PEERED); } else { // no role change. // did primary change? if (primary != pg->get_primary()) { dout(10) << " " << *pg << " acting primary change " << pg->get_primary() << " -> " << primary << ", !peered" << endl; - // re-peer - pg->state_clear(PG_STATE_PEERED); + // we need to announce + pg->state_set(PG_STATE_STRAY); } else { // primary is the same. if (role == 0) { @@ -731,12 +748,11 @@ void OSD::activate_map(list& ls) PG *pg = open_pg(pgid); assert(pg); - if (pg->is_peered()) continue; - if (pg->get_role() == 0) { // i am primary start_peers(pg, start_map); - } else { + } + else if (pg->is_stray()) { // i am residual|replica notify_list[pg->get_primary()][pgid] = pg->get_last_complete(); } @@ -802,7 +818,9 @@ void OSD::start_peers(PG *pg, map< int, map >& start_map) int who = it->first; int role = it->second; if (who == whoami) continue; // nevermind me - if (pg->get_peer(who)) { + + PGPeer *pgp = pg->get_peer(who); + if (pgp && pgp->is_active()) { dout(10) << " " << *pg << " already peered with osd" << who << " role " << role << endl; } else { dout(10) << " " << *pg << " need to peer with osd" << who << " role " << role << endl; @@ -811,11 +829,13 @@ void OSD::start_peers(PG *pg, map< int, map >& start_map) } } + /* wtf if (!did_something) { dout(10) << " " << *pg << " already has necessary peers, analyzing" << endl; pg->plan_recovery(store); do_recovery(pg); } + */ } @@ -999,17 +1019,18 @@ void OSD::handle_pg_peer(MOSDPGPeer *m) } } + // PEER // report back state and pg content ack->pg_state[pgid].state = pg->get_state(); - //ack->pg_state[pgid].deleted = pg->get_deleted_objects(); - + // list my objects pg->scan_local_objects(ack->pg_state[pgid].objects, store); - // i am peered. (FIXME?) + // i am now peered pg->state_set(PG_STATE_PEERED); - - dout(10) << " " << *pg << " has state " << pg->get_state() << ", " << ack->pg_state[pgid].objects.size() << " objects" << endl; + pg->state_clear(PG_STATE_STRAY); + + dout(10) << "peer_ack: " << *pg << " has state " << pg->get_state() << ", " << ack->pg_state[pgid].objects.size() << " objects" << endl; } // reply @@ -1019,6 +1040,7 @@ void OSD::handle_pg_peer(MOSDPGPeer *m) delete m; } + void OSD::handle_pg_peer_ack(MOSDPGPeerAck *m) { int from = MSG_ADDR_NUM(m->get_source()); @@ -1166,9 +1188,14 @@ void OSD::pg_pull(PG *pg, int maxops) object_t oid; version_t v; int peer; - if (!pg->pull_plan.get_next(oid, v, peer)) break; + if (!pg->pull_plan.get_next(oid, v, peer)) { + dout(7) << "pg_pull done " << *pg << endl; + break; + } + PGPeer *pgp = pg->get_proxy_peer(oid); if (pgp == 0) { + assert(0); // ?? dout(7) << " apparently already pulled " << hex << oid << dec << endl; continue; } @@ -1295,10 +1322,20 @@ void OSD::pg_push(PG *pg, int maxops) object_t oid; version_t v; int peer; - if (!pg->push_plan.get_next(oid, v, peer)) break; + if (!pg->push_plan.get_next(oid, v, peer)) { + dout(7) << "pg_push done " << *pg << endl; + break; + } PGPeer *p = pg->get_peer(peer); assert(p); + + if (p->peer_state.objects.count(oid) && + p->peer_state.objects[oid] >= v) { + dout(10) << "pg_push not pushing " << hex << oid << dec << " to osd" << peer << " cuz it's already there" << endl; + continue; + } + push_replica(oid, v, p); ops++; } @@ -1422,15 +1459,19 @@ void OSD::pg_clean(PG *pg, int maxops) object_t oid; version_t v; int peer; - if (!pg->clean_plan.get_next(oid, v, peer)) break; - - if (pg->objects.count(oid)) { - assert(0); // think about this: (recovery) -> delete -> create sequences.. where does the new version # start, etc. - continue; + if (!pg->clean_plan.get_next(oid, v, peer)) { + dout(7) << "pg_clean done " << *pg << endl; + break; } PGPeer *p = pg->get_peer(peer); assert(p); + + if (p->peer_state.objects.count(oid)) { + dout(10) << "pg_clean not removing " << hex << oid << dec << " from osd" << peer << " cuz it's not there" << endl; + continue; + } + remove_replica(oid, v, p); ops++; } @@ -1469,6 +1510,7 @@ void OSD::op_rep_remove(MOSDOp *op) assert(v == op->get_version()); // remove + store->collection_remove(op->get_pg(), op->get_oid()); int r = store->remove(op->get_oid()); assert(r == 0); @@ -1505,25 +1547,23 @@ void OSD::op_rep_remove_reply(MOSDOpReply *op) void OSD::op_rep_write(MOSDOp *op) -{ - // check existing object; write must be applied in order! +{ + // when we introduce unordered messaging.. FIXME object_t oid = op->get_oid(); - version_t v = 1; - if (store->exists(oid)) { + version_t v = 0; + if (store->exists(oid)) store->getattr(oid, "version", &v, sizeof(v)); - ++v; - } else { - // new - } - dout(12) << "rep_write to " << hex << oid << dec << " v " << op->get_version() << " (i have " << v << ")" << endl; + v++; assert(op->get_version() == v); + + dout(12) << "rep_write to " << hex << oid << dec << " v " << op->get_version() << " (i have " << v-1 << ")" << endl; // pre-ack //MOSDOpReply *ack1 = new MOSDOpReply(op, 0, osdmap); //messenger->send_message(ack1, op->get_asker()); // write - apply_write(op, false, v); + apply_write(op, false, op->get_version()); // ack MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap); @@ -1540,8 +1580,11 @@ void OSD::op_rep_write(MOSDOp *op) void OSD::handle_op(MOSDOp *op) { + pg_t pgid = op->get_pg(); + PG *pg = open_pg(pgid); + // what kind of op? - if (op->get_pg_role() == 0) { + if (!OSD_OP_IS_REP(op->get_op())) { // REGULAR OP (non-replication) // is our map version up to date? @@ -1553,7 +1596,6 @@ void OSD::handle_op(MOSDOp *op) } // am i the primary? - pg_t pgid = op->get_pg(); int acting_primary = osdmap->get_pg_acting_primary( pgid ); if (acting_primary != whoami) { @@ -1564,7 +1606,6 @@ void OSD::handle_op(MOSDOp *op) } // proxy? - PG *pg = open_pg(pgid); if (!pg) { dout(7) << "hit non-existent pg " << hex << op->get_pg() << dec << ", waiting" << endl; waiting_for_pg[pgid].push_back(op); @@ -1658,7 +1699,6 @@ void OSD::do_op(MOSDOp *op) } } else { // regular op - pg_t pgid = op->get_pg(); PG *pg = open_pg(pgid); @@ -1666,36 +1706,37 @@ void OSD::do_op(MOSDOp *op) if (!pg) { dout(7) << "op_write pg " << hex << pgid << dec << " dne (yet)" << endl; waiting_for_pg[pgid].push_back(op); - return; } - if (!pg->is_peered()) { + else if (!pg->is_peered()) { dout(7) << "op_write " << *pg << " not peered (yet)" << endl; waiting_for_pg_peered[pgid].push_back(op); - return; } - - // do op - switch (op->get_op()) { - case OSD_OP_READ: - op_read(op, pg); - break; - case OSD_OP_WRITE: - op_write(op, pg); - break; - case OSD_OP_DELETE: - op_delete(op, pg); - break; - case OSD_OP_TRUNCATE: - op_truncate(op, pg); - break; - case OSD_OP_STAT: - op_stat(op, pg); - break; - default: - assert(0); + else { + // do op + switch (op->get_op()) { + case OSD_OP_READ: + op_read(op, pg); + break; + case OSD_OP_WRITE: + op_write(op, pg); + break; + case OSD_OP_DELETE: + op_delete(op, pg); + break; + case OSD_OP_TRUNCATE: + op_truncate(op, pg); + break; + case OSD_OP_STAT: + op_stat(op, pg); + break; + default: + assert(0); + } } } + //dout(12) << "finish op " << op << endl; + // finish osd_lock.Lock(); assert(pending_ops > 0); @@ -1881,7 +1922,7 @@ bool OSD::object_clean(PG *pg, object_t oid, version_t& v, Message *op) } } if (v > 0) { - dout(10) << " pg not clean, checking if " << hex << oid << dec << " v " << v << " is specifically clean yet!" << endl; + dout(10) << " pg not clean, checking if " << hex << oid << dec << " v " << v << " is specifically clean yet! " << *pg << endl; // object (logically) exists if (!pg->existant_object_is_clean(oid, v)) { dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg @@ -1917,6 +1958,7 @@ void OSD::op_write(MOSDOp *op, PG *pg) return; } v++; // we're good! + //v = messenger->peek_lamport(); dout(12) << "op_write " << hex << oid << dec << " v " << v << endl; @@ -1960,10 +2002,16 @@ void OSD::op_write(MOSDOp *op, PG *pg) replica_write_lock.Lock(); if (replica_write_tids.count(op) == 0) { // all replica writes completed. - dout(10) << "op_write wrote locally: rep writes already finished, replying" << endl; - MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap); - messenger->send_message(reply, op->get_asker()); - delete op; + if (replica_write_result[op] == 0) { + dout(10) << "op_write wrote locally: rep writes already finished, replying" << endl; + MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap); + messenger->send_message(reply, op->get_asker()); + delete op; + } else { + dout(10) << "op_write wrote locally, but rep writes failed, fw to new primary" << endl; + messenger->send_message(op, MSG_ADDR_OSD(pg->get_primary())); + } + replica_write_result.erase(op); } else { // note that it's written locally dout(10) << "op_write wrote locally: rep writes not yet finished, waiting" << endl; @@ -1988,8 +2036,8 @@ void OSD::op_delete(MOSDOp *op, PG *pg) unlock_object(oid); return; } - v++; // we're good! + store->collection_remove(pg->get_pgid(), op->get_oid()); int r = store->remove(oid); dout(12) << "delete on " << hex << op->get_oid() << dec << " r = " << r << endl; @@ -2015,6 +2063,7 @@ void OSD::op_truncate(MOSDOp *op, PG *pg) return; } v++; // we're good! + //v = messenger->peek_lamport(); int r = store->truncate(oid, op->get_offset()); dout(3) << "truncate on " << hex << op->get_oid() << dec << " at " << op->get_offset() << " r = " << r << endl; diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 3b0893b2118c3..c95a98b02bb22 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -93,23 +93,13 @@ class OSD : public Dispatcher { map<__uint64_t, MOSDOp*> replica_writes; map > replica_write_tids; set replica_write_local; + map replica_write_result; map > > replica_pg_osd_tids; // pg -> osd -> tid // // -- replication -- - // PS - /* - hash_map ps_map; - - void get_ps_list(list& ls); - bool ps_exists(ps_t ps); - PS *create_ps(ps_t ps); // create new PS - PS *open_ps(ps_t ps); // return existing PS, load state from store (if needed) - void close_ps(ps_t ps); // close in-memory state - void remove_ps(ps_t ps); // remove state from store - */ // PG hash_map pg_map; void get_pg_list(list& ls); @@ -129,6 +119,7 @@ class OSD : public Dispatcher { hash_map > waiting_for_clean_object; hash_map > waiting_for_pg; hash_map > waiting_for_pg_peered; + //hash_map waiting_for_pg_flush; // pg -> newprimary void advance_map(list& ls); diff --git a/ceph/osd/OSDMap.cc b/ceph/osd/OSDMap.cc index a228c861f0b41..3e8f5cd75f1a2 100644 --- a/ceph/osd/OSDMap.cc +++ b/ceph/osd/OSDMap.cc @@ -11,14 +11,11 @@ void OSDMap::encode(bufferlist& blist) blist.append((char*)&version, sizeof(version)); blist.append((char*)&pg_bits, sizeof(pg_bits)); - int ngroups = osd_groups.size(); - blist.append((char*)&ngroups, sizeof(ngroups)); - for (int i=0; i(ngroups); - for (int i=0; i #include @@ -39,6 +42,7 @@ using namespace std; /** OSDGroup * a group of identical disks added to the OSD cluster */ +/* class OSDGroup { public: int num_osds; // num disks in this group (aka num_disks_in_cluster[]) @@ -62,7 +66,7 @@ class OSDGroup { ::_decode(osds, bl, off); } }; - +*/ /** OSDExtent * for mapping (ino, offset, len) to a (list of) byte extents in objects on osds @@ -85,41 +89,18 @@ class OSDMap { __uint64_t version; // what version of the osd cluster descriptor is this int pg_bits; // placement group bits - // RUSH disk groups - vector osd_groups; // RUSH disk groups - + set osds; set down_osds; // list of down disks - set failed_osds; // list of failed disks - - Rush *rush; // rush implementation + set out_osds; // list of unused disks + Crush crush; Mutex osd_cluster_lock; - public: - void init_rush() { - - // SAB - osd_cluster_lock.Lock(); - - if (rush) delete rush; - rush = new Rush(); - - int ngroups = osd_groups.size(); - for (int i=0; iAddCluster(osd_groups[i].num_osds, - osd_groups[i].weight); - } - - // SAB - osd_cluster_lock.Unlock(); - } - + friend class OSDMonitor; + friend class MDS; public: - OSDMap() : version(0), pg_bits(5), rush(0) { } - ~OSDMap() { - if (rush) { delete rush; rush = 0; } - } + OSDMap() : version(0), pg_bits(5) { } __uint64_t get_version() { return version; } void inc_version() { version++; } @@ -128,30 +109,16 @@ class OSDMap { void set_pg_bits(int b) { pg_bits = b; } // cluster state - bool is_failed(int osd) { return failed_osds.count(osd) ? true:false; } + bool is_up(int osd) { return !is_down(osd); } + bool is_down(int osd) { return down_osds.count(osd); } + bool is_in(int osd) { return !is_in(osd); } + bool is_out(int osd) { return out_osds.count(osd); } int num_osds() { - int n = 0; - for (vector::iterator it = osd_groups.begin(); - it != osd_groups.end(); - it++) - n += it->num_osds; - return n; + return osds.size(); } void get_all_osds(set& ls) { - for (vector::iterator it = osd_groups.begin(); - it != osd_groups.end(); - it++) { - for (unsigned i=0; iosds.size(); i++) - ls.insert(it->osds[i]); - } - } - - int get_num_groups() { return osd_groups.size(); } - OSDGroup& get_group(int i) { return osd_groups[i]; } - void add_group(OSDGroup& g) { - osd_groups.push_back(g); - init_rush(); + ls = osds; } // serialize, unserialize @@ -192,50 +159,30 @@ class OSDMap { return pg >> PG_PS_BITS; } + /* map (repgroup) to a raw list of osds. - this is where we invoke RUSH. */ - int pg_to_raw_osds(pg_t pg, - int *osds) { // list of osd addr's - // get rush list - assert(rush); + this is where we invoke CRUSH. */ + int pg_to_osds(pg_t pg, + vector& osds) { // list of osd addr's int num_rep = pg_to_nrep(pg); - rush->GetServersByKey( pg, num_rep, osds ); - return num_rep; - } - - int pg_to_nonfailed_osds(pg_t pg, - vector& osds) { // list of osd addr's - // get rush list - assert(rush); - int raw[NUM_RUSH_REPLICAS]; - pg_to_raw_osds(pg, raw); - - int nrep = pg_to_nrep(pg); - osds = vector(nrep); - int o = 0; - for (int i=0; i& osds) { // list of osd addr's // get rush list - assert(rush); - int raw[NUM_RUSH_REPLICAS]; - pg_to_raw_osds(pg, raw); - - int nrep = pg_to_nrep(pg); - osds = vector(nrep); - int o = 0; - for (int i=0; i raw; + pg_to_osds(pg, raw); + + osds.clear(); + for (unsigned i=0; i group; - int nrep = pg_to_nonfailed_osds(pg, group); + int nrep = pg_to_osds(pg, group); assert(nrep > 0); // we fail! return group[0]; } @@ -262,7 +209,7 @@ class OSDMap { /* what replica # is a given osd? 0 primary, -1 for none. */ int get_pg_role(pg_t pg, int osd) { vector group; - int nrep = pg_to_nonfailed_osds(pg, group); + int nrep = pg_to_osds(pg, group); for (int i=0; i::iterator it = peers.begin(); it != peers.end(); it++) { - //if (!it->second->is_active()) continue; if (it->second->is_complete()) continue; if (it->second->peer_state.objects.count(o)) { return false; @@ -300,7 +304,8 @@ void PG::plan_push_cleanup() } } - if (push_plan.empty() && clean_plan.empty()) { + if (is_complete() && + push_plan.empty() && clean_plan.empty()) { dout(10) << " nothing to push|clean, marking clean" << endl; mark_clean(); } diff --git a/ceph/osd/PG.h b/ceph/osd/PG.h index 356d0abdadd42..21195e0fc235f 100644 --- a/ceph/osd/PG.h +++ b/ceph/osd/PG.h @@ -31,7 +31,7 @@ struct PGReplicaInfo { // by primary #define PG_PEER_STATE_ACTIVE 1 // peer has acked our request, sent back PG state. -#define PG_PEER_STATE_COMPLETE 2 // peer has everything replicated +#define PG_PEER_STATE_COMPLETE 2 // peer has everything replicated+clean class PGPeer { public: @@ -143,15 +143,20 @@ class PGQueue { * */ -// bits used on any -#define PG_STATE_COMPLETE 1 // i have full PG contents locally. -#define PG_STATE_PEERED 2 // i have contacted prior primary and all - // replica osds and/or fetched their - // content lists, and thus know what's up. - // or, i have check in w/ new primary (on replica) +// any +#define PG_STATE_COMPLETE 1 // i have full PG contents locally +#define PG_STATE_PEERED 2 // primary: peered with everybody + // replica: peered with auth + +// primary +//#define PG_STATE_CROWNED 4 // i have the PG crown +#define PG_STATE_CLEAN 8 // peers are fully replicated and clean of stray objects +//#define PG_STATE_FLUSHING 16 // i am old primary, but flushing rep writes before peering + +// replica +#define PG_STATE_STRAY 32 // i need to announce myself to new auth + -// on primary or old-primary only -#define PG_STATE_CLEAN 4 // i am fully replicated class PG { protected: @@ -267,11 +272,15 @@ class PG { void state_set(int m) { state |= m; } void state_clear(int m) { state &= ~m; } - bool is_peered() { return state_test(PG_STATE_PEERED); } + bool is_complete() { return state_test(PG_STATE_COMPLETE); } + bool is_peered() { return state_test(PG_STATE_PEERED); } + //bool is_crowned() { return state_test(PG_STATE_CROWNED); } + bool is_clean() { return state_test(PG_STATE_CLEAN); } + //bool is_flushing() { return state_test(PG_STATE_FLUSHING); } + bool is_stray() { return state_test(PG_STATE_STRAY); } + void mark_peered(); - bool is_complete() { return state_test(PG_STATE_COMPLETE); } void mark_complete(); - bool is_clean() { return state_test(PG_STATE_CLEAN); } void mark_clean(); int num_active_ops() { @@ -350,6 +359,7 @@ class PG { "version", &v, sizeof(v)); local_objects[*it] = v; + cout << " o " << hex << *it << dec << " v " << v << endl; } } -- 2.39.5