From 91ba93c07167ae7d25df67a0b6156230d637226e Mon Sep 17 00:00:00 2001 From: sage Date: Tue, 2 Aug 2005 06:10:32 +0000 Subject: [PATCH] *** empty log message *** git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@479 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/client/Client.cc | 5 + ceph/config.cc | 4 + ceph/config.h | 1 + ceph/include/bufferlist.h | 29 +++ ceph/mds/MDS.cc | 77 +++++++- ceph/mds/MDS.h | 3 + ceph/messages/MOSDMap.h | 17 +- ceph/messages/MOSDRGNotify.h | 35 ++++ ceph/msg/Message.h | 7 +- ceph/msg/Messenger.cc | 10 +- ceph/osd/OSD.cc | 360 +++++++++++++++++++++++++---------- ceph/osd/OSD.h | 51 +++-- ceph/osd/OSDMap.h | 21 +- ceph/osdc/Filer.cc | 25 +++ ceph/osdc/Filer.h | 2 + 15 files changed, 510 insertions(+), 137 deletions(-) create mode 100644 ceph/messages/MOSDRGNotify.h diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 05280fe4ca472..2176342955013 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -425,12 +425,16 @@ void Client::dispatch(Message *m) case MSG_OSD_OPREPLY: filer->handle_osd_op_reply((MOSDOpReply*)m); break; + case MSG_OSD_MAP: + filer->handle_osd_map((MOSDMap*)m); + break; // client case MSG_CLIENT_FILECAPS: handle_file_caps((MClientFileCaps*)m); break; + default: cout << "dispatch doesn't recognize message type " << m->get_type() << endl; assert(0); // fail loudly @@ -629,6 +633,7 @@ void Client::handle_file_caps(MClientFileCaps *m) + // ------------------- // fs ops diff --git a/ceph/config.cc b/ceph/config.cc index 4a226f90cfe08..7912571ff9f78 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -101,10 +101,12 @@ md_config_t g_conf = { // --- osd --- + osd_num_rg: 10000, osd_nrep: 1, osd_fsync: true, osd_writesync: false, osd_maxthreads: 10, + // --- fakeclient (mds regression testing) --- @@ -238,6 +240,8 @@ void parse_config_options(int argc, char **argv, g_conf.client_bcache_ttl = atoi(argv[++i]); + else if (strcmp(argv[i], "--osd_num_rg") == 0) + g_conf.osd_num_rg = atoi(argv[++i]); else if (strcmp(argv[i], "--osd_nrep") == 0) g_conf.osd_nrep = atoi(argv[++i]); else if (strcmp(argv[i], "--osd_fsync") == 0) diff --git a/ceph/config.h b/ceph/config.h index 086f5f1d3a4ae..34fabea3c0dc0 100644 --- a/ceph/config.h +++ b/ceph/config.h @@ -72,6 +72,7 @@ struct md_config_t { bool mds_verify_export_dirauth; // debug flag // osd + int osd_num_rg; int osd_nrep; bool osd_fsync; bool osd_writesync; diff --git a/ceph/include/bufferlist.h b/ceph/include/bufferlist.h index 62cedce69bec2..3ffec68eefd71 100644 --- a/ceph/include/bufferlist.h +++ b/ceph/include/bufferlist.h @@ -466,6 +466,35 @@ inline void _decode(vector& s, bufferlist& bl, int& off) assert(s.size() == (unsigned)n); } +// list<__uint64_t> +inline void _encode(list<__uint64_t>& s, bufferlist& bl) +{ + int n = s.size(); + bl.append((char*)&n, sizeof(n)); + for (list<__uint64_t>::iterator it = s.begin(); + it != s.end(); + it++) { + __uint64_t v = *it; + bl.append((char*)&v, sizeof(v)); + n--; + } + assert(n==0); +} +inline void _decode(list<__uint64_t>& s, bufferlist& bl, int& off) +{ + s.clear(); + int n; + bl.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + for (int i=0; iget_version() << endl; + assert(get_nodeid() == 0); + + // tell mds + for (int i=0; iget_num_mds(); i++) { + messenger->send_message(new MOSDMap(osdmap), + MSG_ADDR_MDS(i)); + } + + // tell osds + set osds; + osdmap->get_all_osds(osds); + for (set::iterator it = osds.begin(); + it != osds.end(); + it++) { + messenger->send_message(new MOSDMap(osdmap), + MSG_ADDR_OSD(*it)); + } + + // tell clients + for (set::iterator it = mounted_clients.begin(); + it != mounted_clients.end(); + it++) { + messenger->send_message(new MOSDMap(osdmap), + MSG_ADDR_CLIENT(*it)); + } +} + + mds_load_t MDS::get_load() { @@ -299,6 +330,9 @@ void MDS::proc_message(Message *m) case MSG_OSD_OPREPLY: filer->handle_osd_op_reply((class MOSDOpReply*)m); return; + case MSG_OSD_MAP: + handle_osd_map((MOSDMap*)m); + return; case MSG_OSD_GETMAP: handle_osd_getmap(m); @@ -507,6 +541,24 @@ void MDS::my_dispatch(Message *m) } } + + // HACK osd map change + if (0) { + static int didit = 0; + if (whoami == 0 && + elapsed.sec() > 10 && !didit) { + didit = 1; + + dout(1) << "changing OSD map, removing one OSD" << endl; + osdmap->get_group(0).num_osds--; + osdmap->init_rush(); + osdmap->inc_version(); + + // bcast + bcast_osd_map(); + } + } + } // HACK to force export to test foreign renames @@ -544,12 +596,33 @@ void MDS::handle_osd_getmap(Message *m) { dout(7) << "osd_getmap from " << MSG_ADDR_NICE(m->get_source()) << endl; - messenger->send_message(new MOSDGetMapAck(osdmap), + messenger->send_message(new MOSDMap(osdmap), m->get_source()); delete m; } +void MDS::handle_osd_map(MOSDMap *m) +{ + if (!osdmap || + m->get_version() > osdmap->get_version()) { + if (osdmap) { + dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl; + } else { + dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl; + } + + osdmap->decode(m->get_osdmap()); + + // kick requests who might be timing out on the wrong osds + // ** FIXME ** + + } else { + dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl; + } +} + + void MDS::handle_client_mount(MClientMount *m) { // mkfs? (sorta hack!) diff --git a/ceph/mds/MDS.h b/ceph/mds/MDS.h index bc859ed8a1707..8b28c2d2b187d 100644 --- a/ceph/mds/MDS.h +++ b/ceph/mds/MDS.h @@ -160,6 +160,8 @@ class MDS : public Dispatcher { int shutdown_start(); int shutdown_final(); + void bcast_osd_map(); + // messages void proc_message(Message *m); virtual void dispatch(Message *m); @@ -184,6 +186,7 @@ class MDS : public Dispatcher { // osds void handle_osd_getmap(Message *m); + void handle_osd_map(class MOSDMap *m); // clients void handle_client_mount(class MClientMount *m); diff --git a/ceph/messages/MOSDMap.h b/ceph/messages/MOSDMap.h index c47ba8e8069ce..114f95de5d03e 100644 --- a/ceph/messages/MOSDMap.h +++ b/ceph/messages/MOSDMap.h @@ -5,8 +5,9 @@ #include "osd/OSDMap.h" -class MOSDGetMapAck : public Message { +class MOSDMap : public Message { bufferlist osdmap; + __uint64_t version; public: // osdmap @@ -14,19 +15,25 @@ class MOSDGetMapAck : public Message { return osdmap; } - MOSDGetMapAck(OSDMap *oc) : - Message(MSG_OSD_GETMAPACK) { + __uint64_t get_version() { return version; } + + MOSDMap(OSDMap *oc) : + Message(MSG_OSD_MAP) { oc->encode(osdmap); + version = oc->get_version(); } - MOSDGetMapAck() {} + MOSDMap() {} // marshalling virtual void decode_payload() { + payload.copy(0, sizeof(version), (char*)&version); + payload.splice(0, sizeof(version)); osdmap.claim(payload); } virtual void encode_payload() { - payload.claim(osdmap); + payload.append((char*)&version, sizeof(version)); + payload.claim_append(osdmap); } virtual char *get_type_name() { return "ogma"; } diff --git a/ceph/messages/MOSDRGNotify.h b/ceph/messages/MOSDRGNotify.h new file mode 100644 index 0000000000000..588c9509f54cb --- /dev/null +++ b/ceph/messages/MOSDRGNotify.h @@ -0,0 +1,35 @@ +#ifndef __MOSDPEER_H +#define __MOSDPEER_H + +#include "msg/Message.h" + + +class MOSDRGNotify : public Message { + __uint64_t map_version; + list rg_list; + + public: + __uint64_t get_version() { return map_version; } + list& get_rg_list() { return rg_list; } + + MOSDRGNotify() {} + MOSDRGNotify(__uint64_t v, list& l) : + Message(MSG_OSD_RG_NOTIFY) { + this->map_version = v; + rg_list.splice(rg_list.begin(), l); + } + + char *get_type_name() { return "RGnot"; } + + void encode_payload() { + payload.append((char*)&map_version, sizeof(map_version)); + _encode(rg_list, payload); + } + void decode_payload() { + int off = 0; + payload.copy(off, sizeof(map_version), (char*)&map_version); + _decode(rg_list, payload, off); + } +}; + +#endif diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index a08592cea045a..1e8ac4251884a 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -15,7 +15,12 @@ #define MSG_OSD_PING 16 #define MSG_OSD_GETMAP 17 -#define MSG_OSD_GETMAPACK 18 +#define MSG_OSD_MAP 18 + +#define MSG_OSD_RG_NOTIFY 50 +#define MSG_OSD_RG_PEER 51 +#define MSG_OSD_RG_PEERACK 52 + #define MSG_CLIENT_REQUEST 20 #define MSG_CLIENT_REPLY 21 diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index 6a687802e9135..735cbdfe46a62 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -20,7 +20,8 @@ using namespace std; #include "messages/MOSDPing.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" -#include "messages/MOSDGetMapAck.h" +#include "messages/MOSDMap.h" +#include "messages/MOSDRGNotify.h" #include "messages/MClientMount.h" #include "messages/MClientMountAck.h" @@ -218,8 +219,11 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_OSD_OPREPLY: m = new MOSDOpReply(); break; - case MSG_OSD_GETMAPACK: - m = new MOSDGetMapAck(); + case MSG_OSD_MAP: + m = new MOSDMap(); + break; + case MSG_OSD_RG_NOTIFY: + m = new MOSDRGNotify(); break; // clients diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 069043ec26086..77c2ef184433a 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -23,7 +23,8 @@ #include "messages/MPingAck.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" -#include "messages/MOSDGetMapAck.h" +#include "messages/MOSDMap.h" +#include "messages/MOSDRGNotify.h" #include "common/Logger.h" #include "common/LogType.h" @@ -143,46 +144,6 @@ int OSD::shutdown() -// ------------------------------------ -// replica groups - -void OSD::get_rg_list(list& ls) -{ - // just list collections; assume they're all rg's (for now) - store->list_collections(ls); -} - - -bool OSD::rg_exists(repgroup_t rg) -{ - struct stat st; - if (store->collection_stat(rg, &st) == 0) - return true; - else - return false; -} - - -RG *OSD::open_rg(repgroup_t rg) -{ - // already open? - if (rg_map.count(rg)) - return rg_map[rg]; - - // stat collection - RG *r = new RG(rg); - if (rg_exists(rg)) { - // exists - r->fetch(store); - } else { - // dne - r->store(store); - } - rg_map[rg] = r; - - return r; -} - @@ -202,6 +163,14 @@ void OSD::dispatch(Message *m) monitor->proc_message(m); break; + // map and replication + case MSG_OSD_MAP: + handle_osd_map((MOSDMap*)m); + break; + + case MSG_OSD_RG_NOTIFY: + handle_rg_notify((MOSDRGNotify*)m); + break; // osd case MSG_SHUTDOWN: @@ -209,10 +178,6 @@ void OSD::dispatch(Message *m) delete m; break; - case MSG_OSD_GETMAPACK: - handle_getmap_ack((MOSDGetMapAck*)m); - break; - case MSG_PING: // take note. monitor->host_is_alive(m->get_source()); @@ -270,6 +235,24 @@ void OSD::handle_ping(MPing *m) +// ===================================================== +// MAP + +void OSD::wait_for_new_map(Message *m) +{ + // ask MDS + messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), + MSG_ADDR_MDS(0), MDS_PORT_MAIN); + + osd_lock.Lock(); + waiting_for_osdmap.push_back(m); + osd_lock.Unlock(); +} + + +/** update_map + * assimilate a new OSDMap. scan rgs. + */ void OSD::update_map(bufferlist& state) { // decode new map @@ -277,11 +260,108 @@ void OSD::update_map(bufferlist& state) osdmap->decode(state); dout(7) << "update_map version " << osdmap->get_version() << endl; + // scan known replica groups! + scan_rg(); +} + + +void OSD::handle_osd_map(MOSDMap *m) +{ + // SAB + osd_lock.Lock(); + + if (!osdmap || + m->get_version() > osdmap->get_version()) { + if (osdmap) { + dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl; + } else { + dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl; + } + + update_map(m->get_osdmap()); + delete m; + + // process waiters + list waiting; + waiting.splice(waiting.begin(), waiting_for_osdmap); + + osd_lock.Unlock(); + + for (list::iterator it = waiting.begin(); + it != waiting.end(); + it++) { + dispatch(*it); + } + } else { + dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl; + osd_lock.Unlock(); + } +} + + + +// ====================================================== +// REPLICATION + + +// ------------------------------------ +// replica groups + +void OSD::get_rg_list(list& ls) +{ + // just list collections; assume they're all rg's (for now) + store->list_collections(ls); +} + + +bool OSD::rg_exists(repgroup_t rg) +{ + struct stat st; + if (store->collection_stat(rg, &st) == 0) + return true; + else + return false; +} + + +RG *OSD::open_rg(repgroup_t rg) +{ + // already open? + if (rg_map.count(rg)) + return rg_map[rg]; + + // stat collection + RG *r = new RG(rg); + if (rg_exists(rg)) { + // exists + r->fetch(store); + } else { + // dne + r->store(store); + } + rg_map[rg] = r; + + return r; +} + + + + + +/** + * scan replica groups, initiate any replication + * activities. + */ +void OSD::scan_rg() +{ + //dout(7) << "scan_rg map version " << osdmap->get_version() << endl; + // scan replica groups list ls; get_rg_list(ls); - map< int, list > primary_ping_queue; + map< int, list > notify_list; + map< int, set > start_set; for (list::iterator it = ls.begin(); it != ls.end(); @@ -301,7 +381,7 @@ void OSD::update_map(bufferlist& state) if (role != rg->get_role()) { // role change. - dout(7) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl; + dout(10) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl; // am i old-primary? if (rg->get_role() == 0) { @@ -309,7 +389,7 @@ void OSD::update_map(bufferlist& state) for (map::iterator it = rg->get_peers().begin(); it != rg->get_peers().end(); it++) { - dout(7) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl; + dout(10) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl; rg->get_old_replica_set().insert(it->first); delete it->second; } @@ -320,7 +400,14 @@ void OSD::update_map(bufferlist& state) rg->state_clear(RG_STATE_PEERED); rg->set_role(role); rg->store(store); - primary_ping_queue[primary].push_back(rg); + + if (role == 0) { + // i am new primary + + } else { + // i am replica + notify_list[primary].push_back(rgid); + } } else { // no role change. @@ -330,76 +417,153 @@ void OSD::update_map(bufferlist& state) // did primary change? if (primary != rg->get_primary()) { - dout(7) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl; + dout(10) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl; // re-peer rg->state_clear(RG_STATE_PEERED); rg->set_primary(primary); rg->store(store); - primary_ping_queue[primary].push_back(rg); + notify_list[primary].push_back(rgid); } } - else if (role == 0) { - // i am primary. - - // check replicas - for (int r=1; rget_peer(r) == 0) { - dout(7) << " rg " << rgid << " primary not peered with replica " << r << " osd" << acting[r] << endl; - - // *** - } - } - + } + + if (role == 0) { + // i am primary. + + // old peers + // *** + + // check replicas + for (int r=1; rget_peer(r) == 0) { + dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl; + start_set[acting[r]].insert(rg); + } } } } - // initiate any new peering sessions! - for (map< int, list >::iterator pit = primary_ping_queue.begin(); - pit != primary_ping_queue.end(); - pit++) { - // create peer message - int primary = pit->first; - + // notify? + for (map< int, list >::iterator pit = notify_list.begin(); + pit != notify_list.end(); + pit++) + peer_notify(pit->first, pit->second); - for (list::iterator rit = pit->second.begin(); - rit != pit->second.end(); - rit++) { - // add this RG to peer message - } + // start peer? + for (map< int, set >::iterator pit = start_set.begin(); + pit != start_set.end(); + pit++) + peer_start(pit->first, pit->second); - // send - - } } -void OSD::handle_getmap_ack(MOSDGetMapAck *m) +/** peer_notify + * Send an MOSDRGNotify to a primary, with a list of RGs that I have + * content for, and they are primary for. + */ +void OSD::peer_notify(int primary, list& rg_list) { - // SAB - osd_lock.Lock(); + dout(7) << "peer_notify osd" << primary << " on " << rg_list.size() << " RGs" << endl; + MOSDRGNotify *m = new MOSDRGNotify(primary, rg_list); + messenger->send_message(m, + MSG_ADDR_OSD(primary)); +} - update_map(m->get_osdmap()); - delete m; - // process waiters - list waiting; - waiting.splice(waiting.begin(), waiting_for_osdmap); +/** peer_start + * initiate a peer session with a replica on given list of RGs + */ +void OSD::peer_start(int replica, set& rg_set) +{ + dout(7) << "peer_start with osd" << replica << " on " << rg_set.size() << " RGs" << endl; + + +} - list w = waiting; - osd_lock.Unlock(); - for (list::iterator it = w.begin(); - it != w.end(); + + +void OSD::handle_rg_notify(MOSDRGNotify *m) +{ + int from = MSG_ADDR_NUM(m->get_source()); + dout(7) << "handle_rg_notify from osd" << from << endl; + + // older map? + if (m->get_version() < osdmap->get_version()) { + dout(7) << " from old map version " << m->get_version() << " < " << osdmap->get_version() << endl; + delete m; // discard and ignore.* + return; + } + + // newer map? + if (m->get_version() > osdmap->get_version()) { + dout(7) << " for newer map version " << m->get_version() << " > " << osdmap->get_version() << endl; + wait_for_new_map(m); + return; + } + + assert(m->get_version() == osdmap->get_version()); + + // look for unknown RGs i'm primary for + map< int, set > start_set; + + for (list::iterator it = m->get_rg_list().begin(); + it != m->get_rg_list().end(); it++) { - handle_op(*it); + repgroup_t rgid = *it; + + int acting[NUM_RUSH_REPLICAS]; + int nrep = osdmap->repgroup_to_acting_osds(rgid, acting, NUM_RUSH_REPLICAS); + assert(acting[0] == whoami); + + // get/open RG + RG *rg = open_rg(rgid); + + // previously unknown RG? + if (rg->get_peers().empty()) { + dout(10) << " rg " << rgid << " is new" << endl; + for (int r=1; rget_peer(r) == 0) { + dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl; + start_set[acting[r]].insert(rg); + } + } + } + + // peered with this guy specifically? + RGPeer *rgp = rg->get_peer(from); + if (!rgp) { + dout(7) << " not yet peered with osd" << from << " on rg " << rgid << endl; + start_set[from].insert(rg); + } } + + // start peers? + if (start_set.empty()) { + dout(7) << " no new peers" << endl; + } else { + for (map< int, set >::iterator pit = start_set.begin(); + pit != start_set.end(); + pit++) + peer_start(pit->first, pit->second); + } + + delete m; } + + + + +// ========================================================= +// OPS + + void OSD::handle_op(MOSDOp *op) { // mkfs is special @@ -411,7 +575,7 @@ void OSD::handle_op(MOSDOp *op) // no map? starting up? if (!osdmap) { osd_lock.Lock(); - dout(7) << "no OSDMap, starting up" << endl; + dout(7) << "no OSDMap, asking MDS" << endl; if (waiting_for_osdmap.empty()) messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), MSG_ADDR_MDS(0), MDS_PORT_MAIN); @@ -424,17 +588,7 @@ void OSD::handle_op(MOSDOp *op) if (op->get_map_version() > osdmap->get_version()) { // op's is newer dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl; - - // query MDS - dout(7) << "querying MDS" << endl; - messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), - MSG_ADDR_MDS(0), MDS_PORT_MAIN); - - assert(0); - - osd_lock.Lock(); - waiting_for_osdmap.push_back(op); - osd_lock.Unlock(); + wait_for_new_map(op); return; } diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 55bb12ba07ecb..a287f34529e21 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -174,28 +174,45 @@ class OSD : public Dispatcher { Messenger *messenger; int whoami; - class OSDMap *osdmap; class ObjectStore *store; class HostMonitor *monitor; class Logger *logger; + + // global lock + Mutex osd_lock; + + + // -- ops -- class ThreadPool *threadpool; - list waiting_for_osdmap; + void queue_op(class MOSDOp *m); + public: + void do_op(class MOSDOp *m); + static void doop(OSD *o, MOSDOp *op) { + o->do_op(op); + }; + + protected: + + // -- osd map -- + class OSDMap *osdmap; + list waiting_for_osdmap; + + void update_map(bufferlist& state); + void wait_for_new_map(Message *m); + void handle_osd_map(class MOSDMap *m); - // replica hack + + // __uint64_t last_tid; Mutex replica_write_lock; map replica_write_cond; map > replica_write_tids; map<__uint64_t, MOSDOp*> replica_writes; + // - // global lock - Mutex osd_lock; - - - void update_map(bufferlist& state); - // rg's + // -- replication -- hash_map rg_map; void get_rg_list(list& ls); @@ -204,6 +221,14 @@ class OSD : public Dispatcher { void close_rg(repgroup_t rg); // close in-memory state void remove_rg(repgroup_t rg); // remove state from store + void scan_rg(); + void peer_notify(int primary, list& rg_list); + void peer_start(int replica, set& rg_list); + + void handle_rg_notify(class MOSDRGNotify *m); + void handle_rg_peer(class MOSDRGPeer *m); + void handle_rg_peer_ack(class MOSDRGPeerAck *m); + public: OSD(int id, Messenger *m); ~OSD(); @@ -212,18 +237,10 @@ class OSD : public Dispatcher { int init(); int shutdown(); - // ops - void queue_op(class MOSDOp *m); - void do_op(class MOSDOp *m); - static void doop(OSD *o, MOSDOp *op) { - o->do_op(op); - }; - // messages virtual void dispatch(Message *m); void handle_ping(class MPing *m); - void handle_getmap_ack(class MOSDGetMapAck *m); void handle_op(class MOSDOp *m); void op_read(class MOSDOp *m); void op_write(class MOSDOp *m); diff --git a/ceph/osd/OSDMap.h b/ceph/osd/OSDMap.h index 583822d1ce2c8..33b748838b93c 100644 --- a/ceph/osd/OSDMap.h +++ b/ceph/osd/OSDMap.h @@ -23,7 +23,7 @@ using namespace std; /* * some system constants */ -#define NUM_REPLICA_GROUPS (1<<20) // ~1M +//#define NUM_REPLICA_GROUPS (1<<20) // ~1M #define NUM_RUSH_REPLICAS 4 // this should be big enough to cope w/ failing disks. #define MAX_REPLICAS 3 @@ -44,9 +44,10 @@ class OSDFileLayout { int stripe_size; // stripe unit, in bytes int stripe_count; // over this many objects int object_size; // until objects are this big, then use a new set of objects. + int num_rep; - OSDFileLayout(int ss, int sc, int os) : - stripe_size(ss), stripe_count(sc), object_size(os) { } + OSDFileLayout(int ss, int sc, int os, int nr=2) : + stripe_size(ss), stripe_count(sc), object_size(os), num_rep(nr) { } }; @@ -109,6 +110,7 @@ class OSDMap { Mutex osd_cluster_lock; + public: void init_rush() { // SAB @@ -135,6 +137,7 @@ class OSDMap { } __uint64_t get_version() { return version; } + void inc_version() { version++; } // cluster state bool is_failed(int osd) { return failed_osds.count(osd) ? true:false; } @@ -173,12 +176,18 @@ class OSDMap { /* map (ino, blockno) into a replica group */ repgroup_t file_to_repgroup(inodeno_t ino, - size_t ono) { + size_t ono, + int nrep) { // something simple for now // hash this eventually! - return (ino+ono) % NUM_REPLICA_GROUPS; + return ((ino+ono) % g_conf.osd_num_rg) + + (nrep * g_conf.osd_num_rg); } + /* get nrep from rgid */ + int repgroup_to_nrep(repgroup_t rg) { + return rg / g_conf.osd_num_rg; + } /* map (repgroup) to a list of osds. this is where we invoke RUSH. */ @@ -310,7 +319,7 @@ class OSDMap { else { ex = &object_extents[oid]; ex->oid = oid; - ex->rg = file_to_repgroup( ino, objectno ); + ex->rg = file_to_repgroup( ino, objectno, layout.num_rep ); ex->osd = get_rg_acting_primary( ex->rg ); } diff --git a/ceph/osdc/Filer.cc b/ceph/osdc/Filer.cc index f52f4a72c7c58..f4c8bcc3361a4 100644 --- a/ceph/osdc/Filer.cc +++ b/ceph/osdc/Filer.cc @@ -10,6 +10,7 @@ //#include "messages/MOSDWriteReply.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" +#include "messages/MOSDMap.h" #include "msg/Messenger.h" @@ -48,6 +49,30 @@ void Filer::dispatch(Message *m) } + +void Filer::handle_osd_map(MOSDMap *m) +{ + if (!osdmap || + m->get_version() > osdmap->get_version()) { + if (osdmap) { + dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl; + } else { + dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl; + } + + osdmap->decode(m->get_osdmap()); + + // kick requests who might be timing out on the wrong osds + // ** FIXME ** + + } else { + dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl; + } +} + + + + /* void Filer::queue_outgoing(Message *m, int osd) { diff --git a/ceph/osdc/Filer.h b/ceph/osdc/Filer.h index cf2a5a706371c..37c4004749372 100644 --- a/ceph/osdc/Filer.h +++ b/ceph/osdc/Filer.h @@ -128,6 +128,8 @@ class Filer : public Dispatcher { void handle_osd_read_reply(class MOSDOpReply *m); void handle_osd_write_reply(class MOSDOpReply *m); void handle_osd_op_reply(class MOSDOpReply *m); + + void handle_osd_map(class MOSDMap *m); }; -- 2.39.5