From 3593900d91781971fc49efc17c511124665066a1 Mon Sep 17 00:00:00 2001 From: sage Date: Fri, 5 May 2006 16:20:54 +0000 Subject: [PATCH] lotsa osd-related changes git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@754 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 2 +- ceph/crush/crush.h | 27 +- ceph/mds/MDS.cc | 33 +- ceph/mds/OSDMonitor.cc | 2 +- ceph/messages/MOSDMap.h | 25 +- ceph/messages/MOSDOp.h | 10 +- ceph/messages/MOSDOpReply.h | 11 +- ceph/messages/MOSDPGNotify.h | 26 +- ceph/msg/Message.h | 6 +- ceph/msg/Messenger.cc | 17 +- ceph/osd/OSD.cc | 1795 ++++++++++++++-------------------- ceph/osd/OSD.h | 169 ++-- ceph/osd/OSDMap.cc | 15 +- ceph/osd/OSDMap.h | 30 +- ceph/osd/PG.cc | 301 ++---- ceph/osd/PG.h | 602 ++++-------- ceph/osdc/Objecter.cc | 24 +- 17 files changed, 1218 insertions(+), 1877 deletions(-) diff --git a/ceph/Makefile b/ceph/Makefile index 17f036519e0ca..93cc6eb39fc1e 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -8,7 +8,7 @@ # This makes it less annoying to build on non-mpi hosts for dev work, and seems to # behave just fine... change ${CC} back to mpicxx if you get paranoid. CC = g++ -CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE -DUSE_EBOFS +CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE LIBS = -lpthread -lrt -ldb #for normal mpich2 machines diff --git a/ceph/crush/crush.h b/ceph/crush/crush.h index 10b76f6721435..46320d6054b70 100644 --- a/ceph/crush/crush.h +++ b/ceph/crush/crush.h @@ -94,8 +94,6 @@ namespace crush { Hash h; public: - set out; - map overload; map rules; //map collisions; @@ -117,9 +115,9 @@ namespace crush { int s = h.get_seed(); bl.append((char*)&s, sizeof(s)); - ::_encode(out, bl); - ::_encode(overload, bl); - + //::_encode(out, bl); + //::_encode(overload, bl); + // rules n = rules.size(); bl.append((char*)&n, sizeof(n)); @@ -151,8 +149,8 @@ namespace crush { off += sizeof(s); h.set_seed(s); - ::_decode(out, bl, off); - ::_decode(overload, bl, off); + //::_decode(out, bl, off); + //::_decode(overload, bl, off); // rules bl.copy(off, sizeof(n), (char*)&n); @@ -258,7 +256,8 @@ namespace crush { int type, Bucket *inbucket, vector& outvec, - bool firstn) { + bool firstn, + set& outset, map& overloadmap) { int off = outvec.size(); // for each replica @@ -335,12 +334,12 @@ namespace crush { } // ok choice? - if (type == 0 && out.count(outv)) + if (type == 0 && outset.count(outv)) bad = true; - if (overload.count(outv)) { + if (overloadmap.count(outv)) { float f = (float)(h(x, outv) % 1000) / 1000.0; - if (f > overload[outv]) + if (f > overloadmap[outv]) bad = true; } @@ -356,7 +355,8 @@ namespace crush { } - void do_rule(Rule& rule, int x, vector& result) { + void do_rule(Rule& rule, int x, vector& result, + set& outset, map& overloadmap) { //int numresult = 0; result.clear(); @@ -401,7 +401,8 @@ namespace crush { i++) { assert(buckets.count(*i)); Bucket *b = buckets[*i]; - choose(x, numrep, type, b, out, firstn); + choose(x, numrep, type, b, out, firstn, + outset, overloadmap); } // for inrow // put back into w diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index 5d0bea6f97347..0aee386e74696 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -118,11 +118,10 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) { // osdmap = new OSDMap(); osdmap->set_pg_bits(g_conf.osd_pg_bits); - osdmap->inc_version(); // version = 1 - assert(osdmap->get_version() == 1); + osdmap->inc_epoch(); // = 1 + assert(osdmap->get_epoch() == 1); - if (!g_conf.mkfs) - osdmap->inc_version(); // 1 -> mkfs, we want something bigger or else OSDs will recreate PGs + if (g_conf.mkfs) osdmap->set_mkfs(); Bucket *b = new UniformBucket(1, 0); int root = osdmap->crush.add_bucket(b); @@ -360,7 +359,7 @@ int MDS::shutdown_final() void MDS::bcast_osd_map() { - dout(1) << "bcast_osd_map version " << osdmap->get_version() << endl; + dout(1) << "bcast_osd_map epoch " << osdmap->get_epoch() << endl; assert(get_nodeid() == 0); // tell mds @@ -711,11 +710,11 @@ void MDS::handle_osd_getmap(Message *m) void MDS::handle_osd_map(MOSDMap *m) { if (!osdmap || - m->get_version() > osdmap->get_version()) { + m->get_epoch() > osdmap->get_epoch()) { if (osdmap) { - dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl; + dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << " > " << osdmap->get_epoch() << endl; } else { - dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl; + dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << endl; } osdmap->decode(m->get_osdmap()); @@ -724,7 +723,7 @@ void MDS::handle_osd_map(MOSDMap *m) // ** FIXME ** } else { - dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl; + dout(3) << "handle_osd_map ignoring osd map epoch " << m->get_epoch() << " <= " << osdmap->get_epoch() << endl; } } @@ -735,18 +734,18 @@ void MDS::mkfs(Context *onfinish) // send MKFS to osds set ls; + assert(osdmap); osdmap->get_all_osds(ls); for (set::iterator it = ls.begin(); it != ls.end(); it++) { - // issue mkfs - messenger->send_message(new MOSDMap(osdmap, true), + messenger->send_message(new MOSDMap(osdmap), MSG_ADDR_OSD(*it)); pending_mkfs.insert(*it); } - + waiting_for_mkfs = onfinish; } @@ -769,9 +768,9 @@ void MDS::handle_osd_mkfs_ack(Message *m) void MDS::handle_client_mount(MClientMount *m) { - // mkfs? (sorta hack!) + // mkfs? if (g_conf.mkfs) { - dout(3) << "MKFS flag is set" << endl; + dout(3) << "MKFS" << endl; if (mdcache->get_root()) { dout(3) << " root inode is already open" << endl; } else { @@ -791,11 +790,9 @@ void MDS::handle_client_mount(MClientMount *m) //if (pgmanager) pgmanager->mark_open(); // init osds too - dout(3) << "wiping osds too" << endl; mkfs(new C_MDS_Unpause(this)); waiting_for_unpause.push_back(new C_MDS_RetryMessage(this, m)); - return; - + return; } } @@ -805,8 +802,6 @@ void MDS::handle_client_mount(MClientMount *m) assert(whoami == 0); // mds0 mounts/unmounts - - // ack messenger->send_message(new MClientMountAck(m, osdmap), m->get_source(), m->get_source_port()); diff --git a/ceph/mds/OSDMonitor.cc b/ceph/mds/OSDMonitor.cc index 86fffa431f0a7..370c0e0d927f3 100644 --- a/ceph/mds/OSDMonitor.cc +++ b/ceph/mds/OSDMonitor.cc @@ -60,7 +60,7 @@ void OSDMonitor::fake_reorg() dout(1) << "changing OSD map, marking osd" << d << " down" << endl; mds->osdmap->mark_down(d); - mds->osdmap->inc_version(); + mds->osdmap->inc_epoch(); d++; // bcast diff --git a/ceph/messages/MOSDMap.h b/ceph/messages/MOSDMap.h index b5ceedcbdf2df..905d2dc3699a9 100644 --- a/ceph/messages/MOSDMap.h +++ b/ceph/messages/MOSDMap.h @@ -21,8 +21,8 @@ class MOSDMap : public Message { bufferlist osdmap; - __uint64_t version; - bool mkfs; + epoch_t epoch; + //bool mkfs; public: // osdmap @@ -30,14 +30,13 @@ class MOSDMap : public Message { return osdmap; } - __uint64_t get_version() { return version; } - bool is_mkfs() { return mkfs; } + epoch_t get_epoch() { return epoch; } + //bool is_mkfs() { return mkfs; } - MOSDMap(OSDMap *oc, bool mkfs=false) : + MOSDMap(OSDMap *oc) : Message(MSG_OSD_MAP) { oc->encode(osdmap); - version = oc->get_version(); - this->mkfs = mkfs; + epoch = oc->get_epoch(); } MOSDMap() {} @@ -45,16 +44,16 @@ class MOSDMap : public Message { // marshalling virtual void decode_payload() { int off = 0; - payload.copy(off, sizeof(version), (char*)&version); - off += sizeof(version); - payload.copy(off, sizeof(mkfs), (char*)&mkfs); - off += sizeof(mkfs); + payload.copy(off, sizeof(epoch), (char*)&epoch); + off += sizeof(epoch); + //payload.copy(off, sizeof(mkfs), (char*)&mkfs); + //off += sizeof(mkfs); payload.splice(0, off); osdmap.claim(payload); } virtual void encode_payload() { - payload.append((char*)&version, sizeof(version)); - payload.append((char*)&mkfs, sizeof(mkfs)); + payload.append((char*)&epoch, sizeof(epoch)); + //payload.append((char*)&mkfs, sizeof(mkfs)); payload.claim_append(osdmap); } diff --git a/ceph/messages/MOSDOp.h b/ceph/messages/MOSDOp.h index fcc202737c994..b78a6bb2f4741 100644 --- a/ceph/messages/MOSDOp.h +++ b/ceph/messages/MOSDOp.h @@ -17,6 +17,8 @@ #include "msg/Message.h" +#include "osd/OSDMap.h" + /* * OSD op * @@ -56,7 +58,7 @@ typedef struct { object_t oid; pg_t pg; int pg_role;//, rg_nrep; - version_t map_version; + epoch_t map_epoch; int op; size_t length, offset; @@ -83,7 +85,7 @@ class MOSDOp : public Message { const object_t get_oid() { return st.oid; } const pg_t get_pg() { return st.pg; } - const version_t get_map_version() { return st.map_version; } + const epoch_t get_map_epoch() { return st.map_epoch; } const int get_pg_role() { return st.pg_role; } // who am i asking for? const version_t get_version() { return st.version; } @@ -110,7 +112,7 @@ class MOSDOp : public Message { long get_pcid() { return st.pcid; } MOSDOp(long tid, msg_addr_t asker, - object_t oid, pg_t pg, version_t mapversion, int op) : + object_t oid, pg_t pg, epoch_t mapepoch, int op) : Message(MSG_OSD_OP) { memset(&st, 0, sizeof(st)); this->st.tid = tid; @@ -119,7 +121,7 @@ class MOSDOp : public Message { this->st.oid = oid; this->st.pg = pg; this->st.pg_role = 0; - this->st.map_version = mapversion; + this->st.map_epoch = mapepoch; this->st.op = op; this->st.want_ack = true; diff --git a/ceph/messages/MOSDOpReply.h b/ceph/messages/MOSDOpReply.h index 0d5b02ca19ecb..a74feed13e1b2 100644 --- a/ceph/messages/MOSDOpReply.h +++ b/ceph/messages/MOSDOpReply.h @@ -35,6 +35,7 @@ typedef struct { long pcid; object_t oid; + pg_t pg; int op; @@ -45,7 +46,7 @@ typedef struct { size_t object_size; version_t version; - __uint64_t _new_map_version; + epoch_t _new_map_epoch; size_t _data_len, _oc_len; } MOSDOpReply_st; @@ -58,6 +59,7 @@ class MOSDOpReply : public Message { public: long get_tid() { return st.tid; } object_t get_oid() { return st.oid; } + pg_t get_pg() { return st.pg; } int get_op() { return st.op; } bool get_commit() { return st.commit; } @@ -83,7 +85,7 @@ class MOSDOpReply : public Message { } // osdmap - __uint64_t get_map_version() { return st._new_map_version; } + epoch_t get_map_epoch() { return st._new_map_epoch; } bufferlist& get_osdmap() { return osdmap; } @@ -99,6 +101,7 @@ class MOSDOpReply : public Message { this->st.tid = req->st.tid; this->st.oid = req->st.oid; + this->st.pg = req->st.pg; this->st.op = req->st.op; this->st.result = result; this->st.commit = commit; @@ -109,9 +112,9 @@ class MOSDOpReply : public Message { // attach updated cluster spec? if (oc && - req->get_map_version() < oc->get_version()) { + req->get_map_epoch() < oc->get_epoch()) { oc->encode(osdmap); - st._new_map_version = oc->get_version(); + st._new_map_epoch = oc->get_epoch(); st._oc_len = osdmap.length(); } } diff --git a/ceph/messages/MOSDPGNotify.h b/ceph/messages/MOSDPGNotify.h index 0f0917cd95d79..6ca69a24ec5cf 100644 --- a/ceph/messages/MOSDPGNotify.h +++ b/ceph/messages/MOSDPGNotify.h @@ -11,38 +11,42 @@ * */ - #ifndef __MOSDPGPEERNOTIFY_H #define __MOSDPGPEERNOTIFY_H #include "msg/Message.h" +#include "osd/PG.h" + +/* + * PGNotify - notify primary of my PGs and versions. + */ class MOSDPGNotify : public Message { - __uint64_t map_version; - map pg_list; // pgid -> last_complete + epoch_t epoch; + list pg_list; // pgid -> version public: - __uint64_t get_version() { return map_version; } - map& get_pg_list() { return pg_list; } + version_t get_epoch() { return epoch; } + list& get_pg_list() { return pg_list; } MOSDPGNotify() {} - MOSDPGNotify(__uint64_t v, map& l) : + MOSDPGNotify(epoch_t e, list& l) : Message(MSG_OSD_PG_NOTIFY) { - this->map_version = v; - pg_list = l; // FIXME + this->epoch = e; + pg_list.splice(pg_list.begin(),l); } char *get_type_name() { return "PGnot"; } void encode_payload() { - payload.append((char*)&map_version, sizeof(map_version)); + payload.append((char*)&epoch, sizeof(epoch)); _encode(pg_list, payload); } void decode_payload() { int off = 0; - payload.copy(off, sizeof(map_version), (char*)&map_version); - off += sizeof(map_version); + payload.copy(off, sizeof(epoch), (char*)&epoch); + off += sizeof(epoch); _decode(pg_list, payload, off); } }; diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index 5bb527d9cdf39..cff10e2f127d2 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -45,11 +45,9 @@ #define MSG_OSD_MKFS_ACK 19 #define MSG_OSD_PG_NOTIFY 50 -#define MSG_OSD_PG_PEER 51 -#define MSG_OSD_PG_PEERACK 52 +#define MSG_OSD_PG_QUERY 51 +#define MSG_OSD_PG_SUMMARY 52 -#define MSG_OSD_PG_QUERY 55 -#define MSG_OSD_PG_QUERYREPLY 56 #define MSG_OSD_PG_UPDATE 57 #define MSG_OSD_PG_REMOVE 58 diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index 5bd7542e27c88..09de05750b209 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -43,9 +43,9 @@ using namespace std; #include "messages/MOSDOpReply.h" #include "messages/MOSDMap.h" #include "messages/MOSDPGNotify.h" -#include "messages/MOSDPGPeer.h" -#include "messages/MOSDPGPeerAck.h" -#include "messages/MOSDPGUpdate.h" +#include "messages/MOSDPGQuery.h" +#include "messages/MOSDPGSummary.h" +//#include "messages/MOSDPGUpdate.h" //#include "messages/MOSDPGQuery.h" //#include "messages/MOSDPGQueryReply.h" @@ -320,14 +320,11 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_OSD_PG_NOTIFY: m = new MOSDPGNotify(); break; - case MSG_OSD_PG_PEER: - m = new MOSDPGPeer(); - break; - case MSG_OSD_PG_PEERACK: - m = new MOSDPGPeerAck(); + case MSG_OSD_PG_QUERY: + m = new MOSDPGQuery(); break; - case MSG_OSD_PG_UPDATE: - m = new MOSDPGUpdate(); + case MSG_OSD_PG_SUMMARY: + m = new MOSDPGSummary(); break; /* case MSG_OSD_PG_QUERY: diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index fbd2d5770640a..1828a9f86a483 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -24,17 +24,15 @@ # include "FakeStore.h" #endif -#ifdef USE_EBOFS -# include "ebofs/Ebofs.h" -#endif +#include "ebofs/Ebofs.h" #include "Ager.h" -#include "mds/MDS.h" #include "msg/Messenger.h" #include "msg/Message.h" +#include "mds/MDS.h" #include "msg/HostMonitor.h" #include "messages/MGenericMessage.h" @@ -45,9 +43,8 @@ #include "messages/MOSDMap.h" #include "messages/MOSDPGNotify.h" -#include "messages/MOSDPGPeer.h" -#include "messages/MOSDPGPeerAck.h" -#include "messages/MOSDPGUpdate.h" +#include "messages/MOSDPGQuery.h" +#include "messages/MOSDPGSummary.h" #include "common/Logger.h" #include "common/LogType.h" @@ -70,6 +67,9 @@ char *ebofs_base_path = "./ebofsdev"; #define ROLE_TYPE(x) ((x)>0 ? 1:(x)) + + +// force remount hack for performance testing FakeStore class C_Remount : public Context { OSD *osd; public: @@ -90,14 +90,13 @@ void OSD::force_remount() osd_lock.Unlock(); dout(0) << "finished remount" << endl; } - +// // cons/des LogType osd_logtype; - OSD::OSD(int id, Messenger *m) { whoami = id; @@ -117,6 +116,7 @@ OSD::OSD(int id, Messenger *m) g_timer.add_event_after(g_conf.osd_remount_at, new C_Remount(this)); + // init object store // try in this order: // ebofsdev/all // ebofsdev/$num @@ -134,18 +134,17 @@ OSD::OSD(int id, Messenger *m) if (::lstat(dev_path, &sta) != 0) sprintf(dev_path, "%s/%s", ebofs_base_path, hostname); + if (g_conf.ebofs) { + store = new Ebofs(dev_path); + } #ifdef USE_OBFS - if (g_conf.uofs) { + else if (g_conf.uofs) { store = new OBFSStore(whoami, NULL, dev_path); } -#else -# ifdef USE_EBOFS - if (g_conf.ebofs) { - store = new Ebofs(dev_path); - } else -# endif - store = new FakeStore(osd_base_path, whoami); #endif + else { + store = new FakeStore(osd_base_path, whoami); + } // monitor char s[80]; @@ -154,7 +153,7 @@ OSD::OSD(int id, Messenger *m) monitor = new HostMonitor(m, st); monitor->set_notify_port(MDS_PORT_OSDMON); - // hack + // for testing monitoring int i = whoami; if (++i == g_conf.num_osd) i = 0; monitor->get_hosts().insert(MSG_ADDR_OSD(i)); @@ -164,6 +163,7 @@ OSD::OSD(int id, Messenger *m) monitor->get_hosts().insert(MSG_ADDR_OSD(i)); monitor->get_notify().insert(MSG_ADDR_MDS(0)); + // // log char name[80]; @@ -186,7 +186,7 @@ OSD::OSD(int id, Messenger *m) osd_logtype.add_inc("rlsum"); osd_logtype.add_inc("rlnum"); - // Thread pool + // request thread pool { char name[80]; sprintf(name,"osd%d.threadpool", whoami); @@ -204,34 +204,37 @@ OSD::~OSD() if (messenger) { delete messenger; messenger = 0; } if (logger) { delete logger; logger = 0; } if (store) { delete store; store = 0; } - } int OSD::init() { osd_lock.Lock(); + { + // mkfs? + if (g_conf.osd_mkfs) { + dout(2) << "mkfs" << endl; + store->mkfs(); + } + + // mount. + int r = store->mount(); + assert(r>=0); + + // age? + if (g_conf.osd_age_time > 0) { + Ager ager(store); + ager.age(g_conf.osd_age_time, g_conf.osd_age, g_conf.osd_age / 2.0, 5, g_conf.osd_age); + } - if (g_conf.osd_mkfs) { - dout(2) << "mkfs" << endl; - - store->mkfs(); - - } - int r = store->mount(); - - if (g_conf.osd_age_time > 0) { - Ager ager(store); - ager.age(g_conf.osd_age_time, g_conf.osd_age, g_conf.osd_age / 2.0, 5, g_conf.osd_age); + // monitor. + monitor->init(); } - - monitor->init(); - osd_lock.Unlock(); // i'm ready! messenger->set_dispatcher(this); - return r; + return 0; } int OSD::shutdown() @@ -372,14 +375,11 @@ void OSD::dispatch(Message *m) case MSG_OSD_PG_NOTIFY: handle_pg_notify((MOSDPGNotify*)m); break; - case MSG_OSD_PG_PEER: - handle_pg_peer((MOSDPGPeer*)m); - break; - case MSG_OSD_PG_PEERACK: - handle_pg_peer_ack((MOSDPGPeerAck*)m); + case MSG_OSD_PG_QUERY: + handle_pg_query((MOSDPGQuery*)m); break; - case MSG_OSD_PG_UPDATE: - handle_pg_update((MOSDPGUpdate*)m); + case MSG_OSD_PG_SUMMARY: + handle_pg_summary((MOSDPGSummary*)m); break; case MSG_OSD_OP: @@ -423,7 +423,7 @@ void OSD::dispatch(Message *m) void OSD::handle_op_reply(MOSDOpReply *m) { // did i get a new osdmap? - if (m->get_map_version() > osdmap->get_version()) { + if (m->get_map_epoch() > osdmap->get_epoch()) { dout(3) << "replica op reply includes a new osd map" << endl; update_map(m->get_osdmap()); } @@ -433,12 +433,6 @@ void OSD::handle_op_reply(MOSDOpReply *m) case OSD_OP_REP_PULL: op_rep_pull_reply(m); break; - case OSD_OP_REP_PUSH: - op_rep_push_reply(m); - break; - case OSD_OP_REP_REMOVE: - op_rep_remove_reply(m); - break; case OSD_OP_REP_WRITE: case OSD_OP_REP_TRUNCATE: @@ -605,64 +599,60 @@ void OSD::wait_for_new_map(Message *m) /** update_map * assimilate a new OSDMap. scan pgs. */ -void OSD::update_map(bufferlist& state, bool mkfs) +void OSD::update_map(bufferlist& state) { // decode new map osdmap = new OSDMap(); osdmap->decode(state); - osdmaps[osdmap->get_version()] = osdmap; - dout(7) << "got osd map version " << osdmap->get_version() << endl; + osdmaps[osdmap->get_epoch()] = osdmap; + dout(7) << "got osd map version " << osdmap->get_epoch() << endl; // pg list list pg_list; - if (mkfs) { - assert(osdmap->get_version() == 1); + if (osdmap->is_mkfs()) { + dout(1) << "mkfs" << endl; + assert(osdmap->get_epoch() == 1); ps_t maxps = 1LL << osdmap->get_pg_bits(); - + // create PGs for (int nrep = 1; nrep <= MIN(g_conf.num_osd, g_conf.osd_max_rep); // for low osd counts.. hackish bleh nrep++) { for (pg_t ps = 0; ps < maxps; ps++) { pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep); - vector acting; - osdmap->pg_to_acting_osds(pgid, acting); + int role = osdmap->get_pg_acting_role(pgid, whoami); + if (role < 0) continue; - - if (acting[0] == whoami) { - PG *pg = create_pg(pgid); - pg->acting = acting; - pg->set_role(0); - pg->set_primary_since(osdmap->get_version()); - pg->mark_complete( osdmap->get_version() ); - - dout(7) << "created " << *pg << endl; + PG *pg = create_pg(pgid); + osdmap->pg_to_acting_osds(pgid, pg->acting); + pg->set_role(role); + pg->info.last_epoch_started = pg->info.same_primary_since = osdmap->get_epoch(); + pg->last_epoch_started_any = osdmap->get_epoch(); + pg->mark_complete(); + pg->mark_active(); - pg_list.push_back(pgid); - } + dout(7) << "created " << *pg << endl; + pg_list.push_back(pgid); } // local PG too pg_t pgid = osdmap->osd_nrep_to_pg(whoami, nrep); - vector acting; - osdmap->pg_to_acting_osds(pgid, acting); - - if (acting[0] == whoami) { - PG *pg = create_pg(pgid); - pg->acting = acting; - pg->set_role(0); - pg->set_primary_since(osdmap->get_version()); - pg->mark_complete( osdmap->get_version() ); - - dout(7) << "created " << *pg << endl; - pg_list.push_back(pgid); - } + int role = osdmap->get_pg_acting_role(pgid, whoami); + if (role < 0) continue; + PG *pg = create_pg(pgid); + osdmap->pg_to_acting_osds(pgid, pg->acting); + pg->set_role(role); + pg->info.last_epoch_started = pg->info.same_primary_since = osdmap->get_epoch(); + pg->last_epoch_started_any = osdmap->get_epoch(); + pg->mark_complete(); + pg->mark_active(); + + dout(7) << "created " << *pg << endl; + pg_list.push_back(pgid); } - - } else { // get pg list get_pg_list(pg_list); @@ -672,121 +662,10 @@ void OSD::update_map(bufferlist& state, bool mkfs) advance_map(pg_list); activate_map(pg_list); - /* - if (mkfs) { - // mark all peers complete - for (list::iterator pgid = pg_list.begin(); - pgid != pg_list.end(); - pgid++) { - PG *pg = get_pg(*pgid); - for (map::iterator it = pg->peers.begin(); - it != pg->peers.end(); - it++) { - PGPeer *p = it->second; - //dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl; - - messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()), - MSG_ADDR_OSD(p->get_peer())); - } - } - }*/ - - // process waiters take_waiters(waiting_for_osdmap); } -void OSD::handle_osd_map(MOSDMap *m) -{ - // wait for ops to finish - wait_for_no_ops(); - - if (m->is_mkfs()) { - dout(2) << "MKFS" << endl; - } - - if (!osdmap || - m->get_version() > osdmap->get_version()) { - if (osdmap) { - dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl; - } else { - dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl; - } - - update_map(m->get_osdmap(), m->is_mkfs()); - - } else { - dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl; - } - - if (m->is_mkfs()) { - // ack - messenger->send_message(new MGenericMessage(MSG_OSD_MKFS_ACK), - m->get_source()); - } - - delete m; -} - - -OSDMap* OSD::get_osd_map(version_t v) -{ - assert(osdmaps[v]); - return osdmaps[v]; -} - - - -// ====================================================== -// REPLICATION - -// PG - -void OSD::get_pg_list(list& ls) -{ - // just list collections; assume they're all pg's (for now) - store->list_collections(ls); -} - -bool OSD::pg_exists(pg_t pgid) -{ - return store->collection_exists(pgid); -} - -PG *OSD::create_pg(pg_t pgid) -{ - assert(pg_map.count(pgid) == 0); - assert(!pg_exists(pgid)); - - PG *pg = new PG(whoami, pgid); - //pg->info.created = osdmap->get_version(); - - pg->store(store); - pg_map[pgid] = pg; - return pg; -} - -PG *OSD::get_pg(pg_t pgid) -{ - // already open? - if (pg_map.count(pgid)) - return pg_map[pgid]; - - // exists? - if (!pg_exists(pgid)) - return 0; - - // open, stat collection - PG *pg = new PG(whoami, pgid); - pg->fetch(store); - pg_map[pgid] = pg; - - return pg; -} - - - - /** * scan placement groups, initiate any replication @@ -794,7 +673,7 @@ PG *OSD::get_pg(pg_t pgid) */ void OSD::advance_map(list& ls) { - dout(7) << "advance_map version " << osdmap->get_version() << endl; + dout(7) << "advance_map version " << osdmap->get_epoch() << endl; // scan pg's for (list::iterator it = ls.begin(); @@ -804,6 +683,12 @@ void OSD::advance_map(list& ls) PG *pg = get_pg(pgid); assert(pg); + // did i finish this epoch? + if (pg->is_active()) { + assert(pg->info.last_epoch_started == osdmap->get_epoch()); + pg->info.last_epoch_finished = osdmap->get_epoch(); + } + // get new acting set vector acting; int nrep = osdmap->pg_to_acting_osds(pgid, acting); @@ -818,39 +703,37 @@ void OSD::advance_map(list& ls) // no change? if (acting == pg->acting) continue; - + + // primary changed? + if (pg->acting[0] != acting[0]) { + pg->info.same_primary_since = osdmap->get_epoch(); + } + if (role != pg->get_role()) { - // role change. + // my role changed. dout(10) << " " << *pg << " role change " << pg->get_role() << " -> " << role << endl; // old primary? if (pg->get_role() == 0) { - // drop peers - take_waiters(pg->waiting_for_peered); + // take waiters + take_waiters(pg->waiting_for_active); for (hash_map >::iterator it = pg->waiting_for_missing_object.begin(); it != pg->waiting_for_missing_object.end(); it++) take_waiters(it->second); pg->waiting_for_missing_object.clear(); - for (hash_map >::iterator it = pg->waiting_for_clean_object.begin(); - it != pg->waiting_for_clean_object.end(); - it++) - take_waiters(it->second); - pg->waiting_for_clean_object.clear(); - + // drop peers pg->drop_peers(); - pg->state_clear(PG_STATE_CLEAN); - pg->discard_recovery_plan(); + pg->state_clear(PG::STATE_CLEAN); } - + // new primary? if (role == 0) { - pg->set_primary_since(osdmap->get_version()); - pg->state_clear(PG_STATE_PEERED); + pg->state_clear(PG::STATE_ACTIVE); } else { // we need to announce - pg->state_set(PG_STATE_STRAY); + pg->state_set(PG::STATE_ACTIVE); if (nrep == 0) dout(1) << "crashed pg " << *pg << endl; @@ -860,17 +743,19 @@ void OSD::advance_map(list& ls) // no role change. // did primary change? if (primary != pg->get_primary()) { - dout(10) << " " << *pg << " acting primary change " << pg->get_primary() << " -> " << primary << ", !peered" << endl; + dout(10) << " " << *pg << " acting primary change " + << pg->get_primary() << " -> " << primary + << ", !peered" << endl; // we need to announce - pg->state_set(PG_STATE_STRAY); + pg->state_set(PG::STATE_STRAY); } else { // primary is the same. if (role == 0) { - // i am (still) primary. but replica set changed. + // i am (still) primary. but my replica set changed. dout(10) << " " << *pg << " replica set changed, !clean !peered" << endl; - pg->state_clear(PG_STATE_PEERED); - pg->state_clear(PG_STATE_CLEAN); + pg->state_clear(PG::STATE_ACTIVE); + pg->state_clear(PG::STATE_CLEAN); } } } @@ -878,16 +763,15 @@ void OSD::advance_map(list& ls) // update PG pg->acting = acting; pg->calc_role(whoami); - pg->store(store); - + //pg->store(); // scan down osds for (set::const_iterator down = osdmap->get_down_osds().begin(); down != osdmap->get_down_osds().end(); down++) { - PGPeer *pgp = pg->get_peer(*down); + PG::PGPeer *pgp = pg->get_peer(*down); if (!pgp) continue; - + dout(10) << " " << *pg << " peer osd" << *down << " is down, removing" << endl; pg->remove_peer(*down); @@ -901,17 +785,15 @@ void OSD::advance_map(list& ls) handle_rep_op_ack(*tid, -1, false, *down); } } - } - } void OSD::activate_map(list& ls) { - dout(7) << "activate_map version " << osdmap->get_version() << endl; + dout(7) << "activate_map version " << osdmap->get_epoch() << endl; - map< int, map > notify_list; // primary -> pgid -> last_any_complete - map< int, map > start_map; // peer -> PG -> peer_role + map< int, list > notify_list; // primary -> list + map< int, map > query_map; // peer -> PG -> get_summary_since // scan pg's for (list::iterator it = ls.begin(); @@ -923,152 +805,79 @@ void OSD::activate_map(list& ls) if (pg->get_role() == 0) { // i am primary - start_peers(pg, start_map); - } + repeer(pg, query_map); + } else if (pg->is_stray()) { // i am residual|replica - notify_list[pg->get_primary()][pgid] = pg->get_last_any_complete(); + notify_list[pg->get_primary()].push_back(pg->info); } } - // notify? (residual|replica) - for (map< int, map >::iterator pit = notify_list.begin(); - pit != notify_list.end(); - pit++) - peer_notify(pit->first, pit->second); - - // start peer? (primary) - for (map< int, map >::iterator pit = start_map.begin(); - pit != start_map.end(); - pit++) - peer_start(pit->first, pit->second); - + if (!osdmap->is_mkfs()) { // hack: skip the queries/summaries if it's a mkfs + // notify? (residual|replica) + do_notifies(notify_list); + + // do queries. + do_queries(query_map); + } } -/** peer_notify - * Send an MOSDPGNotify to a primary, with a list of PGs that I have - * content for, and they are primary for. - */ -void OSD::peer_notify(int primary, map& pg_list) -{ - dout(7) << "peer_notify osd" << primary << " on " << pg_list.size() << " PGs" << endl; - MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_version(), pg_list); - messenger->send_message(m, MSG_ADDR_OSD(primary)); -} - -void OSD::start_peers(PG *pg, map< int, map >& start_map) +void OSD::handle_osd_map(MOSDMap *m) { - dout(10) << " " << *pg << " last_any_complete " << pg->get_last_any_complete() << endl; - - // determine initial peer set - map peerset; // peer -> role - - // prior map(s), if OSDs are still up - for (version_t epoch = pg->get_last_any_complete(); - epoch < osdmap->get_version(); - epoch++) { - OSDMap *omap = get_osd_map(epoch); - assert(omap); - - vector acting; - omap->pg_to_acting_osds(pg->get_pgid(), acting); - - for (unsigned i=0; iis_up(acting[i])) - peerset[acting[i]] = -1; - } - - // current map - for (unsigned i=1; iacting.size(); i++) - peerset[pg->acting[i]] = i>0 ? 1:0; - - - // check peers - bool havepeers = true; - for (map::iterator it = peerset.begin(); - it != peerset.end(); - it++) { - int who = it->first; - int role = it->second; - if (who == whoami) continue; // nevermind me + // wait for ops to finish + wait_for_no_ops(); - PGPeer *pgp = pg->get_peer(who); - if (pgp && pgp->is_active() && - pgp->get_role() == role) { - dout(10) << " " << *pg << " actively peered with osd" << who << " role " << role << endl; + if (!osdmap || + m->get_epoch() > osdmap->get_epoch()) { + if (osdmap) { + dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << " > " << osdmap->get_epoch() << endl; } else { - if (pgp) { - pg->remove_peer(who); - dout(10) << " " << *pg << " need to re-peer with osd" << who << " role " << role << endl; - } else { - dout(10) << " " << *pg << " need to peer with osd" << who << " role " << role << endl; - } - start_map[who][pg] = role; - havepeers = false; + dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << endl; } - } - if (havepeers && - !pg->is_peered()) { - dout(10) << " " << *pg << " already has necessary peers, analyzing" << endl; - pg->mark_peered(); - take_waiters(pg->waiting_for_peered); + update_map(m->get_osdmap()); - plan_recovery(pg); - do_recovery(pg); + } else { + dout(3) << "handle_osd_map ignoring osd map epoch " << m->get_epoch() << " <= " << osdmap->get_epoch() << endl; + } + + if (osdmap->is_mkfs()) { + // ack + messenger->send_message(new MGenericMessage(MSG_OSD_MKFS_ACK), + m->get_source()); } + + delete m; } -/** peer_start - * initiate a peer session with a replica on given list of PGs - */ -void OSD::peer_start(int replica, map& pg_map) +OSDMap* OSD::get_osd_map(version_t v) { - dout(7) << "peer_start with osd" << replica << " on " << pg_map.size() << " PGs" << endl; - - list pgids; - - for (map::iterator it = pg_map.begin(); - it != pg_map.end(); - it++) { - PG *pg = it->first; - int role = it->second; - - assert(pg->get_peer(replica) == 0); - //PGPeer *p = - pg->new_peer(replica, role); - - // set last_request stamp? - // ... + assert(osdmaps[v]); + return osdmaps[v]; +} - pgids.push_back(pg->get_pgid()); // add to list - } - MOSDPGPeer *m = new MOSDPGPeer(osdmap->get_version(), pgids); - messenger->send_message(m, - MSG_ADDR_OSD(replica)); -} -bool OSD::require_current_map(Message *m, version_t v) +bool OSD::require_current_map(Message *m, epoch_t ep) { int from = MSG_ADDR_NUM(m->get_source()); // older map? - if (v < osdmap->get_version()) { - dout(7) << " from old map version " << v << " < " << osdmap->get_version() << endl; + if (ep < osdmap->get_epoch()) { + dout(7) << " from old map epoch " << ep << " < " << osdmap->get_epoch() << endl; delete m; // discard and ignore. return false; } // newer map? - if (v > osdmap->get_version()) { - dout(7) << " from newer map version " << v << " > " << osdmap->get_version() << endl; + if (ep > osdmap->get_epoch()) { + dout(7) << " from newer map epoch " << ep << " > " << osdmap->get_epoch() << endl; wait_for_new_map(m); return false; } @@ -1080,8 +889,7 @@ bool OSD::require_current_map(Message *m, version_t v) return false; } - assert(v == osdmap->get_version()); - + assert(ep == osdmap->get_epoch()); return true; } @@ -1090,31 +898,22 @@ bool OSD::require_current_map(Message *m, version_t v) * require that we have same (or newer) map, and that * the source is the pg primary. */ -bool OSD::require_current_pg_primary(Message *m, version_t v, PG *pg) +bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch) { int from = MSG_ADDR_NUM(m->get_source()); // newer map? - if (v > osdmap->get_version()) { - dout(7) << " from newer map version " << v << " > " << osdmap->get_version() << endl; + if (epoch > osdmap->get_epoch()) { + dout(7) << " from newer map epoch " << epoch << " > " << osdmap->get_epoch() << endl; wait_for_new_map(m); return false; } - // older map? - if (v < osdmap->get_version()) { - // same primary? - // FIXME.. line of succession must match! - if (from != pg->get_primary()) { - dout(7) << " not from pg primary osd" << pg->get_primary() << ", dropping" << endl; - delete m; // discard and ignore. - return false; - } - } - // down? if (osdmap->is_down(from)) { - dout(7) << " from down OSD osd" << from << ", pinging" << endl; + dout(7) << " from down OSD osd" << from + << ", pinging?" << endl; + assert(epoch < osdmap->get_epoch()); // FIXME return false; } @@ -1124,93 +923,333 @@ bool OSD::require_current_pg_primary(Message *m, version_t v, PG *pg) -void OSD::handle_pg_notify(MOSDPGNotify *m) -{ - int from = MSG_ADDR_NUM(m->get_source()); - dout(7) << "handle_pg_notify from osd" << from << endl; - - if (!require_current_map(m, m->get_version())) return; - - // look for unknown PGs i'm primary for - map< int, map > start_map; - for (map::iterator it = m->get_pg_list().begin(); - it != m->get_pg_list().end(); - it++) { - pg_t pgid = it->first; - PG *pg = get_pg(pgid); +// ====================================================== +// REPLICATION - if (!pg) { - pg = create_pg(pgid); +// PG - int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting); - assert(nrep > 0); - assert(pg->acting[0] == whoami); - pg->set_role(0); - pg->set_primary_since( osdmap->get_version() ); // FIXME: this may miss a few epochs! - pg->mark_any_complete( it->second ); +void OSD::get_pg_list(list& ls) +{ + // just list collections; assume they're all pg's (for now) + store->list_collections(ls); +} - dout(10) << " " << *pg << " is new, nrep=" << nrep << endl; +bool OSD::pg_exists(pg_t pgid) +{ + return store->collection_exists(pgid); +} - // start peers - start_peers(pg, start_map); +PG *OSD::create_pg(pg_t pgid) +{ + assert(pg_map.count(pgid) == 0); + assert(!pg_exists(pgid)); + + PG *pg = new PG(this, pgid); + + //pg->info.created = osdmap->get_epoch(); + //pg->store(store); + + pg_map[pgid] = pg; + return pg; +} + +PG *OSD::get_pg(pg_t pgid) +{ + // already open? + if (pg_map.count(pgid)) + return pg_map[pgid]; + + // exists? + if (!pg_exists(pgid)) + return 0; + + // open, stat collection + PG *pg = new PG(this, pgid); + //pg->fetch(store); + pg_map[pgid] = pg; + + return pg; +} + + + + +/** do_notifies + * Send an MOSDPGNotify to a primary, with a list of PGs that I have + * content for, and they are primary for. + */ + +void OSD::do_notifies(map< int, list >& notify_list) +{ + for (map< int, list >::iterator it = notify_list.begin(); + it != notify_list.end(); + it++) { + dout(7) << "do_notify osd" << it->first << " on " << it->second.size() << " PGs" << endl; + MOSDPGNotify *m = new MOSDPGNotify(osdmap->get_epoch(), it->second); + messenger->send_message(m, MSG_ADDR_OSD(it->first)); + } +} + + +/** do_queries + * send out pending queries for info | summaries + */ +void OSD::do_queries(map< int, map >& query_map) +{ + for (map< int, map >::iterator pit = query_map.begin(); + pit != query_map.end(); + pit++) { + int who = pit->first; + dout(7) << "do_queries querying osd" << who + << " on " << pit->second.size() << " PGs" << endl; + + MOSDPGQuery *m = new MOSDPGQuery(osdmap->get_epoch(), + pit->second); + messenger->send_message(m, + MSG_ADDR_OSD(who)); + } +} + + +/** repeer() + * primary: check, query whatever replicas i need to. + */ +void OSD::repeer(PG *pg, map< int, map >& query_map) +{ + dout(10) << "repeer " << *pg << endl; + + // determine initial peer set + map peerset; // peer -> role + + // prior map(s), if OSDs are still up + for (version_t epoch = pg->last_epoch_started_any; + epoch < osdmap->get_epoch(); + epoch++) { + OSDMap *omap = get_osd_map(epoch); + assert(omap); + + vector acting; + omap->pg_to_acting_osds(pg->get_pgid(), acting); + + for (unsigned i=0; iis_up(acting[i])) + peerset[acting[i]] = -1; + } + + // current map + for (unsigned i=1; iacting.size(); i++) + peerset[pg->acting[i]] = i>0 ? 1:0; + + + // -- query info from everyone. + bool haveallinfo = true; + for (map::iterator it = peerset.begin(); + it != peerset.end(); + it++) { + int who = it->first; + int role = it->second; + if (who == whoami) continue; // nevermind me + + PG::PGPeer *pgp = pg->get_peer(who); + if (pgp && pgp->have_info()) { + dout(10) << *pg << " have info from osd" << who << " role " << role << endl; + continue; + } + if (pgp && pgp->state_test(PG::PGPeer::STATE_QINFO)) { + dout(10) << *pg << " waiting for osd" << who << " role " << role << endl; + } else { + dout(10) << *pg << " querying info from osd" << who << " role " << role << endl; + query_map[who][pg->get_pgid()] = 0; + } + haveallinfo = false; + } + if (!haveallinfo) return; + + + // -- ok, we have all info. who has latest PG content summary? + version_t newest_update = pg->info.last_update; + int newest_update_osd = whoami; + version_t oldest_update = pg->info.last_update; + PG::PGPeer *newest_update_peer = 0; + + for (map::iterator it = pg->peers.begin(); + it != pg->peers.end(); + it++) { + PG::PGPeer *pgp = it->second; + assert(pgp->have_info()); + + if (pgp->info.last_update > newest_update) { + newest_update = pgp->info.last_update; + newest_update_osd = it->first; + newest_update_peer = pgp; + } + if (pgp->get_role() == 1 && + pgp->info.last_update < oldest_update) + oldest_update = pgp->info.last_update; + } + + if (newest_update_peer) { + // get contents from newest. + assert(!newest_update_peer->have_summary()); + + dout(10) << *pg << " newest PG on osd" << newest_update_osd + << " v " << newest_update + << ", querying summary" + << endl; + query_map[newest_update_osd][pg->get_pgid()] = 1; + return; + } else { + dout(10) << *pg << " i have the latest: " << pg->info.last_update << endl; + } + + + // -- find pg contents? + if (pg->info.last_complete < pg->info.last_update) { + if (pg->content_summary->missing > 0) { + // search! + dout(10) << *pg << " searching for PG contents, querying all peers" << endl; + bool didquery = false; + for (map::iterator it = pg->peers.begin(); + it != pg->peers.end(); + it++) { + PG::PGPeer *pgp = it->second; + if (pgp->have_summary()) continue; + query_map[it->first][pg->get_pgid()] = 1; + didquery = true; + } + + if (didquery) return; + } else { + dout(10) << *pg << " i have located all objects" << endl; + } + } else { + dout(10) << *pg << " i have all objects" << endl; + } + + + // -- distribute summary? + + // does anyone need it? + //if (oldest_update < pg->info.last_update) { + + // generate summary? + if (pg->content_summary == 0) + pg->generate_content_summary(); + + // distribute summary! + for (map::iterator it = pg->peers.begin(); + it != pg->peers.end(); + it++) { + PG::PGPeer *pgp = it->second; + if (pgp->get_role() != 1) continue; + + pgp->state_clear(PG::PGPeer::STATE_WAITING); + pgp->state_set(PG::PGPeer::STATE_ACTIVE); + + //if (pgp->info.last_update < pg->info.last_update) { + dout(10) << *pg << " sending summary to osd" << it->first << endl; + MOSDPGSummary *m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary); + messenger->send_message(m, MSG_ADDR_OSD(it->first)); + //} + } + //} else { + //dout(10) << *pg << " nobody needs the summary" << endl; + //} + + // plan my own recovery + pg->plan_recovery(); + + // i am active! + pg->state_set(PG::STATE_ACTIVE); + + take_waiters(pg->waiting_for_active); + +} + + + +/** PGNotify + * from non-primary to primary + * includes PGInfo. + */ + +void OSD::handle_pg_notify(MOSDPGNotify *m) +{ + dout(7) << "handle_pg_notify from " << m->get_source() << endl; + int from = MSG_ADDR_NUM(m->get_source()); + + if (!require_same_or_newer_map(m, m->get_epoch())) return; + + // look for unknown PGs i'm primary for + map< int, map > query_map; + + for (list::iterator it = m->get_pg_list().begin(); + it != m->get_pg_list().end(); + it++) { + pg_t pgid = it->pgid; + PG *pg = get_pg(pgid); + + if (!pg) { + pg = create_pg(pgid); + + int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting); + assert(nrep > 0); + assert(pg->acting[0] == whoami); + pg->info.same_primary_since = it->same_primary_since; + pg->set_role(0); + + dout(10) << " " << *pg << " is new, nrep=" << nrep << endl; + + // start peers + repeer(pg, query_map); // kick any waiters if (waiting_for_pg.count(pgid)) { take_waiters(waiting_for_pg[pgid]); waiting_for_pg.erase(pgid); } - } + } else { + // already had pg. - if (pg->is_peered()) { - // we're already peered. what do we do with this guy? - assert(0); - } + // peered with this guy specifically? + PG::PGPeer *pgp = pg->get_peer(from); + if (!pgp) { + int role = osdmap->get_pg_role(pg->get_pgid(), from); + pgp = pg->new_peer(from, role); + } - if (it->second > pg->get_last_any_complete()) - pg->mark_any_complete( it->second ); + pgp->info = *it; + pgp->state_set(PG::PGPeer::STATE_INFO); - // peered with this guy specifically? - PGPeer *pgp = pg->get_peer(from); - if (!pgp && - start_map[from].count(pg) == 0) { - dout(7) << " " << *pg << " primary needs to peer with residual notifier osd" << from << endl; - start_map[from][pg] = -1; + repeer(pg, query_map); } } - // start peers? - if (start_map.empty()) { - dout(7) << " no new peers" << endl; - } else { - for (map< int, map >::iterator pit = start_map.begin(); - pit != start_map.end(); - pit++) - peer_start(pit->first, pit->second); - } + do_queries(query_map); delete m; } -void OSD::handle_pg_peer(MOSDPGPeer *m) + +/** PGQuery + * from primary to replica | other + */ +void OSD::handle_pg_query(MOSDPGQuery *m) { + dout(7) << "handle_pg_query from " << m->get_source() << endl; int from = MSG_ADDR_NUM(m->get_source()); - dout(7) << "handle_pg_peer from osd" << from << endl; - - if (!require_current_map(m, m->get_version())) return; - - // go - MOSDPGPeerAck *ack = new MOSDPGPeerAck(osdmap->get_version()); - - for (list::iterator it = m->get_pg_list().begin(); - it != m->get_pg_list().end(); + + if (!require_same_or_newer_map(m, m->get_epoch())) return; + + map< int, list > notify_list; + + for (map::iterator it = m->pg_list.begin(); + it != m->pg_list.end(); it++) { - pg_t pgid = *it; - - // open PG + pg_t pgid = it->first; PG *pg = get_pg(pgid); - // dne? if (!pg) { // get active rush mapping vector acting; @@ -1223,7 +1262,8 @@ void OSD::handle_pg_peer(MOSDPGPeer *m) if (role < 0) { dout(10) << " pg " << hex << pgid << dec << " dne, and i am not an active replica" << endl; - ack->pg_dne.push_back(pgid); + PG::PGInfo empty(pgid); + notify_list[from].push_back(empty); continue; } @@ -1231,196 +1271,130 @@ void OSD::handle_pg_peer(MOSDPGPeer *m) pg->acting = acting; pg->set_role(role); - //if (m->get_version() == 1) pg->mark_complete(); // hack... need a more elegant solution - - dout(10) << " " << *pg << " dne (before), but i am role " << role << endl; - - // take any waiters - if (waiting_for_pg.count(pgid)) { - take_waiters(waiting_for_pg[pgid]); - waiting_for_pg.erase(pgid); - } + dout(10) << *pg << " dne (before), but i am role " << role << endl; } - // PEER - - // report back state and pg content - ack->pg_state[pgid].state = pg->get_state(); - ack->pg_state[pgid].last_complete = pg->get_last_complete(); - ack->pg_state[pgid].last_any_complete = pg->get_last_any_complete(); - pg->scan_local_objects(ack->pg_state[pgid].objects, store); // list my objects - - // i am now peered - pg->state_set(PG_STATE_PEERED); - pg->state_clear(PG_STATE_STRAY); - - if (m->get_version() == 1) { - pg->mark_complete( m->get_version() ); // it's a mkfs.. mark pg complete too - } - - dout(10) << "sending peer ack " << *pg << " " << ack->pg_state[pgid].objects.size() << " objects" << endl; + if (it->second) { + // summary + MOSDPGSummary *m; + if (pg->content_summary == 0) { + pg->generate_content_summary(); + m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary); + delete pg->content_summary; + pg->content_summary = 0; + } else { + m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary); + } + messenger->send_message(m, MSG_ADDR_OSD(from)); + } else { + // notify + notify_list[from].push_back(pg->info); + } } - - // reply - messenger->send_message(ack, - MSG_ADDR_OSD(from)); + + do_notifies(notify_list); delete m; } - -void OSD::handle_pg_peer_ack(MOSDPGPeerAck *m) +void OSD::handle_pg_summary(MOSDPGSummary *m) { + dout(7) << "handle_pg_summary from " << m->get_source() << endl; int from = MSG_ADDR_NUM(m->get_source()); - dout(7) << "handle_pg_peer_ack from osd" << from << endl; - - if (!require_current_map(m, m->get_version())) return; - // pg_dne first - for (list::iterator it = m->pg_dne.begin(); - it != m->pg_dne.end(); - it++) { - PG *pg = get_pg(*it); - assert(pg); + if (!require_same_or_newer_map(m, m->get_epoch())) return; - dout(10) << " " << *pg << " dne on osd" << from << endl; - - PGPeer *pgp = pg->get_peer(from); - if (pgp) { - pg->remove_peer(from); - } else { - dout(10) << " weird, i didn't have it!" << endl; // multiple lagged peer requests? - assert(0); // not until peer requests span epochs! - } - } + map< int, map > query_map; // peer -> PG -> get_summary_since - // pg_state - for (map::iterator it = m->pg_state.begin(); - it != m->pg_state.end(); - it++) { - PG *pg = get_pg(it->first); - assert(pg); + pg_t pgid = m->get_pgid(); + PG::PGContentSummary *sum = m->get_summary(); + PG *pg = get_pg(pgid); + assert(pg); - dout(10) << " " << *pg << " osd" << from << " remote state " << it->second.state - << " w/ " << it->second.objects.size() << " objects" - << ", last_complete " << it->second.last_complete - << ", last_any_complete " << it->second.last_any_complete + if (pg->is_primary()) { + // PRIMARY + dout(10) << *pg << " got summary from osd" << from << endl; - - PGPeer *pgp = pg->get_peer(from); + PG::PGPeer *pgp = pg->get_peer(from); assert(pgp); + assert(pgp->content_summary == 0); // ? + pgp->content_summary = sum; + pgp->state_set(PG::PGPeer::STATE_SUMMARY); - pg->mark_any_complete( it->second.last_any_complete ); - - pgp->last_complete = it->second.last_complete; - pgp->objects = it->second.objects; - pgp->state_set(PG_PEER_STATE_ACTIVE); - - // fully peered? - bool fully = true; - for (map::iterator pit = pg->get_peers().begin(); - pit != pg->get_peers().end(); - pit++) { - dout(10) << " " << *pg << " peer osd" << pit->first << " state " << pit->second->get_state() << endl; - if (!pit->second->is_active()) fully = false; + if (pgp->info.last_update > pg->info.last_update) { + // start new summary? + if (pg->content_summary == 0) + pg->content_summary = new PG::PGContentSummary(); + + // assimilate summary info! + list::iterator myp = pg->content_summary->ls.begin(); + list::iterator newp = sum->ls.begin(); + + while (newp != sum->ls.end()) { + if (myp == pg->content_summary->ls.end()) { + // add new item + pg->content_summary->ls.insert(myp, 1, *newp); + pg->info.last_update = newp->version; + if (myp->osd == from) { + // remote + pg->content_summary->remote++; + } else { + // missing. + myp->osd = -1; + pg->content_summary->missing++; + } + myp++; + assert(myp == pg->content_summary->ls.end()); + } else { + assert(myp->oid == newp->oid && myp->version == newp->version); + if (myp->osd == -1 && newp->osd == from) { + myp->osd = from; // found! + pg->content_summary->remote++; + pg->content_summary->missing--; + } + myp++; + } + newp++; + } + assert(myp == pg->content_summary->ls.end()); } - - if (fully) { - if (!pg->is_peered()) { - // now we're peered! - pg->mark_peered(); - // waiters? - take_waiters(pg->waiting_for_peered); + repeer(pg, query_map); + + } else { + // REPLICA + dout(10) << *pg << " got summary from primary osd" << from + << endl; + assert(from == pg->acting[0]); - dout(10) << " " << *pg << " fully peered, analyzing" << endl; - plan_recovery(pg); - do_recovery(pg); - } else { - // we're already peered. - // what's the use of this new guy? - - } - } - } + // copy summary. FIXME. + if (pg->content_summary == 0) + pg->content_summary = new PG::PGContentSummary(); + *pg->content_summary = *sum; - // done + // i'm now active! + pg->state_set(PG::STATE_ACTIVE); + + // take any waiters + take_waiters(pg->waiting_for_active); + + // initiate any recovery? + pg->plan_recovery(); + } + delete m; } -void OSD::handle_pg_update(MOSDPGUpdate *m) -{ - int from = MSG_ADDR_NUM(m->get_source()); - dout(7) << "handle_pg_update on " << hex << m->get_pgid() << dec << " from osd" << from - << " complete=" << m->is_complete() - << " last_any_complete=" << m->get_last_any_complete() - << endl; - - PG *pg = get_pg(m->get_pgid()); - if (!require_current_pg_primary(m, m->get_version(), pg)) return; - // update - if (!pg) { - dout(7) << "don't have pg " << hex << m->get_pgid() << dec << endl; - } else { - // update my info. --what info? - //pg->assim_info( m->get_pginfo() ); - - // complete? - if (m->is_complete()) { - pg->mark_complete( osdmap->get_version() ); - } - if (m->get_last_any_complete()) - pg->mark_any_complete( m->get_last_any_complete() ); - - pg->store(store); - } - - delete m; -} // RECOVERY -void OSD::plan_recovery(PG *pg) -{ - version_t current_version = osdmap->get_version(); - - list complete_peers; - pg->plan_recovery(store, current_version, complete_peers); - if (current_version > 1) { - for (list::iterator it = complete_peers.begin(); - it != complete_peers.end(); - it++) { - PGPeer *p = *it; - dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl; - messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()), - MSG_ADDR_OSD(p->get_peer())); - } - } else { - dout(7) << "not sending PGUpdates since this is a mkfs (current_version==1)" << endl; - } -} -void OSD::do_recovery(PG *pg) -{ - // recover - if (!pg->is_complete(osdmap->get_version())) { - pg_pull(pg, max_recovery_ops); - } - - // replicate - if (pg->is_complete( osdmap->get_version() )) { - if (!pg->objects_unrep.empty()) - pg_push(pg, max_recovery_ops); - if (!pg->objects_stray.empty()) - pg_clean(pg, max_recovery_ops); - } -} // pull @@ -1429,79 +1403,76 @@ void OSD::pg_pull(PG *pg, int maxops) { int ops = pg->num_active_ops(); - dout(7) << "pg_pull pg " << hex << pg->get_pgid() << dec << " " << pg->objects_missing.size() << " to do, " << ops << "/" << maxops << " active" << endl; + dout(7) << "pg_pull pg " << *pg + << " " << pg->objects_missing.size() << " to do, " + << ops << "/" << maxops << " active" << endl; - while (ops < maxops) { - object_t oid; - if (!pg->get_next_pull(oid)) { - dout(7) << "pg_pull done " << *pg << endl; - break; - } - pull_replica(pg, oid); + while (ops < maxops && + !pg->recovery_queue.empty()) { + map::iterator first = pg->recovery_queue.upper_bound(pg->requested_through); + + pull_replica(pg, first->second); + pg->requested_through = first->first; + ops++; } } -void OSD::pull_replica(PG *pg, object_t oid) +void OSD::pull_replica(PG *pg, PG::ObjectInfo& oi) { - version_t v = pg->objects_missing_v[oid]; - - // choose a peer - set::iterator pit = pg->objects_missing[oid].begin(); - PGPeer *p = pg->get_peer(*pit); - dout(7) << "pull_replica " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl; - - // add to fetching list - pg->pulling(oid, v, p); + // get peer + dout(7) << "pull_replica " << hex << oi.oid << dec + << " v " << oi.version + << " from osd" << oi.osd << endl; // send op - __uint64_t tid = ++last_tid; + tid_t tid = ++last_tid; MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(), - oid, p->pg->get_pgid(), - osdmap->get_version(), + oi.oid, pg->get_pgid(), + osdmap->get_epoch(), OSD_OP_REP_PULL); - op->set_version(v); + op->set_version(oi.version); op->set_pg_role(-1); // whatever, not 0 - messenger->send_message(op, MSG_ADDR_OSD(p->get_peer())); + messenger->send_message(op, MSG_ADDR_OSD(oi.osd)); - // register - pull_ops[tid] = p; + // take note + pull_ops[tid] = oi; + pg->objects_pulling[oi.oid] = oi; } void OSD::op_rep_pull(MOSDOp *op) { long got = 0; - //lock_object(op->get_oid()); - { - dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl; - - // get object size - struct stat st; - int r = store->stat(op->get_oid(), &st); - assert(r == 0); - - // check version - version_t v = 0; - store->getattr(op->get_oid(), "version", &v, sizeof(v)); - assert(v == op->get_version()); - - // read - bufferlist bl; - got = store->read(op->get_oid(), - st.st_size, 0, - bl); - assert(got == st.st_size); - - // reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); - reply->set_result(0); - reply->set_data(bl); - reply->set_length(got); - reply->set_offset(0); - - messenger->send_message(reply, op->get_asker()); - } - //unlock_object(op->get_oid()); + + dout(7) << "rep_pull on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl; + + // get object size + struct stat st; + int r = store->stat(op->get_oid(), &st); + assert(r == 0); + + // check version + version_t v = 0; + store->getattr(op->get_oid(), "version", &v, sizeof(v)); + assert(v >= op->get_version()); + + // read + bufferlist bl; + got = store->read(op->get_oid(), + st.st_size, 0, + bl); + assert(got == st.st_size); + + // reply + MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); + reply->set_result(0); + reply->set_data(bl); + reply->set_length(got); + reply->set_offset(0); + reply->set_version(v); + + messenger->send_message(reply, op->get_asker()); + delete op; logger->inc("r_pull"); @@ -1515,303 +1486,57 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op) dout(7) << "rep_pull_reply " << hex << o << dec << " v " << v << " size " << op->get_length() << endl; - PGPeer *p = pull_ops[op->get_tid()]; - PG *pg = p->pg; - assert(p); // FIXME: how will this work? - assert(p->is_pulling(o)); - assert(p->pulling_version(o) == v); + PG::ObjectInfo oi = pull_ops[op->get_tid()]; + assert(v <= op->get_version()); + + PG *pg = get_pg(op->get_pg()); + assert(pg); + assert(pg->objects_pulling.count(oi.oid)); // write it and add it to the PG store->write(o, op->get_length(), 0, op->get_data(), true); - p->pg->add_object(store, o); - + store->collection_add(pg->get_pgid(), o); store->setattr(o, "version", &v, sizeof(v)); // close out pull op. pull_ops.erase(op->get_tid()); + pg->objects_pulling.erase(o); - pg->pulled(o, v, p); + // bottom of queue? + map::iterator bottom = pg->recovery_queue.begin(); + if (bottom->first == oi.version) + pg->info.last_complete = bottom->first; + pg->recovery_queue.erase(oi.version); // now complete? - if (pg->objects_missing.empty()) { - pg->mark_complete(osdmap->get_version()); - - // distribute new last_any_complete - dout(7) << " " << *pg << " now complete, updating last_any_complete on peers" << endl; - for (map::iterator it = pg->peers.begin(); - it != pg->peers.end(); - it++) { - PGPeer *p = it->second; - messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), false, osdmap->get_version()), - MSG_ADDR_OSD(p->get_peer())); - } + if (pg->recovery_queue.empty()) { + assert(pg->info.last_complete == pg->info.last_update); + + // tell primary? + dout(7) << " " << *pg << " recovery complete, telling primary" << endl; + list ls; + ls.push_back(pg->info); + messenger->send_message(new MOSDPGNotify(osdmap->get_epoch(), + ls), + MSG_ADDR_OSD(pg->get_primary())); + } else { + // more? + pg->do_recovery(); } // finish waiters if (pg->waiting_for_missing_object.count(o)) take_waiters(pg->waiting_for_missing_object[o]); - // more? - do_recovery(pg); - delete op; } -// push -void OSD::pg_push(PG *pg, int maxops) -{ - int ops = pg->num_active_ops(); - - dout(7) << "pg_push pg " << hex << pg->get_pgid() << dec << " " << pg->objects_unrep.size() << " objects, " << ops << "/" << maxops << " active ops" << endl; - - while (ops < maxops) { - object_t oid; - if (!pg->get_next_push(oid)) { - dout(7) << "pg_push done " << *pg << endl; - break; - } - - push_replica(pg, oid); - ops++; - } -} - -void OSD::push_replica(PG *pg, object_t oid) -{ - version_t v = 0; - store->getattr(oid, "version", &v, sizeof(v)); - assert(v > 0); - - set& peers = pg->objects_unrep[oid]; - - // load object content - struct stat st; - store->stat(oid, &st); - bufferlist bl; - store->read(oid, st.st_size, 0, bl); - assert(bl.length() == st.st_size); - - dout(7) << "push_replica " << hex << oid << dec << " v " << v << " to osds " << peers << " size " << st.st_size << endl; - - for (set::iterator pit = peers.begin(); - pit != peers.end(); - pit++) { - PGPeer *p = pg->get_peer(*pit); - assert(p); - - // add to list - pg->pushing(oid, v, p); - - // send op - __uint64_t tid = ++last_tid; - MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(), - oid, pg->get_pgid(), - osdmap->get_version(), - OSD_OP_REP_PUSH); - op->set_version(v); - op->set_pg_role(-1); // whatever, not 0 - - // include object content - //op->set_data(bl); // no no bad, will modify bl - op->get_data() = bl; // _copy_ bufferlist, we may have multiple destinations! - op->set_length(st.st_size); - op->set_offset(0); - - messenger->send_message(op, MSG_ADDR_OSD(*pit)); - - // register - push_ops[tid] = p; - } - -} - -void OSD::op_rep_push(MOSDOp *op) -{ - //lock_object(op->get_oid()); - { - dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl; - - PG *pg = get_pg(op->get_pg()); - assert(pg); - - // exists? - if (store->exists(op->get_oid())) { - store->truncate(op->get_oid(), 0); - - version_t ov = 0; - store->getattr(op->get_oid(), "version", &ov, sizeof(ov)); - assert(ov <= op->get_version()); - } - - logger->inc("r_push"); - logger->inc("r_pushb", op->get_length()); - - // write out buffers - int r = store->write(op->get_oid(), - op->get_length(), 0, - op->get_data(), - false); // FIXME - pg->add_object(store, op->get_oid()); - assert(r >= 0); - - // set version - version_t v = op->get_version(); - store->setattr(op->get_oid(), "version", &v, sizeof(v)); - - // reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); - messenger->send_message(reply, op->get_asker()); - - } - //unlock_object(op->get_oid()); - delete op; -} - -void OSD::op_rep_push_reply(MOSDOpReply *op) -{ - object_t oid = op->get_oid(); - version_t v = op->get_version(); - - dout(7) << "rep_push_reply " << hex << oid << dec << endl; - - PGPeer *p = push_ops[op->get_tid()]; - PG *pg = p->pg; - assert(p); // FIXME: how will this work? - assert(p->is_pushing(oid)); - assert(p->pushing_version(oid) == v); - - // close out push op. - push_ops.erase(op->get_tid()); - pg->pushed(oid, v, p); - - if (p->is_complete()) { - dout(7) << " telling replica they are complete" << endl; - messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()), - MSG_ADDR_OSD(p->get_peer())); - } - - // anybody waiting on this object? - if (pg->waiting_for_clean_object.count(oid) && - pg->objects_unrep.count(oid) == 0) { - dout(7) << "kicking waiter on now replicated object " << hex << oid << dec << endl; - take_waiters(pg->waiting_for_clean_object[oid]); - pg->waiting_for_clean_object.erase(oid); - } - - // more? - do_recovery(pg); - - delete op; -} - - -// clean - -void OSD::pg_clean(PG *pg, int maxops) -{ - int ops = pg->num_active_ops(); - - dout(7) << "pg_clean pg " << hex << pg->get_pgid() << dec << " " << pg->objects_stray.size() << " stray, " << ops << "/" << maxops << " active ops" << endl; - - while (ops < maxops) { - object_t oid; - if (!pg->get_next_remove(oid)) { - dout(7) << "pg_clean done " << *pg << endl; - break; - } - - remove_replica(pg, oid); - ops++; - } -} - -void OSD::remove_replica(PG *pg, object_t oid) -{ - dout(7) << "remove_replica " << hex << oid << dec << endl;//" v " << v << " from osd" << p->get_peer() << endl; - - map& stray = pg->objects_stray[oid]; - for (map::iterator it = stray.begin(); - it != stray.end(); - it++) { - PGPeer *p = pg->get_peer(it->first); - assert(p); - const version_t v = it->second; - - // add to list - pg->removing(oid, v, p); - - // send op - __uint64_t tid = ++last_tid; - MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(), - oid, p->pg->get_pgid(), - osdmap->get_version(), - OSD_OP_REP_REMOVE); - op->set_version(v); - op->set_pg_role(-1); // whatever, not 0 - messenger->send_message(op, MSG_ADDR_OSD(p->get_peer())); - - // register - remove_ops[tid] = p; - } -} - -void OSD::op_rep_remove(MOSDOp *op) -{ - //lock_object(op->get_oid()); - { - dout(7) << "rep_remove on " << hex << op->get_oid() << dec << " v " << op->get_version() << endl; - - // sanity checks - assert(store->exists(op->get_oid())); - - version_t v = 0; - store->getattr(op->get_oid(), "version", &v, sizeof(v)); - assert(v == op->get_version()); - - // remove - store->collection_remove(op->get_pg(), op->get_oid()); - int r = store->remove(op->get_oid()); - assert(r == 0); - - // reply - messenger->send_message(new MOSDOpReply(op, r, osdmap, true), - op->get_asker()); - } - //unlock_object(op->get_oid()); - delete op; -} - -void OSD::op_rep_remove_reply(MOSDOpReply *op) -{ - object_t oid = op->get_oid(); - version_t v = op->get_version(); - dout(7) << "rep_remove_reply " << hex << oid << dec << endl; - - PGPeer *p = remove_ops[op->get_tid()]; - PG *pg = p->pg; - assert(p); // FIXME: how will this work? - assert(p->is_removing(oid)); - assert(p->removing_version(oid) == v); - - // close out push op. - remove_ops.erase(op->get_tid()); - pg->removed(oid, v, p); - - if (p->is_complete()) { - dout(7) << " telling replica they are complete" << endl; - messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()), - MSG_ADDR_OSD(p->get_peer())); - } - - // more? - do_recovery(pg); - - delete op; -} +// op_rep_modify +// commit (to disk) callback class C_OSD_RepModifyCommit : public Context { public: OSD *osd; @@ -1836,16 +1561,23 @@ void OSD::op_rep_modify_commit(MOSDOp *op) } } +// process a modification operation + void OSD::op_rep_modify(MOSDOp *op) { - // when we introduce unordered messaging.. FIXME object_t oid = op->get_oid(); + // check current version version_t ov = 0; if (store->exists(oid)) store->getattr(oid, "version", &ov, sizeof(ov)); - if (op->get_old_version() != ov) + + if (op->get_old_version() != ov) { + assert(ov < op->get_old_version()); + + // FIXME: block until i get the updated version. dout(0) << "rep_modify old version is " << ov << " msg sez " << op->get_old_version() << endl; + } assert(op->get_old_version() == ov); // PG @@ -1864,7 +1596,7 @@ void OSD::op_rep_modify(MOSDOp *op) assert(op->get_data().length() == op->get_length()); oncommit = new C_OSD_RepModifyCommit(this, op); r = apply_write(op, op->get_version(), oncommit); - if (ov == 0) pg->add_object(store, oid); + store->collection_add(pg->get_pgid(), oid); logger->inc("r_wr"); logger->inc("r_wrb", op->get_length()); @@ -1877,6 +1609,11 @@ void OSD::op_rep_modify(MOSDOp *op) r = store->truncate(oid, op->get_offset()); } else assert(0); + // update pg version too + pg->info.last_update = op->get_version(); + if (pg->info.last_complete == ov) + pg->info.last_complete = op->get_version(); + if (oncommit) { // ack MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false); @@ -1907,9 +1644,9 @@ void OSD::handle_op(MOSDOp *op) // REGULAR OP (non-replication) // is our map version up to date? - if (op->get_map_version() > osdmap->get_version()) { + if (op->get_map_epoch() > osdmap->get_epoch()) { // op's is newer - dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl; + dout(7) << "op map " << op->get_map_epoch() << " > " << osdmap->get_epoch() << endl; wait_for_new_map(op); return; } @@ -1919,7 +1656,8 @@ void OSD::handle_op(MOSDOp *op) if (acting_primary != whoami) { if (acting_primary >= 0) { - dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl; + dout(7) << " acting primary is " << acting_primary + << ", forwarding" << endl; messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0); logger->inc("fwd"); } else { @@ -1933,86 +1671,69 @@ void OSD::handle_op(MOSDOp *op) // proxy? if (!pg) { - dout(7) << "hit non-existent pg " << hex << op->get_pg() << dec << ", waiting" << endl; + dout(7) << "hit non-existent pg " << hex << pgid << dec + << ", waiting" << endl; waiting_for_pg[pgid].push_back(op); return; } else { dout(7) << "handle_op " << op << " in " << *pg << endl; - + // must be peered. - if (!pg->is_peered()) { - dout(7) << "op_write " << *pg << " not peered (yet)" << endl; - pg->waiting_for_peered.push_back(op); + if (!pg->is_active()) { + dout(7) << "op_write " << *pg << " not active (yet)" << endl; + pg->waiting_for_active.push_back(op); return; } const object_t oid = op->get_oid(); - if (!pg->is_complete( osdmap->get_version() )) { + if (!pg->is_complete()) { // consult PG object map if (pg->objects_missing.count(oid)) { // need to pull - dout(7) << "need to pull object " << hex << oid << dec << endl; + version_t v = pg->objects_missing[oid]; + dout(7) << "need to pull object " << hex << oid << dec + << " v " << v << endl; if (!pg->objects_pulling.count(oid)) - pull_replica(pg, oid); + pull_replica(pg, pg->recovery_queue[v]); pg->waiting_for_missing_object[oid].push_back(op); return; } } - if (!pg->is_clean() && - (op->get_op() == OSD_OP_WRITE || - op->get_op() == OSD_OP_TRUNCATE || - op->get_op() == OSD_OP_DELETE)) { - // exists but not replicated? - if (pg->objects_unrep.count(oid)) { - dout(7) << "object " << hex << oid << dec << " in " << *pg - << " exists but not clean" << endl; - pg->waiting_for_clean_object[oid].push_back(op); - if (pg->objects_pushing.count(oid) == 0) - push_replica(pg, oid); - return; - } - - // just stray? - // FIXME: this is a bit to aggressive; includes inactive peers - if (pg->objects_stray.count(oid)) { - dout(7) << "object " << hex << oid << dec << " in " << *pg - << " dne but is not clean" << endl; - pg->waiting_for_clean_object[oid].push_back(op); - if (pg->objects_removing.count(oid) == 0) - remove_replica(pg, oid); - return; - } - } + // okay! } } else { // REPLICATION OP if (pg) { - dout(7) << "handle_rep_op " << op << " in " << *pg << endl; + dout(7) << "handle_rep_op " << op + << " in " << *pg << endl; } else { - dout(7) << "handle_rep_op " << op << " in pgid " << op->get_pg() << endl; + assert(0); + dout(7) << "handle_rep_op " << op + << " in pgid " << hex << pgid << dec << endl; } + // check osd map - if (op->get_map_version() != osdmap->get_version()) { + if (op->get_map_epoch() != osdmap->get_epoch()) { // make sure source is still primary int curprimary = osdmap->get_pg_acting_primary(op->get_pg()); int myrole = osdmap->get_pg_acting_role(op->get_pg(), whoami); if (curprimary != MSG_ADDR_NUM(op->get_source()) || myrole <= 0) { - dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary changed on pg " << hex << op->get_pg() << dec << endl; + dout(5) << "op map " << op->get_map_epoch() << " != " << osdmap->get_epoch() << ", primary changed on pg " << hex << op->get_pg() << dec << endl; MOSDOpReply *fail = new MOSDOpReply(op, -1, osdmap, false); messenger->send_message(fail, op->get_asker()); return; } else { - dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary same on pg " << hex << op->get_pg() << dec << endl; + dout(5) << "op map " << op->get_map_epoch() << " != " << osdmap->get_epoch() << ", primary same on pg " << hex << op->get_pg() << dec << endl; } } } - + if (g_conf.osd_maxthreads < 1) { do_op(op); // do it now } else { @@ -2039,7 +1760,7 @@ void OSD::enqueue_op(object_t oid, MOSDOp *op) } /* - * dequeue called in worker thread, without osd_lock + * NOTE: dequeue called in worker thread, without osd_lock */ void OSD::dequeue_op(object_t oid) { @@ -2056,7 +1777,8 @@ void OSD::dequeue_op(object_t oid) op = ls.front(); ls.pop_front(); - dout(10) << "dequeue_op " << hex << oid << dec << " op " << op << ", " << ls.size() << " / " << (pending_ops-1) << " more pending" << endl; + dout(10) << "dequeue_op " << hex << oid << dec << " op " << op << ", " + << ls.size() << " / " << (pending_ops-1) << " more pending" << endl; if (ls.empty()) op_queue.erase(oid); @@ -2066,7 +1788,7 @@ void OSD::dequeue_op(object_t oid) // do it do_op(op); - // unlock + // unlock oid unlock_object(oid); // finish @@ -2088,10 +1810,8 @@ void OSD::dequeue_op(object_t oid) -/* - * do an op - * - * object lock may be held (if multithreaded) +/** do_op - do an op + * object lock will be held (if multithreaded) * osd_lock NOT held. */ void OSD::do_op(MOSDOp *op) @@ -2108,12 +1828,6 @@ void OSD::do_op(MOSDOp *op) case OSD_OP_REP_PULL: op_rep_pull(op); break; - case OSD_OP_REP_PUSH: - op_rep_push(op); - break; - case OSD_OP_REP_REMOVE: - op_rep_remove(op); - break; // replica ops case OSD_OP_REP_WRITE: @@ -2134,6 +1848,7 @@ void OSD::do_op(MOSDOp *op) op_stat(op); break; case OSD_OP_WRITE: + case OSD_OP_ZERO: case OSD_OP_DELETE: case OSD_OP_TRUNCATE: op_modify(op); @@ -2290,13 +2005,13 @@ void OSD::issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd) dout(7) << " issue_replica_op in " << *pg << " o " << hex << oid << dec << " to osd" << osd << endl; - // forward the write + // forward the write/update/whatever __uint64_t tid = ++last_tid; MOSDOp *wr = new MOSDOp(tid, messenger->get_myaddr(), oid, pg->get_pgid(), - osdmap->get_version(), + osdmap->get_epoch(), 100+op->get_op()); wr->get_data() = op->get_data(); // copy bufferlist wr->set_length(op->get_length()); @@ -2363,8 +2078,8 @@ void OSD::put_repop(OSDReplicaOp *repop) class C_OSD_WriteCommit : public Context { public: OSD *osd; - OSDReplicaOp *repop; - C_OSD_WriteCommit(OSD *o, OSDReplicaOp *op) : osd(o), repop(op) {} + OSD::OSDReplicaOp *repop; + C_OSD_WriteCommit(OSD *o, OSD::OSDReplicaOp *op) : osd(o), repop(op) {} void finish(int r) { osd->op_modify_commit(repop); } @@ -2385,92 +2100,90 @@ void OSD::op_modify(MOSDOp *op) char *opname = 0; if (op->get_op() == OSD_OP_WRITE) opname = "op_write"; + if (op->get_op() == OSD_OP_ZERO) opname = "op_zero"; if (op->get_op() == OSD_OP_DELETE) opname = "op_delete"; if (op->get_op() == OSD_OP_TRUNCATE) opname = "op_truncate"; - //lock_object(oid); + // version? clean? + version_t ov = 0; // 0 == dne (yet) + store->getattr(oid, "version", &ov, sizeof(ov)); + + //version_t nv = messenger->get_lamport();//op->get_lamport_recv_stamp(); + version_t nv = ov + 1; //FIXME later + + if (nv <= ov) + cerr << opname << " " << hex << oid << dec << " ov " << ov << " nv " << nv + << " ... wtf? msg sent " << op->get_lamport_send_stamp() + << " recv " << op->get_lamport_recv_stamp() << endl; + assert(nv > ov); + + dout(12) << " " << opname << " " << hex << oid << dec << " v " << nv << " off " << op->get_offset() << " len " << op->get_length() << endl; + + // issue replica writes + OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov); + repop->start = g_clock.now(); + repop->waitfor_ack[0] = whoami; // will need local ack, commit + repop->waitfor_commit[0] = whoami; + + pg_t pgid = op->get_pg(); + PG *pg; + osd_lock.Lock(); + repop->lock.Lock(); { - // version? clean? - version_t ov = 0; // 0 == dne (yet) - store->getattr(oid, "version", &ov, sizeof(ov)); - - //version_t nv = messenger->get_lamport();//op->get_lamport_recv_stamp(); - version_t nv = ov + 1; //FIXME later - - if (nv <= ov) - cerr << opname << " " << hex << oid << dec << " ov " << ov << " nv " << nv - << " ... wtf? msg sent " << op->get_lamport_send_stamp() - << " recv " << op->get_lamport_recv_stamp() << endl; - assert(nv > ov); - - dout(12) << " " << opname << " " << hex << oid << dec << " v " << nv << " off " << op->get_offset() << " len " << op->get_length() << endl; - - // issue replica writes - OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov); - repop->start = g_clock.now(); - repop->waitfor_ack[0] = whoami; // will need local ack, commit - repop->waitfor_commit[0] = whoami; - - PG *pg; - osd_lock.Lock(); - repop->lock.Lock(); - { - pg = get_pg(op->get_pg()); - for (unsigned i=1; iacting.size(); i++) { - issue_replica_op(pg, repop, pg->acting[i]); - } + pg = get_pg(pgid); + for (unsigned i=1; iacting.size(); i++) { + issue_replica_op(pg, repop, pg->acting[i]); } - repop->lock.Unlock(); - osd_lock.Unlock(); + } + repop->lock.Unlock(); + osd_lock.Unlock(); + + // pre-ack + //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false); + //messenger->send_message(reply, op->get_asker()); + + // do it + int r; + if (op->get_op() == OSD_OP_WRITE) { + // write + assert(op->get_data().length() == op->get_length()); + Context *oncommit = new C_OSD_WriteCommit(this, repop); + r = apply_write(op, nv, oncommit); - // pre-ack - //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false); - //messenger->send_message(reply, op->get_asker()); + // put new object in proper collection + store->collection_add(pgid, oid); // FIXME : be careful w/ locking - // do it - int r; - if (op->get_op() == OSD_OP_WRITE) { - // write - assert(op->get_data().length() == op->get_length()); - Context *oncommit = new C_OSD_WriteCommit(this, repop); - r = apply_write(op, nv, oncommit); - - // put new object in proper collection - if (ov == 0) - pg->add_object(store, oid); // FIXME : be careful w/ locking - - get_repop(repop); - assert(repop->waitfor_ack.count(0)); - repop->waitfor_ack.erase(0); - put_repop(repop); - - logger->inc("c_wr"); - logger->inc("c_wrb", op->get_length()); - } - else if (op->get_op() == OSD_OP_TRUNCATE) { - // truncate - r = store->truncate(oid, op->get_offset()); - get_repop(repop); - assert(repop->waitfor_ack.count(0)); - assert(repop->waitfor_commit.count(0)); - repop->waitfor_ack.erase(0); - repop->waitfor_commit.erase(0); - put_repop(repop); - } - else if (op->get_op() == OSD_OP_DELETE) { - // delete - pg->remove_object(store, op->get_oid()); // be careful with locking - r = store->remove(oid); - get_repop(repop); - assert(repop->waitfor_ack.count(0)); - assert(repop->waitfor_commit.count(0)); - repop->waitfor_ack.erase(0); - repop->waitfor_commit.erase(0); - put_repop(repop); - } - else assert(0); + get_repop(repop); + assert(repop->waitfor_ack.count(0)); + repop->waitfor_ack.erase(0); + put_repop(repop); + logger->inc("c_wr"); + logger->inc("c_wrb", op->get_length()); + } + else if (op->get_op() == OSD_OP_TRUNCATE) { + // truncate + r = store->truncate(oid, op->get_offset()); + get_repop(repop); + assert(repop->waitfor_ack.count(0)); + assert(repop->waitfor_commit.count(0)); + repop->waitfor_ack.erase(0); + repop->waitfor_commit.erase(0); + put_repop(repop); } + else if (op->get_op() == OSD_OP_DELETE) { + // delete + store->collection_remove(pgid, oid); // be careful with locking + r = store->remove(oid); + get_repop(repop); + assert(repop->waitfor_ack.count(0)); + assert(repop->waitfor_commit.count(0)); + repop->waitfor_ack.erase(0); + repop->waitfor_commit.erase(0); + put_repop(repop); + } + else assert(0); + //unlock_object(oid); } diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index fb27a4c6502c3..765ee929a9d70 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -11,8 +11,6 @@ * */ - - #ifndef __OSD_H #define __OSD_H @@ -21,8 +19,8 @@ #include "common/Mutex.h" #include "common/ThreadPool.h" +#include "Objecter.h" #include "ObjectStore.h" - #include "PG.h" #include @@ -36,60 +34,65 @@ using namespace __gnu_cxx; class Messenger; class Message; -class OSDReplicaOp { - public: - class MOSDOp *op; - Mutex lock; - map<__uint64_t,int> waitfor_ack; - map<__uint64_t,int> waitfor_commit; - utime_t start; - bool cancel; - bool sent_ack, sent_commit; - set osds; - version_t new_version, old_version; - - OSDReplicaOp(class MOSDOp *o, version_t nv, version_t ov) : - op(o), - //local_ack(false), local_commit(false), - cancel(false), - sent_ack(false), sent_commit(false), - new_version(nv), old_version(ov) - { } - bool can_send_ack() { return !sent_ack && !sent_commit && //!cancel && - waitfor_ack.empty(); } - bool can_send_commit() { return !sent_commit && //!cancel && - waitfor_ack.empty() && waitfor_commit.empty(); } - bool can_delete() { return waitfor_ack.empty() && waitfor_commit.empty(); } -}; -inline ostream& operator<<(ostream& out, OSDReplicaOp& repop) -{ - out << "repop(wfack=" << repop.waitfor_ack << " wfcommit=" << repop.waitfor_commit; - //if (repop.local_ack) out << " local_ack"; - //if (repop.local_commit) out << " local_commit"; - if (repop.cancel) out << " cancel"; - out << " op=" << *(repop.op); - out << " repop=" << &repop; - out << ")"; - return out; -} +/** + * + */ class OSD : public Dispatcher { +public: + + /** OSDReplicaOp + * state associated with an in-progress replicated update. + */ + class OSDReplicaOp { + public: + class MOSDOp *op; + Mutex lock; + map<__uint64_t,int> waitfor_ack; + map<__uint64_t,int> waitfor_commit; + + utime_t start; + + bool cancel; + bool sent_ack, sent_commit; + + set osds; + version_t new_version, old_version; + + OSDReplicaOp(class MOSDOp *o, version_t nv, version_t ov) : + op(o), + //local_ack(false), local_commit(false), + cancel(false), + sent_ack(false), sent_commit(false), + new_version(nv), old_version(ov) + { } + bool can_send_ack() { return !sent_ack && !sent_commit && //!cancel && + waitfor_ack.empty(); } + bool can_send_commit() { return !sent_commit && //!cancel && + waitfor_ack.empty() && waitfor_commit.empty(); } + bool can_delete() { return waitfor_ack.empty() && waitfor_commit.empty(); } + }; + + /** OSD **/ protected: Messenger *messenger; int whoami; - char dev_path[100]; - - class ObjectStore *store; - class HostMonitor *monitor; class Logger *logger; int max_recovery_ops; + // local store + char dev_path[100]; + class ObjectStore *store; + + // failure monitoring + class HostMonitor *monitor; + // global lock Mutex osd_lock; @@ -106,11 +109,6 @@ class OSD : public Dispatcher { finished.splice(finished.end(), ls); } - // -- objects -- - //int read_onode(onode_t& onode); - //int write_onode(onode_t& onode); - - // -- ops -- class ThreadPool *threadpool; hash_map > op_queue; @@ -118,33 +116,32 @@ class OSD : public Dispatcher { bool waiting_for_no_ops; Cond no_pending_ops; Cond op_queue_cond; - - void wait_for_no_ops(); - - int apply_write(MOSDOp *op, version_t v, - Context *oncommit = 0); - - - void get_repop(OSDReplicaOp*); - void put_repop(OSDReplicaOp*); // will send ack/commit msgs, and delete as necessary. - void do_op(class MOSDOp *m); + void wait_for_no_ops(); - public: void enqueue_op(object_t oid, MOSDOp *op); void dequeue_op(object_t oid); static void static_dequeueop(OSD *o, object_t oid) { o->dequeue_op(oid); }; + void do_op(class MOSDOp *m); // actually do it + + int apply_write(MOSDOp *op, version_t v, + Context *oncommit = 0); + + + + friend class PG; + protected: // -- osd map -- class OSDMap *osdmap; list waiting_for_osdmap; map osdmaps; - - void update_map(bufferlist& state, bool mkfs=false); + + void update_map(bufferlist& state); void wait_for_new_map(Message *m); void handle_osd_map(class MOSDMap *m); OSDMap *get_osd_map(version_t v); @@ -158,13 +155,12 @@ class OSD : public Dispatcher { // PG hash_map pg_map; - void get_pg_list(list& ls); bool pg_exists(pg_t pg); PG *create_pg(pg_t pg); // create new PG PG *get_pg(pg_t pg); // return existing PG, load state from store (if needed) - void close_pg(pg_t pg); // close in-memory state - void remove_pg(pg_t pg); // remove state from store + void close_pg(pg_t pg); // close in-memory state + void remove_pg(pg_t pg); // remove state from store __uint64_t last_tid; @@ -174,48 +170,36 @@ class OSD : public Dispatcher { map<__uint64_t, OSDReplicaOp*> replica_ops; map > > replica_pg_osd_tids; // pg -> osd -> tid + void get_repop(OSDReplicaOp*); + void put_repop(OSDReplicaOp*); // will send ack/commit msgs, and delete as necessary. void issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd); void handle_rep_op_ack(__uint64_t tid, int result, bool commit, int fromosd); // recovery - map<__uint64_t,PGPeer*> pull_ops; // tid -> PGPeer* - map<__uint64_t,PGPeer*> push_ops; // tid -> PGPeer* - map<__uint64_t,PGPeer*> remove_ops; // tid -> PGPeer* - - void start_peers(PG *pg, map< int, map >& start_map); + map pull_ops; // tid -> PGPeer* - void peer_notify(int primary, map& pg_list); - void peer_start(int replica, map& pg_map); + void do_notifies(map< int, list >& notify_list); + void do_queries(map< int, map >& query_map); + void repeer(PG *pg, map< int, map >& query_map); - void plan_recovery(PG *pg); - void do_recovery(PG *pg); void pg_pull(PG *pg, int maxops); - void pg_push(PG *pg, int maxops); - void pg_clean(PG *pg, int maxops); - - void pull_replica(PG *pg, object_t oid); - void push_replica(PG *pg, object_t oid); - void remove_replica(PG *pg, object_t oid); + void pull_replica(PG *pg, PG::ObjectInfo& oi); bool require_current_map(Message *m, version_t v); - bool require_current_pg_primary(Message *m, version_t v, PG *pg); + bool require_same_or_newer_map(Message *m, epoch_t e); + void handle_pg_query(class MOSDPGQuery *m); void handle_pg_notify(class MOSDPGNotify *m); - void handle_pg_peer(class MOSDPGPeer *m); - void handle_pg_peer_ack(class MOSDPGPeerAck *m); - void handle_pg_update(class MOSDPGUpdate *m); + void handle_pg_summary(class MOSDPGSummary *m); void op_rep_pull(class MOSDOp *op); void op_rep_pull_reply(class MOSDOpReply *op); - void op_rep_push(class MOSDOp *op); - void op_rep_push_reply(class MOSDOpReply *op); - void op_rep_remove(class MOSDOp *op); - void op_rep_remove_reply(class MOSDOpReply *op); void op_rep_modify(class MOSDOp *op); // write, trucnate, delete void op_rep_modify_commit(class MOSDOp *op); friend class C_OSD_RepModifyCommit; + public: OSD(int id, Messenger *m); ~OSD(); @@ -241,4 +225,15 @@ class OSD : public Dispatcher { void force_remount(); }; +inline ostream& operator<<(ostream& out, OSD::OSDReplicaOp& repop) +{ + out << "repop(wfack=" << repop.waitfor_ack << " wfcommit=" << repop.waitfor_commit; + if (repop.cancel) out << " cancel"; + out << " op=" << *(repop.op); + out << " repop=" << &repop; + out << ")"; + return out; +} + + #endif diff --git a/ceph/osd/OSDMap.cc b/ceph/osd/OSDMap.cc index 7ea717d5411eb..6177db5261e22 100644 --- a/ceph/osd/OSDMap.cc +++ b/ceph/osd/OSDMap.cc @@ -22,12 +22,14 @@ void OSDMap::encode(bufferlist& blist) { - blist.append((char*)&version, sizeof(version)); + blist.append((char*)&epoch, sizeof(epoch)); blist.append((char*)&pg_bits, sizeof(pg_bits)); + blist.append((char*)&mkfs, sizeof(mkfs)); _encode(osds, blist); _encode(down_osds, blist); - //_encode(out_osds, blist); + _encode(out_osds, blist); + _encode(overload_osds, blist); crush._encode(blist); } @@ -35,14 +37,17 @@ void OSDMap::encode(bufferlist& blist) void OSDMap::decode(bufferlist& blist) { int off = 0; - blist.copy(off, sizeof(version), (char*)&version); - off += sizeof(version); + blist.copy(off, sizeof(epoch), (char*)&epoch); + off += sizeof(epoch); blist.copy(off, sizeof(pg_bits), (char*)&pg_bits); off += sizeof(pg_bits); + blist.copy(off, sizeof(mkfs), (char*)&mkfs); + off += sizeof(mkfs); _decode(osds, blist, off); _decode(down_osds, blist, off); - //_decode(out_osds, blist, off); + _decode(out_osds, blist, off); + _decode(overload_osds, blist, off); crush._decode(blist, off); } diff --git a/ceph/osd/OSDMap.h b/ceph/osd/OSDMap.h index e3a5b510aed4d..72139f6bee454 100644 --- a/ceph/osd/OSDMap.h +++ b/ceph/osd/OSDMap.h @@ -49,15 +49,21 @@ using namespace std; #define PG_TYPE_STARTOSD 2 // place primary on a specific OSD (named by the pg_bits) +typedef __uint64_t epoch_t; + + /** OSDMap */ class OSDMap { - version_t version; // what version of the osd cluster descriptor is this + epoch_t epoch; // what epoch of the osd cluster descriptor is this int pg_bits; // placement group bits set osds; // all osds set down_osds; // list of down disks - //set out_osds; // list of unmapped disks + set out_osds; // list of unmapped disks + map overload_osds; + + bool mkfs; public: Crush crush; // hierarchical map @@ -66,15 +72,17 @@ class OSDMap { friend class MDS; public: - OSDMap() : version(0), pg_bits(5) { } + OSDMap() : epoch(0), pg_bits(5), mkfs(false) { } // map info - version_t get_version() { return version; } - void inc_version() { version++; } + epoch_t get_epoch() { return epoch; } + void inc_epoch() { epoch++; } int get_pg_bits() { return pg_bits; } void set_pg_bits(int b) { pg_bits = b; } + bool is_mkfs() { return mkfs; } + void set_mkfs() { mkfs = true; } /**** cluster state *****/ int num_osds() { return osds.size(); } @@ -82,17 +90,18 @@ class OSDMap { const set& get_osds() { return osds; } const set& get_down_osds() { return down_osds; } - const set& get_out_osds() { return crush.out; } + const set& get_out_osds() { return out_osds; } + const map& get_overload_osds() { return overload_osds; } bool is_down(int osd) { return down_osds.count(osd); } bool is_up(int osd) { return !is_down(osd); } - bool is_out(int osd) { return crush.out.count(osd); } + bool is_out(int osd) { return out_osds.count(osd); } bool is_in(int osd) { return !is_in(osd); } void mark_down(int o) { down_osds.insert(o); } void mark_up(int o) { down_osds.erase(o); } - void mark_out(int o) { crush.out.insert(o); } - void mark_in(int o) { crush.out.erase(o); } + void mark_out(int o) { out_osds.insert(o); } + void mark_in(int o) { out_osds.erase(o); } // serialize, unserialize void encode(bufferlist& blist); @@ -208,7 +217,8 @@ class OSDMap { case PG_LAYOUT_CRUSH: crush.do_rule(crush.rules[num_rep], hps, - osds); + osds, + out_osds, overload_osds); break; case PG_LAYOUT_LINEAR: diff --git a/ceph/osd/PG.cc b/ceph/osd/PG.cc index 622fa2fabda76..27207cd583d18 100644 --- a/ceph/osd/PG.cc +++ b/ceph/osd/PG.cc @@ -15,267 +15,90 @@ #include "PG.h" #include "config.h" +#include "OSD.h" #undef dout -#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << whoami << " " << *this << " " +#define dout(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << osd->whoami << " " << *this << " " -void PG::pulled(object_t oid, version_t v, PGPeer *p) -{ - dout(10) << "pulled o " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl; - - objects_pulling.erase(oid); - - // update peer state - p->pulled(oid); - - // object is now local - objects_missing.erase(oid); - objects_missing_v.erase(oid); -} - - -void PG::pushed(object_t oid, version_t v, PGPeer *p) -{ - dout(10) << "pushed o " << hex << oid << dec << " v " << v << " to osd" << p->get_peer() << endl; - - objects_pushing[oid].erase(p); - if (objects_pushing[oid].empty()) - objects_pushing.erase(oid); - - // update peer state - p->pushed(oid); - - objects_unrep[oid].erase(p->get_peer()); - if (objects_unrep[oid].empty()) - objects_unrep.erase(oid); - - // pg clean now? - if (objects_unrep.empty() && - objects_stray.empty()) { - assert(!is_clean()); - mark_clean(); - } else { - dout(10) << " still " << objects_unrep.size() << " unrep and " << objects_stray.size() << " stray" << endl; - } -} - -void PG::removed(object_t oid, version_t v, PGPeer *p) -{ - dout(10) << "removed o " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl; - objects_removing[oid].erase(p); - if (objects_removing[oid].empty()) - objects_removing.erase(oid); +void PG::generate_content_summary() +{ + dout(10) << "generating summary" << endl; - // update peer state - p->removed(oid); + list olist; + osd->store->collection_list(info.pgid, olist); - objects_stray[oid].erase(p->get_peer()); - if (objects_stray[oid].empty()) - objects_stray.erase(oid); + content_summary = new PGContentSummary; - // clean now? - if (objects_unrep.empty() && - objects_stray.empty()) { - assert(!is_clean()); - mark_clean(); - } else { - dout(10) << " still " << objects_unrep.size() << " unrep and " << objects_stray.size() << " stray" << endl; - } -} - -/* -bool PG::existant_object_is_clean(object_t o, version_t v) -{ - assert(is_peered() && !is_clean()); - return objects_unrep.count(o) ? false:true; -} - -bool PG::nonexistant_object_is_clean(object_t o) -{ - assert(is_peered() && !is_clean()); - - // FIXME? - - // removed from peers? - for (map::iterator it = peers.begin(); - it != peers.end(); + for (list::iterator it = olist.begin(); + it != olist.end(); it++) { - if (it->second->get_role() != 1) continue; // only care about active set - if (it->second->is_complete()) continue; - if (it->second->is_stray(o)) - return false; + ObjectInfo item(*it); + osd->store->getattr(item.oid, + "version", + &item.version, sizeof(item.version)); + item.osd = osd->whoami; + content_summary->ls.push_back(item); } - - return true; } -*/ -void PG::plan_recovery(ObjectStore *store, version_t current_version, - list& complete_peers) +void PG::plan_recovery() { - dout(10) << "plan_recovery " << current_version << endl; - assert(is_peered()); - - // choose newest last_complete epoch - version_t last = last_complete; - for (map::iterator pit = peers.begin(); - pit != peers.end(); - pit++) { - dout(10) << " osd" << pit->first << " " - << pit->second->objects.size() << " objects, last_complete " << pit->second->last_complete << endl; - if (pit->second->last_complete > last) - last = pit->second->last_complete; - } - dout(10) << " combined last_complete epoch is " << last << endl; - - if (last+1 < current_version) { - dout(1) << "WARNING: last_complete skipped one or more epochs, we're possibly missing something" << endl; - } - if (!last) { // bootstrap! - dout(1) << "WARNING: no complete peers available (yet), pg is crashed" << endl; - return; - } - - // build the object map - // ... OR of complete OSDs' content - map master; // what the current object set is - - map local_objects; - scan_local_objects(local_objects, store); - dout(10) << " " << local_objects.size() << " local objects" << endl; - - if (last_complete == last) - master = local_objects; + dout(10) << "plan_recovery " << endl; - for (map::iterator pit = peers.begin(); - pit != peers.end(); - pit++) { - for (map::iterator oit = pit->second->objects.begin(); - oit != pit->second->objects.end(); - oit++) { - // know this object? - if (master.count(oit->first)) { - if (oit->second > master[oit->first]) // newer - master[oit->first] = oit->second; - } else { - // newly seen object! - master[oit->first] = oit->second; - } - } + assert(is_active()); + assert(content_summary); + + // load local contents + list olist; + osd->store->collection_list(info.pgid, olist); + + // check versions + map vmap; + for (list::iterator it = olist.begin(); + it != olist.end(); + it++) { + version_t v; + osd->store->getattr(*it, + "version", + &v, sizeof(v)); + vmap[*it] = v; } - - // ok, we have a master list. - dout(7) << " master list has " << master.size() << " objects" << endl; - - // local cleanup? - if (!is_complete(current_version)) { - // just cleanup old local objects - // FIXME: do this async? - - for (map::iterator it = local_objects.begin(); - it != local_objects.end(); + + // scan summary + content_summary->remote = 0; + content_summary->missing = 0; + for (list::iterator it = content_summary->ls.begin(); + it != content_summary->ls.end(); it++) { - if (master.count(it->first) && - master[it->first] == it->second) continue; // same! - - dout(10) << " local o " << hex << it->first << dec << " v " << it->second << " old, removing" << endl; - store->remove(it->first); - local_objects.erase(it->first); - } - } - - - // plan pull -> objects_missing - // plan push -> objects_unrep - for (map::iterator it = master.begin(); - it != master.end(); - it++) { - const object_t o = it->first; - const version_t v = it->second; - - // already have it locally? - bool local = false; - if (local_objects.count(o) && - local_objects[o] == v) local = true; // we have it. - - for (map::iterator pit = peers.begin(); - pit != peers.end(); - pit++) { - // pull? - if (!local && - pit->second->objects.count(o) && - pit->second->objects[o] == v) { - objects_missing[o].insert(pit->first); - objects_missing_v[o] = v; - } - - // push? - if (pit->second->get_role() == 1 && - (pit->second->objects.count(o) == 0 || - pit->second->objects[o] < v)) { - objects_unrep[o].insert(pit->first); - pit->second->missing.insert(o); - } + if (vmap.count(it->oid) && + vmap[it->oid] == it->version) { + // have latest. + vmap.erase(it->oid); + continue; } - assert(local || !objects_missing[o].empty()); // pull + // need it + dout(20) << "need " << hex << it->oid << dec + << " v " << it->version << endl; + objects_missing[it->oid] = it->version; + recovery_queue[it->version] = *it; } - if (objects_missing.empty()) { - mark_complete(current_version); - } - - // plan clean -> objects_stray - for (map::iterator pit = peers.begin(); - pit != peers.end(); - pit++) { - const int role = pit->second->get_role(); - assert(role != 0); // duh - - PGPeer *p = pit->second; - assert(p->is_active()); - - if (p->missing.empty() && p->stray.empty()) { - p->state_set(PG_PEER_STATE_COMPLETE); - complete_peers.push_back(p); - } - - if (p->is_complete()) { - dout(12) << " peer osd" << pit->first << " is complete" << endl; - } else { - dout(12) << " peer osd" << pit->first << " is !complete" << endl; - } - - for (map::iterator oit = pit->second->objects.begin(); - oit != pit->second->objects.end(); - oit++) { - const object_t o = oit->first; - const version_t v = oit->second; - - if (role < 0) { - dout(10) << " remote o " << hex << o << dec << " v " << v << " on osd" << p->get_peer() << " stray, removing" << endl; - } - else if (master.count(oit->first) == 0) { - dout(10) << " remote o " << hex << o << dec << " v " << v << " on osd" << p->get_peer() << " deleted/stray, removing" << endl; - } - else - continue; - - objects_stray[o][pit->first] = v; - p->stray.insert(o); - } + // hose stray + for (map::iterator it = vmap.begin(); + it != vmap.end(); + it++) { + dout(20) << "removing stray " << hex << it->first << dec + << " v " << it->second << endl; + osd->store->remove(it->first); } +} - if (objects_unrep.empty() && objects_stray.empty()) - mark_clean(); - - // clear peer content lists - for (map::iterator pit = peers.begin(); - pit != peers.end(); - pit++) - pit->second->objects.clear(); +void PG::do_recovery() +{ + dout(0) << "do_recovery - implement me" << endl; } diff --git a/ceph/osd/PG.h b/ceph/osd/PG.h index fd4f9f6429ad2..50c1243b68e61 100644 --- a/ceph/osd/PG.h +++ b/ceph/osd/PG.h @@ -11,385 +11,201 @@ * */ +#ifndef __PG_H +#define __PG_H #include "include/types.h" #include "include/bufferlist.h" + +#include "OSDMap.h" #include "ObjectStore.h" #include "msg/Messenger.h" +#include +using namespace std; + #include using namespace __gnu_cxx; -struct PGSummary { - pg_t pgid; - version_t version,mtime; - version_t last_epoch_started; -}; - -struct PGContentSummary { - map objects; -}; - - - -struct PGReplicaInfo { - int state; - version_t last_complete; - version_t last_any_complete; - map objects; // remote object list - - void _encode(bufferlist& blist) { - blist.append((char*)&state, sizeof(state)); - blist.append((char*)&last_complete, sizeof(last_complete)); - blist.append((char*)&last_any_complete, sizeof(last_any_complete)); - ::_encode(objects, blist); - //::_encode(deleted, blist); - } - void _decode(bufferlist& blist, int& off) { - blist.copy(off, sizeof(state), (char*)&state); - off += sizeof(state); - blist.copy(off, sizeof(last_complete), (char*)&last_complete); - off += sizeof(last_complete); - blist.copy(off, sizeof(last_any_complete), (char*)&last_any_complete); - off += sizeof(last_any_complete); - ::_decode(objects, blist, off); - //::_decode(deleted, blist, off); - } - - PGReplicaInfo() : state(0) { } -}; - - -/** PGPeer - * state associated with non-primary OSDS with PG content. - * only used by primary. - */ - -// by primary -#define PG_PEER_STATE_ACTIVE 1 // peer has acked our request, sent back PG state. -#define PG_PEER_STATE_COMPLETE 2 // peer has everything replicated+clean - -class PGPeer { - public: - class PG *pg; - private: - int peer; - int role; - int state; - - public: - // peer state - version_t last_complete; - map objects; // cleared after pg->is_peered() - - private: - // recovery todo - set missing; // missing or old objects to push - set stray; // extra objects to delete - - // recovery in-flight - map pulling; - map pushing; - map removing; - - // replication: for pushing replicas (new or old) - //map writing; // objects i've written to replica - - friend class PG; - - public: - PGPeer(class PG *pg, int p, int ro) : - pg(pg), - peer(p), - role(ro), - state(0) { } - - int get_peer() { return peer; } - int get_role() { return role; } - - int get_state() { return state; } - bool state_test(int m) { return (state & m) != 0; } - void state_set(int m) { state |= m; } - void state_clear(int m) { state &= ~m; } - - bool is_active() { return state_test(PG_PEER_STATE_ACTIVE); } - bool is_complete() { return state_test(PG_PEER_STATE_COMPLETE); } - bool is_recovering() { return is_active() && !is_complete(); } - - bool is_missing(object_t o) { - if (is_complete()) return false; - return missing.count(o); - } - bool is_stray(object_t o) { - if (is_complete()) return false; - return stray.count(o); - } - - // actors - void pull(object_t o, version_t v) { pulling[o] = v; } - bool is_pulling(object_t o) { return pulling.count(o); } - version_t pulling_version(object_t o) { return pulling[o]; } - void pulled(object_t o) { pulling.erase(o); } - - void push(object_t o, version_t v) { pushing[o] = v; } - bool is_pushing(object_t o) { return pushing.count(o); } - version_t pushing_version(object_t o) { return pushing[o]; } - void pushed(object_t o) { - pushing.erase(o); - missing.erase(o); - if (missing.empty() && stray.empty()) - state_set(PG_PEER_STATE_COMPLETE); - } - - void remove(object_t o, version_t v) { removing[o] = v; } - bool is_removing(object_t o) { return removing.count(o); } - version_t removing_version(object_t o) { return removing[o]; } - void removed(object_t o) { - removing.erase(o); - stray.erase(o); - if (missing.empty() && stray.empty()) - state_set(PG_PEER_STATE_COMPLETE); - } - - int num_active_ops() { - return pulling.size() + pushing.size() + removing.size(); - } -}; - - - -/* -// a task list for moving objects around -class PGQueue { - list objects; - list versions; - list peers; - int _size; - public: - PGQueue() : _size(0) { } - - int size() { return _size; } - - void push_back(object_t o, version_t v, int p) { - objects.push_back(o); versions.push_back(v); peers.push_back(p); - _size++; - } - void push_front(object_t o, version_t v, int p) { - objects.push_front(o); versions.push_front(v); peers.push_front(p); - _size++; - } - bool get_next(object_t& o, version_t& v, int& p) { - if (objects.empty()) return false; - o = objects.front(); v = versions.front(); p = peers.front(); - objects.pop_front(); versions.pop_front(); peers.pop_front(); - _size--; - return true; - } - void clear() { - objects.clear(); versions.clear(); peers.clear(); - _size = 0; - } - bool empty() { return objects.empty(); } -}; -*/ - +class OSD; /** PG - Replica Placement Group * */ -// any -//#define PG_STATE_COMPLETE 1 // i have full PG contents locally -#define PG_STATE_PEERED 2 // primary: peered with everybody - // replica: peered with auth - -// primary -#define PG_STATE_CLEAN 8 // peers are fully replicated and clean of stray objects - -// replica -#define PG_STATE_STRAY 32 // i need to announce myself to new auth - - - class PG { - protected: - int whoami; // osd#, purely for debug output, yucka - - pg_t pgid; - int role; // 0 = primary, 1 = replica, -1=none. - int state; // see bit defns above - version_t primary_since; // (only defined if role==0) - - version_t last_complete; // me - version_t last_any_complete; // anybody in the set +public: - public: - map peers; // primary: (soft state) active peers - - public: - vector acting; - //pginfo_t info; - - - /* - lamport_t last_complete_stamp; // lamport timestamp of last complete op - lamport_t last_modify_stamp; // most recent modification - lamport_t last_clean_stamp; - */ - - // pg waiters - list waiting_for_peered; // any op will hang until peered - hash_map > waiting_for_missing_object; - hash_map > waiting_for_clean_object; + /** ObjectInfo + * summary info about an object (replica) + */ + struct ObjectInfo { + object_t oid; + version_t version; + int osd; // -1 = unknown. if local, osd == whoami. + ObjectInfo(object_t o=0, version_t v=0, int os=-1) : oid(o), version(v), osd(os) {} + }; + + struct PGInfo { + pg_t pgid; + version_t last_update; // last object version applied. + version_t last_complete; // last pg version pg was complete. + epoch_t last_epoch_started; // last epoch started. + epoch_t last_epoch_finished; // last epoch finished. + epoch_t same_primary_since; // + PGInfo(pg_t p=0) : pgid(p), + last_update(0), last_complete(0), + last_epoch_started(0), last_epoch_finished(0), + same_primary_since(0) {} + }; + + struct PGContentSummary { + //version_t since; + int remote, missing; + list ls; + + void _encode(bufferlist& blist) { + //blist.append((char*)&since, sizeof(since)); + blist.append((char*)&remote, sizeof(remote)); + blist.append((char*)&missing, sizeof(missing)); + ::_encode(ls, blist); + } + void _decode(bufferlist& blist, int& off) { + //blist.copy(off, sizeof(since), (char*)&since); + //off += sizeof(since); + blist.copy(off, sizeof(remote), (char*)&remote); + off += sizeof(remote); + blist.copy(off, sizeof(missing), (char*)&missing); + off += sizeof(missing); + ::_decode(ls, blist, off); + } + PGContentSummary() : remote(0), missing(0) {} + }; - // recovery - map > objects_missing; // pull: missing locally - map objects_missing_v; // stupid - map > objects_unrep; // push: missing remotely - map > objects_stray; // clean: stray (remote) objects - - map objects_pulling; - map > objects_pushing; - map > objects_removing; - private: - map >::iterator pull_pos; - map >::iterator push_pos; - map >::iterator remove_pos; - - public: - bool get_next_pull(object_t& oid) { - if (objects_missing.empty()) return false; - if (objects_missing.size() == objects_pulling.size()) return false; - - if (objects_pulling.empty() || pull_pos == objects_missing.end()) - pull_pos = objects_missing.begin(); - while (objects_pulling.count(pull_pos->first)) { - pull_pos++; - if (pull_pos == objects_missing.end()) - pull_pos = objects_missing.begin(); - } + /** PGPeer + * state associated with non-primary OSDS with PG content. + * only used by primary. + */ + + class PGPeer { + public: + // bits + static const int STATE_INFO = 1; // we have info + static const int STATE_SUMMARY = 2; // we have summary + static const int STATE_QINFO = 4; // we are querying info|summary. + static const int STATE_QSUMMARY = 8; // we are querying info|summary. + static const int STATE_WAITING = 16; // peer is waiting for go. + static const int STATE_ACTIVE = 32; // peer is active. + //static const int STATE_COMPLETE = 64; // peer is complete. + + class PG *pg; + private: + int peer; + int role; + int state; - oid = pull_pos->first; - pull_pos++; - return true; - } - bool get_next_push(object_t& oid) { - if (objects_unrep.empty()) return false; - if (objects_unrep.size() == objects_pushing.size()) return false; - - if (objects_pushing.empty() || push_pos == objects_unrep.end()) - push_pos = objects_unrep.begin(); - while (objects_pushing.count(push_pos->first)) { - push_pos++; - if (push_pos == objects_unrep.end()) - push_pos = objects_unrep.begin(); - } + public: + // peer state + PGInfo info; + PGContentSummary *content_summary; - oid = push_pos->first; - push_pos++; - return true; - } - bool get_next_remove(object_t& oid) { - if (objects_stray.empty()) return false; - if (objects_stray.size() == objects_removing.size()) return false; - - if (objects_removing.empty() || remove_pos == objects_stray.end()) - remove_pos = objects_stray.begin(); - while (objects_removing.count(remove_pos->first)) { - remove_pos++; - if (remove_pos == objects_stray.end()) - remove_pos = objects_stray.begin(); + friend class PG; + + public: + PGPeer(class PG *pg, int p, int ro) : + pg(pg), + peer(p), + role(ro), + state(0), + content_summary(NULL) { } + ~PGPeer() { + if (content_summary) delete content_summary; } - oid = remove_pos->first; - remove_pos++; - return true; - } - - void pulling(object_t oid, version_t v, PGPeer *p) { - p->pull(oid, v); - objects_pulling[oid] = p; - } - void pulled(object_t oid, version_t v, PGPeer *p); + int get_peer() { return peer; } + int get_role() { return role; } + + int get_state() { return state; } + bool state_test(int m) { return (state & m) != 0; } + void state_set(int m) { state |= m; } + void state_clear(int m) { state &= ~m; } + + bool have_info() { return state_test(STATE_INFO); } + bool have_summary() { return state_test(STATE_SUMMARY); } + + bool is_waiting() { return state_test(STATE_WAITING); } + bool is_active() { return state_test(STATE_ACTIVE); } + bool is_complete() { return have_info() && + info.last_update == info.last_complete; } + }; + - void pushing(object_t oid, version_t v, PGPeer *p) { - p->push(oid, v); - objects_pushing[oid].insert(p); - } - void pushed(object_t oid, version_t v, PGPeer *p); + /*** PG ****/ +public: + // any + //static const int STATE_SUMMARY = 1; // i have a content summary. + static const int STATE_ACTIVE = 2; // i am active. (primary: replicas too) + //static const int STATE_COMPLETE = 4; // i am complete. - void removing(object_t oid, version_t v, PGPeer *p) { - p->remove(oid, v); - objects_removing[oid][p] = v; - } - void removed(object_t oid, version_t v, PGPeer *p); + // primary + static const int STATE_CLEAN = 8; // peers are complete, clean of stray replicas. + + // non-primary + static const int STATE_STRAY = 16; // i haven't sent notify yet. primary may not know i exist. + protected: + OSD *osd; - // log - map< version_t, set > log_write_version_objects; - map< object_t, set > log_write_object_versions; - map< version_t, set > log_delete_version_objects; - map< object_t, set > log_delete_object_versions; + // generic state +public: + PGInfo info; + PGContentSummary *content_summary; - void log_write(object_t o, version_t v) { - log_write_object_versions[o].insert(v); - log_write_version_objects[v].insert(o); - } - void unlog_write(object_t o, version_t v) { - log_write_object_versions[o].erase(v); - log_write_version_objects[v].erase(o); - } - void log_delete(object_t o, version_t v) { - log_delete_object_versions[o].insert(v); - log_delete_version_objects[v].insert(o); - } - void unlog_delete(object_t o, version_t v) { - log_write_object_versions[o].erase(v); - log_write_version_objects[v].erase(o); - } +protected: + int role; // 0 = primary, 1 = replica, -1=none. + int state; // see bit defns above + // primary state +public: + epoch_t last_epoch_started_any; + map peers; // primary: (soft state) active peers public: - void plan_recovery(ObjectStore *store, version_t current_version, - list& complete_peers); + vector acting; - void discard_recovery_plan() { - assert(waiting_for_peered.empty()); - assert(waiting_for_missing_object.empty()); + // pg waiters + list waiting_for_active; + hash_map > waiting_for_missing_object; - objects_missing.clear(); - objects_missing_v.clear(); - objects_unrep.clear(); - objects_stray.clear(); - } + // recovery + map objects_missing; // objects (versions) i need + map recovery_queue; // objects i need to pull (in order) + version_t requested_through; + map objects_pulling; // which objects are currently being pulled + + void plan_recovery(); + void generate_content_summary(); + void do_recovery(); public: - PG(int osd, pg_t p) : whoami(osd), pgid(p), + PG(OSD *o, pg_t p) : + osd(o), + info(p), content_summary(0), role(0), - state(0), - primary_since(0), - last_complete(0), last_any_complete(0) - //last_complete_stamp(0), last_modify_stamp(0), last_clean_stamp(0) - { } + state(0) + { } - pg_t get_pgid() { return pgid; } + pg_t get_pgid() { return info.pgid; } int get_primary() { return acting[0]; } int get_nrep() { return acting.size(); } - version_t get_last_complete() { return last_complete; } - //void set_last_complete(version_t v) { last_complete = v; } - version_t get_last_any_complete() { return last_any_complete; } - //void set_last_any_complete(version_t v) { last_any_complete = v; } - - version_t get_primary_since() { return primary_since; } - void set_primary_since(version_t v) { primary_since = v; } - int get_role() { return role; } void set_role(int r) { role = r; } void calc_role(int whoami) { @@ -399,46 +215,32 @@ class PG { } bool is_primary() { return role == 0; } bool is_residual() { return role < 0; } - + int get_state() { return state; } bool state_test(int m) { return (state & m) != 0; } void set_state(int s) { state = s; } void state_set(int m) { state |= m; } void state_clear(int m) { state &= ~m; } - bool is_complete(version_t v) { - //return state_test(PG_STATE_COMPLETE); - return v == last_complete; - } - bool is_peered() { return state_test(PG_STATE_PEERED); } - //bool is_crowned() { return state_test(PG_STATE_CROWNED); } - bool is_clean() { return state_test(PG_STATE_CLEAN); } - //bool is_flushing() { return state_test(PG_STATE_FLUSHING); } - bool is_stray() { return state_test(PG_STATE_STRAY); } - - void mark_peered() { - state_set(PG_STATE_PEERED); - } - void mark_complete(version_t v) { - last_complete = v; - if (v > last_any_complete) last_any_complete = v; - } - void mark_any_complete(version_t v) { - if (v > last_any_complete) last_any_complete = v; + bool is_complete() { return info.last_complete == info.last_update; } + + bool is_active() { return state_test(STATE_ACTIVE); } + //bool is_complete() { return state_test(STATE_COMPLETE); } + bool is_clean() { return state_test(STATE_CLEAN); } + bool is_stray() { return state_test(STATE_STRAY); } + + void mark_complete() { + info.last_complete = info.last_update; } - void mark_clean() { - state_set(PG_STATE_CLEAN); + void mark_active() { + state_set(STATE_ACTIVE); } int num_active_ops() { - int o = 0; - for (map::iterator it = peers.begin(); - it != peers.end(); - it++) - o += it->second->num_active_ops(); - return o; + return objects_pulling.size(); } + // peers map& get_peers() { return peers; } PGPeer* get_peer(int p) { if (peers.count(p)) return peers[p]; @@ -460,59 +262,53 @@ class PG { peers.clear(); } - - void store(ObjectStore *store) { - if (!store->collection_exists(pgid)) - store->create_collection(pgid); - store->collection_setattr(pgid, "role", &role, sizeof(role)); - store->collection_setattr(pgid, "primary_since", &primary_since, sizeof(primary_since)); - store->collection_setattr(pgid, "state", &state, sizeof(state)); - } - void fetch(ObjectStore *store) { - store->collection_getattr(pgid, "role", &role, sizeof(role)); - store->collection_getattr(pgid, "primary_since", &primary_since, sizeof(primary_since)); - store->collection_getattr(pgid, "state", &state, sizeof(state)); - } - void add_object(ObjectStore *store, const object_t oid) { - store->collection_add(pgid, oid); - } - void remove_object(ObjectStore *store, const object_t oid) { - store->collection_remove(pgid, oid); + // pg state storage + /* + void store() { + if (!osd->store->collection_exists(pgid)) + osd->store->create_collection(pgid); + // *** } - void list_objects(ObjectStore *store, list& ls) { - store->collection_list(pgid, ls); + void fetch() { + //osd->store->collection_getattr(pgid, "role", &role, sizeof(role)); + //osd->store->collection_getattr(pgid, "primary_since", &primary_since, sizeof(primary_since)); + //osd->store->collection_getattr(pgid, "state", &state, sizeof(state)); } - void scan_local_objects(map& local_objects, ObjectStore *store) { - list olist; - local_objects.clear(); - list_objects(store,olist); - for (list::iterator it = olist.begin(); - it != olist.end(); - it++) { - version_t v = 0; - store->getattr(*it, - "version", - &v, sizeof(v)); - local_objects[*it] = v; - cout << " o " << hex << *it << dec << " v " << v << endl; - } - } + void list_objects(list& ls) { + osd->store->collection_list(pgid, ls); + }*/ +}; -}; +inline ostream& operator<<(ostream& out, PG::ObjectInfo& oi) +{ + return out << "object[" << hex << oi.oid << dec + << " v " << oi.version + << " osd" << oi.osd + << "]"; +} +inline ostream& operator<<(ostream& out, PG::PGInfo& pgi) +{ + return out << "pgi(" << hex << pgi.pgid << dec + << " v " << pgi.last_update << "/" << pgi.last_complete + << " e " << pgi.last_epoch_started << "/" << pgi.last_epoch_finished + << ")"; +} inline ostream& operator<<(ostream& out, PG& pg) { - out << "pg[" << hex << pg.get_pgid() << dec << " " << pg.get_role(); - //if (pg.is_complete()) out << " complete"; - if (pg.is_peered()) out << " peered"; + out << "pg[" << pg.info + << " " << pg.get_role(); + if (pg.is_active()) out << " active"; if (pg.is_clean()) out << " clean"; - out << " lc=" << pg.get_last_complete(); + if (pg.is_stray()) out << " stray"; out << "]"; return out; } + +#endif diff --git a/ceph/osdc/Objecter.cc b/ceph/osdc/Objecter.cc index ec3e7b787488b..725b3cd47b348 100644 --- a/ceph/osdc/Objecter.cc +++ b/ceph/osdc/Objecter.cc @@ -33,12 +33,12 @@ void Objecter::dispatch(Message *m) void Objecter::handle_osd_map(MOSDMap *m) { if (!osdmap || - m->get_version() > osdmap->get_version()) { + m->get_epoch() > osdmap->get_epoch()) { if (osdmap) { - dout(3) << "handle_osd_map got osd map version " << m->get_version() - << " > " << osdmap->get_version() << endl; + dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() + << " > " << osdmap->get_epoch() << endl; } else { - dout(3) << "handle_osd_map got osd map version " << m->get_version() + dout(3) << "handle_osd_map got osd map epoch " << m->get_epoch() << endl; } @@ -48,8 +48,8 @@ void Objecter::handle_osd_map(MOSDMap *m) // ** FIXME ** } else { - dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() - << " <= " << osdmap->get_version() << endl; + dout(3) << "handle_osd_map ignoring osd map epoch " << m->get_epoch() + << " <= " << osdmap->get_epoch() << endl; } } @@ -57,9 +57,9 @@ void Objecter::handle_osd_map(MOSDMap *m) void Objecter::handle_osd_op_reply(MOSDOpReply *m) { // updated cluster info? - if (m->get_map_version() && - m->get_map_version() > osdmap->get_version()) { - dout(3) << "op reply has newer map " << m->get_map_version() << " > " << osdmap->get_version() << endl; + if (m->get_map_epoch() && + m->get_map_epoch() > osdmap->get_epoch()) { + dout(3) << "op reply has newer map " << m->get_map_epoch() << " > " << osdmap->get_epoch() << endl; osdmap->decode( m->get_osdmap() ); } @@ -111,7 +111,7 @@ int Objecter::readx(OSDRead *rd, Context *onfinish) // send last_tid++; MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->pgid, osdmap->get_version(), + it->oid, it->pgid, osdmap->get_epoch(), OSD_OP_READ); m->set_length(it->len); m->set_offset(it->offset); @@ -299,7 +299,7 @@ int Objecter::writex(OSDWrite *wr, Context *onack, Context *oncommit) // send last_tid++; MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->pgid, osdmap->get_version(), + it->oid, it->pgid, osdmap->get_epoch(), OSD_OP_WRITE); m->set_length(it->len); m->set_offset(it->offset); @@ -429,7 +429,7 @@ int Objecter::zerox(OSDZero *z, Context *onack, Context *oncommit) // send last_tid++; MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->pgid, osdmap->get_version(), + it->oid, it->pgid, osdmap->get_epoch(), OSD_OP_ZERO); m->set_length(it->len); m->set_offset(it->offset); -- 2.39.5