case MSG_OSD_OPREPLY:
filer->handle_osd_op_reply((MOSDOpReply*)m);
break;
+ case MSG_OSD_MAP:
+ filer->handle_osd_map((MOSDMap*)m);
+ break;
// client
case MSG_CLIENT_FILECAPS:
handle_file_caps((MClientFileCaps*)m);
break;
+
default:
cout << "dispatch doesn't recognize message type " << m->get_type() << endl;
assert(0); // fail loudly
+
// -------------------
// fs ops
// --- osd ---
+ osd_num_rg: 10000,
osd_nrep: 1,
osd_fsync: true,
osd_writesync: false,
osd_maxthreads: 10,
+
// --- fakeclient (mds regression testing) ---
g_conf.client_bcache_ttl = atoi(argv[++i]);
+ else if (strcmp(argv[i], "--osd_num_rg") == 0)
+ g_conf.osd_num_rg = atoi(argv[++i]);
else if (strcmp(argv[i], "--osd_nrep") == 0)
g_conf.osd_nrep = atoi(argv[++i]);
else if (strcmp(argv[i], "--osd_fsync") == 0)
bool mds_verify_export_dirauth; // debug flag
// osd
+ int osd_num_rg;
int osd_nrep;
bool osd_fsync;
bool osd_writesync;
assert(s.size() == (unsigned)n);
}
+// list<__uint64_t>
+inline void _encode(list<__uint64_t>& s, bufferlist& bl)
+{
+ int n = s.size();
+ bl.append((char*)&n, sizeof(n));
+ for (list<__uint64_t>::iterator it = s.begin();
+ it != s.end();
+ it++) {
+ __uint64_t v = *it;
+ bl.append((char*)&v, sizeof(v));
+ n--;
+ }
+ assert(n==0);
+}
+inline void _decode(list<__uint64_t>& s, bufferlist& bl, int& off)
+{
+ s.clear();
+ int n;
+ bl.copy(off, sizeof(n), (char*)&n);
+ off += sizeof(n);
+ for (int i=0; i<n; i++) {
+ __uint64_t v;
+ bl.copy(off, sizeof(v), (char*)&v);
+ off += sizeof(v);
+ s.push_back(v);
+ }
+ assert(s.size() == (unsigned)n);
+}
+
#include "messages/MPingAck.h"
#include "messages/MGenericMessage.h"
-#include "messages/MOSDGetMapAck.h"
+#include "messages/MOSDMap.h"
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
}
+void MDS::bcast_osd_map()
+{
+ dout(1) << "bcast_osd_map version " << osdmap->get_version() << endl;
+ assert(get_nodeid() == 0);
+
+ // tell mds
+ for (int i=0; i<get_cluster()->get_num_mds(); i++) {
+ messenger->send_message(new MOSDMap(osdmap),
+ MSG_ADDR_MDS(i));
+ }
+
+ // tell osds
+ set<int> osds;
+ osdmap->get_all_osds(osds);
+ for (set<int>::iterator it = osds.begin();
+ it != osds.end();
+ it++) {
+ messenger->send_message(new MOSDMap(osdmap),
+ MSG_ADDR_OSD(*it));
+ }
+
+ // tell clients
+ for (set<int>::iterator it = mounted_clients.begin();
+ it != mounted_clients.end();
+ it++) {
+ messenger->send_message(new MOSDMap(osdmap),
+ MSG_ADDR_CLIENT(*it));
+ }
+}
+
+
mds_load_t MDS::get_load()
{
case MSG_OSD_OPREPLY:
filer->handle_osd_op_reply((class MOSDOpReply*)m);
return;
+ case MSG_OSD_MAP:
+ handle_osd_map((MOSDMap*)m);
+ return;
case MSG_OSD_GETMAP:
handle_osd_getmap(m);
}
}
+
+ // HACK osd map change
+ if (0) {
+ static int didit = 0;
+ if (whoami == 0 &&
+ elapsed.sec() > 10 && !didit) {
+ didit = 1;
+
+ dout(1) << "changing OSD map, removing one OSD" << endl;
+ osdmap->get_group(0).num_osds--;
+ osdmap->init_rush();
+ osdmap->inc_version();
+
+ // bcast
+ bcast_osd_map();
+ }
+ }
+
}
// HACK to force export to test foreign renames
{
dout(7) << "osd_getmap from " << MSG_ADDR_NICE(m->get_source()) << endl;
- messenger->send_message(new MOSDGetMapAck(osdmap),
+ messenger->send_message(new MOSDMap(osdmap),
m->get_source());
delete m;
}
+void MDS::handle_osd_map(MOSDMap *m)
+{
+ if (!osdmap ||
+ m->get_version() > osdmap->get_version()) {
+ if (osdmap) {
+ dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl;
+ } else {
+ dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
+ }
+
+ osdmap->decode(m->get_osdmap());
+
+ // kick requests who might be timing out on the wrong osds
+ // ** FIXME **
+
+ } else {
+ dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
+ }
+}
+
+
void MDS::handle_client_mount(MClientMount *m)
{
// mkfs? (sorta hack!)
int shutdown_start();
int shutdown_final();
+ void bcast_osd_map();
+
// messages
void proc_message(Message *m);
virtual void dispatch(Message *m);
// osds
void handle_osd_getmap(Message *m);
+ void handle_osd_map(class MOSDMap *m);
// clients
void handle_client_mount(class MClientMount *m);
#include "osd/OSDMap.h"
-class MOSDGetMapAck : public Message {
+class MOSDMap : public Message {
bufferlist osdmap;
+ __uint64_t version;
public:
// osdmap
return osdmap;
}
- MOSDGetMapAck(OSDMap *oc) :
- Message(MSG_OSD_GETMAPACK) {
+ __uint64_t get_version() { return version; }
+
+ MOSDMap(OSDMap *oc) :
+ Message(MSG_OSD_MAP) {
oc->encode(osdmap);
+ version = oc->get_version();
}
- MOSDGetMapAck() {}
+ MOSDMap() {}
// marshalling
virtual void decode_payload() {
+ payload.copy(0, sizeof(version), (char*)&version);
+ payload.splice(0, sizeof(version));
osdmap.claim(payload);
}
virtual void encode_payload() {
- payload.claim(osdmap);
+ payload.append((char*)&version, sizeof(version));
+ payload.claim_append(osdmap);
}
virtual char *get_type_name() { return "ogma"; }
--- /dev/null
+#ifndef __MOSDPEER_H
+#define __MOSDPEER_H
+
+#include "msg/Message.h"
+
+
+class MOSDRGNotify : public Message {
+ __uint64_t map_version;
+ list<repgroup_t> rg_list;
+
+ public:
+ __uint64_t get_version() { return map_version; }
+ list<repgroup_t>& get_rg_list() { return rg_list; }
+
+ MOSDRGNotify() {}
+ MOSDRGNotify(__uint64_t v, list<repgroup_t>& l) :
+ Message(MSG_OSD_RG_NOTIFY) {
+ this->map_version = v;
+ rg_list.splice(rg_list.begin(), l);
+ }
+
+ char *get_type_name() { return "RGnot"; }
+
+ void encode_payload() {
+ payload.append((char*)&map_version, sizeof(map_version));
+ _encode(rg_list, payload);
+ }
+ void decode_payload() {
+ int off = 0;
+ payload.copy(off, sizeof(map_version), (char*)&map_version);
+ _decode(rg_list, payload, off);
+ }
+};
+
+#endif
#define MSG_OSD_PING 16
#define MSG_OSD_GETMAP 17
-#define MSG_OSD_GETMAPACK 18
+#define MSG_OSD_MAP 18
+
+#define MSG_OSD_RG_NOTIFY 50
+#define MSG_OSD_RG_PEER 51
+#define MSG_OSD_RG_PEERACK 52
+
#define MSG_CLIENT_REQUEST 20
#define MSG_CLIENT_REPLY 21
#include "messages/MOSDPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
-#include "messages/MOSDGetMapAck.h"
+#include "messages/MOSDMap.h"
+#include "messages/MOSDRGNotify.h"
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
case MSG_OSD_OPREPLY:
m = new MOSDOpReply();
break;
- case MSG_OSD_GETMAPACK:
- m = new MOSDGetMapAck();
+ case MSG_OSD_MAP:
+ m = new MOSDMap();
+ break;
+ case MSG_OSD_RG_NOTIFY:
+ m = new MOSDRGNotify();
break;
// clients
#include "messages/MPingAck.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
-#include "messages/MOSDGetMapAck.h"
+#include "messages/MOSDMap.h"
+#include "messages/MOSDRGNotify.h"
#include "common/Logger.h"
#include "common/LogType.h"
-// ------------------------------------
-// replica groups
-
-void OSD::get_rg_list(list<repgroup_t>& ls)
-{
- // just list collections; assume they're all rg's (for now)
- store->list_collections(ls);
-}
-
-
-bool OSD::rg_exists(repgroup_t rg)
-{
- struct stat st;
- if (store->collection_stat(rg, &st) == 0)
- return true;
- else
- return false;
-}
-
-
-RG *OSD::open_rg(repgroup_t rg)
-{
- // already open?
- if (rg_map.count(rg))
- return rg_map[rg];
-
- // stat collection
- RG *r = new RG(rg);
- if (rg_exists(rg)) {
- // exists
- r->fetch(store);
- } else {
- // dne
- r->store(store);
- }
- rg_map[rg] = r;
-
- return r;
-}
-
monitor->proc_message(m);
break;
+ // map and replication
+ case MSG_OSD_MAP:
+ handle_osd_map((MOSDMap*)m);
+ break;
+
+ case MSG_OSD_RG_NOTIFY:
+ handle_rg_notify((MOSDRGNotify*)m);
+ break;
// osd
case MSG_SHUTDOWN:
delete m;
break;
- case MSG_OSD_GETMAPACK:
- handle_getmap_ack((MOSDGetMapAck*)m);
- break;
-
case MSG_PING:
// take note.
monitor->host_is_alive(m->get_source());
+// =====================================================
+// MAP
+
+void OSD::wait_for_new_map(Message *m)
+{
+ // ask MDS
+ messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
+ MSG_ADDR_MDS(0), MDS_PORT_MAIN);
+
+ osd_lock.Lock();
+ waiting_for_osdmap.push_back(m);
+ osd_lock.Unlock();
+}
+
+
+/** update_map
+ * assimilate a new OSDMap. scan rgs.
+ */
void OSD::update_map(bufferlist& state)
{
// decode new map
osdmap->decode(state);
dout(7) << "update_map version " << osdmap->get_version() << endl;
+ // scan known replica groups!
+ scan_rg();
+}
+
+
+void OSD::handle_osd_map(MOSDMap *m)
+{
+ // SAB
+ osd_lock.Lock();
+
+ if (!osdmap ||
+ m->get_version() > osdmap->get_version()) {
+ if (osdmap) {
+ dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl;
+ } else {
+ dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
+ }
+
+ update_map(m->get_osdmap());
+ delete m;
+
+ // process waiters
+ list<Message*> waiting;
+ waiting.splice(waiting.begin(), waiting_for_osdmap);
+
+ osd_lock.Unlock();
+
+ for (list<Message*>::iterator it = waiting.begin();
+ it != waiting.end();
+ it++) {
+ dispatch(*it);
+ }
+ } else {
+ dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
+ osd_lock.Unlock();
+ }
+}
+
+
+
+// ======================================================
+// REPLICATION
+
+
+// ------------------------------------
+// replica groups
+
+void OSD::get_rg_list(list<repgroup_t>& ls)
+{
+ // just list collections; assume they're all rg's (for now)
+ store->list_collections(ls);
+}
+
+
+bool OSD::rg_exists(repgroup_t rg)
+{
+ struct stat st;
+ if (store->collection_stat(rg, &st) == 0)
+ return true;
+ else
+ return false;
+}
+
+
+RG *OSD::open_rg(repgroup_t rg)
+{
+ // already open?
+ if (rg_map.count(rg))
+ return rg_map[rg];
+
+ // stat collection
+ RG *r = new RG(rg);
+ if (rg_exists(rg)) {
+ // exists
+ r->fetch(store);
+ } else {
+ // dne
+ r->store(store);
+ }
+ rg_map[rg] = r;
+
+ return r;
+}
+
+
+
+
+
+/**
+ * scan replica groups, initiate any replication
+ * activities.
+ */
+void OSD::scan_rg()
+{
+ //dout(7) << "scan_rg map version " << osdmap->get_version() << endl;
+
// scan replica groups
list<repgroup_t> ls;
get_rg_list(ls);
- map< int, list<RG*> > primary_ping_queue;
+ map< int, list<repgroup_t> > notify_list;
+ map< int, set<RG*> > start_set;
for (list<repgroup_t>::iterator it = ls.begin();
it != ls.end();
if (role != rg->get_role()) {
// role change.
- dout(7) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl;
+ dout(10) << " rg " << rgid << " acting role change " << rg->get_role() << " -> " << role << endl;
// am i old-primary?
if (rg->get_role() == 0) {
for (map<int, RGPeer*>::iterator it = rg->get_peers().begin();
it != rg->get_peers().end();
it++) {
- dout(7) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl;
+ dout(10) << " rg " << rgid << " old-primary, dropping old peer " << it->first << endl;
rg->get_old_replica_set().insert(it->first);
delete it->second;
}
rg->state_clear(RG_STATE_PEERED);
rg->set_role(role);
rg->store(store);
- primary_ping_queue[primary].push_back(rg);
+
+ if (role == 0) {
+ // i am new primary
+
+ } else {
+ // i am replica
+ notify_list[primary].push_back(rgid);
+ }
} else {
// no role change.
// did primary change?
if (primary != rg->get_primary()) {
- dout(7) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
+ dout(10) << " rg " << rgid << " acting primary change " << rg->get_primary() << " -> " << primary << endl;
// re-peer
rg->state_clear(RG_STATE_PEERED);
rg->set_primary(primary);
rg->store(store);
- primary_ping_queue[primary].push_back(rg);
+ notify_list[primary].push_back(rgid);
}
}
- else if (role == 0) {
- // i am primary.
-
- // check replicas
- for (int r=1; r<nrep; r++) {
- if (rg->get_peer(r) == 0) {
- dout(7) << " rg " << rgid << " primary not peered with replica " << r << " osd" << acting[r] << endl;
-
- // ***
- }
- }
-
+ }
+
+ if (role == 0) {
+ // i am primary.
+
+ // old peers
+ // ***
+
+ // check replicas
+ for (int r=1; r<nrep; r++) {
+ if (rg->get_peer(r) == 0) {
+ dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
+ start_set[acting[r]].insert(rg);
+ }
}
}
}
- // initiate any new peering sessions!
- for (map< int, list<RG*> >::iterator pit = primary_ping_queue.begin();
- pit != primary_ping_queue.end();
- pit++) {
- // create peer message
- int primary = pit->first;
-
+ // notify?
+ for (map< int, list<repgroup_t> >::iterator pit = notify_list.begin();
+ pit != notify_list.end();
+ pit++)
+ peer_notify(pit->first, pit->second);
- for (list<RG*>::iterator rit = pit->second.begin();
- rit != pit->second.end();
- rit++) {
- // add this RG to peer message
- }
+ // start peer?
+ for (map< int, set<RG*> >::iterator pit = start_set.begin();
+ pit != start_set.end();
+ pit++)
+ peer_start(pit->first, pit->second);
- // send
-
- }
}
-void OSD::handle_getmap_ack(MOSDGetMapAck *m)
+/** peer_notify
+ * Send an MOSDRGNotify to a primary, with a list of RGs that I have
+ * content for, and they are primary for.
+ */
+void OSD::peer_notify(int primary, list<repgroup_t>& rg_list)
{
- // SAB
- osd_lock.Lock();
+ dout(7) << "peer_notify osd" << primary << " on " << rg_list.size() << " RGs" << endl;
+ MOSDRGNotify *m = new MOSDRGNotify(primary, rg_list);
+ messenger->send_message(m,
+ MSG_ADDR_OSD(primary));
+}
- update_map(m->get_osdmap());
- delete m;
- // process waiters
- list<MOSDOp*> waiting;
- waiting.splice(waiting.begin(), waiting_for_osdmap);
+/** peer_start
+ * initiate a peer session with a replica on given list of RGs
+ */
+void OSD::peer_start(int replica, set<RG*>& rg_set)
+{
+ dout(7) << "peer_start with osd" << replica << " on " << rg_set.size() << " RGs" << endl;
+
+
+}
- list<MOSDOp*> w = waiting;
- osd_lock.Unlock();
- for (list<MOSDOp*>::iterator it = w.begin();
- it != w.end();
+
+
+void OSD::handle_rg_notify(MOSDRGNotify *m)
+{
+ int from = MSG_ADDR_NUM(m->get_source());
+ dout(7) << "handle_rg_notify from osd" << from << endl;
+
+ // older map?
+ if (m->get_version() < osdmap->get_version()) {
+ dout(7) << " from old map version " << m->get_version() << " < " << osdmap->get_version() << endl;
+ delete m; // discard and ignore.*
+ return;
+ }
+
+ // newer map?
+ if (m->get_version() > osdmap->get_version()) {
+ dout(7) << " for newer map version " << m->get_version() << " > " << osdmap->get_version() << endl;
+ wait_for_new_map(m);
+ return;
+ }
+
+ assert(m->get_version() == osdmap->get_version());
+
+ // look for unknown RGs i'm primary for
+ map< int, set<RG*> > start_set;
+
+ for (list<repgroup_t>::iterator it = m->get_rg_list().begin();
+ it != m->get_rg_list().end();
it++) {
- handle_op(*it);
+ repgroup_t rgid = *it;
+
+ int acting[NUM_RUSH_REPLICAS];
+ int nrep = osdmap->repgroup_to_acting_osds(rgid, acting, NUM_RUSH_REPLICAS);
+ assert(acting[0] == whoami);
+
+ // get/open RG
+ RG *rg = open_rg(rgid);
+
+ // previously unknown RG?
+ if (rg->get_peers().empty()) {
+ dout(10) << " rg " << rgid << " is new" << endl;
+ for (int r=1; r<nrep; r++) {
+ if (rg->get_peer(r) == 0) {
+ dout(10) << " rg " << rgid << " primary needs to peer with replica " << r << " osd" << acting[r] << endl;
+ start_set[acting[r]].insert(rg);
+ }
+ }
+ }
+
+ // peered with this guy specifically?
+ RGPeer *rgp = rg->get_peer(from);
+ if (!rgp) {
+ dout(7) << " not yet peered with osd" << from << " on rg " << rgid << endl;
+ start_set[from].insert(rg);
+ }
}
+
+ // start peers?
+ if (start_set.empty()) {
+ dout(7) << " no new peers" << endl;
+ } else {
+ for (map< int, set<RG*> >::iterator pit = start_set.begin();
+ pit != start_set.end();
+ pit++)
+ peer_start(pit->first, pit->second);
+ }
+
+ delete m;
}
+
+
+
+
+// =========================================================
+// OPS
+
+
void OSD::handle_op(MOSDOp *op)
{
// mkfs is special
// no map? starting up?
if (!osdmap) {
osd_lock.Lock();
- dout(7) << "no OSDMap, starting up" << endl;
+ dout(7) << "no OSDMap, asking MDS" << endl;
if (waiting_for_osdmap.empty())
messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
if (op->get_map_version() > osdmap->get_version()) {
// op's is newer
dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
-
- // query MDS
- dout(7) << "querying MDS" << endl;
- messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
- MSG_ADDR_MDS(0), MDS_PORT_MAIN);
-
- assert(0);
-
- osd_lock.Lock();
- waiting_for_osdmap.push_back(op);
- osd_lock.Unlock();
+ wait_for_new_map(op);
return;
}
Messenger *messenger;
int whoami;
- class OSDMap *osdmap;
class ObjectStore *store;
class HostMonitor *monitor;
class Logger *logger;
+
+ // global lock
+ Mutex osd_lock;
+
+
+ // -- ops --
class ThreadPool<class OSD, class MOSDOp> *threadpool;
- list<class MOSDOp*> waiting_for_osdmap;
+ void queue_op(class MOSDOp *m);
+ public:
+ void do_op(class MOSDOp *m);
+ static void doop(OSD *o, MOSDOp *op) {
+ o->do_op(op);
+ };
+
+ protected:
+
+ // -- osd map --
+ class OSDMap *osdmap;
+ list<class Message*> waiting_for_osdmap;
+
+ void update_map(bufferlist& state);
+ void wait_for_new_map(Message *m);
+ void handle_osd_map(class MOSDMap *m);
- // replica hack
+
+ // <old replica hack>
__uint64_t last_tid;
Mutex replica_write_lock;
map<MOSDOp*, Cond*> replica_write_cond;
map<MOSDOp*, set<__uint64_t> > replica_write_tids;
map<__uint64_t, MOSDOp*> replica_writes;
+ // </hack>
- // global lock
- Mutex osd_lock;
-
-
- void update_map(bufferlist& state);
- // rg's
+ // -- replication --
hash_map<repgroup_t, RG*> rg_map;
void get_rg_list(list<repgroup_t>& ls);
void close_rg(repgroup_t rg); // close in-memory state
void remove_rg(repgroup_t rg); // remove state from store
+ void scan_rg();
+ void peer_notify(int primary, list<repgroup_t>& rg_list);
+ void peer_start(int replica, set<RG*>& rg_list);
+
+ void handle_rg_notify(class MOSDRGNotify *m);
+ void handle_rg_peer(class MOSDRGPeer *m);
+ void handle_rg_peer_ack(class MOSDRGPeerAck *m);
+
public:
OSD(int id, Messenger *m);
~OSD();
int init();
int shutdown();
- // ops
- void queue_op(class MOSDOp *m);
- void do_op(class MOSDOp *m);
- static void doop(OSD *o, MOSDOp *op) {
- o->do_op(op);
- };
-
// messages
virtual void dispatch(Message *m);
void handle_ping(class MPing *m);
- void handle_getmap_ack(class MOSDGetMapAck *m);
void handle_op(class MOSDOp *m);
void op_read(class MOSDOp *m);
void op_write(class MOSDOp *m);
/*
* some system constants
*/
-#define NUM_REPLICA_GROUPS (1<<20) // ~1M
+//#define NUM_REPLICA_GROUPS (1<<20) // ~1M
#define NUM_RUSH_REPLICAS 4 // this should be big enough to cope w/ failing disks.
#define MAX_REPLICAS 3
int stripe_size; // stripe unit, in bytes
int stripe_count; // over this many objects
int object_size; // until objects are this big, then use a new set of objects.
+ int num_rep;
- OSDFileLayout(int ss, int sc, int os) :
- stripe_size(ss), stripe_count(sc), object_size(os) { }
+ OSDFileLayout(int ss, int sc, int os, int nr=2) :
+ stripe_size(ss), stripe_count(sc), object_size(os), num_rep(nr) { }
};
Mutex osd_cluster_lock;
+ public:
void init_rush() {
// SAB
}
__uint64_t get_version() { return version; }
+ void inc_version() { version++; }
// cluster state
bool is_failed(int osd) { return failed_osds.count(osd) ? true:false; }
/* map (ino, blockno) into a replica group */
repgroup_t file_to_repgroup(inodeno_t ino,
- size_t ono) {
+ size_t ono,
+ int nrep) {
// something simple for now
// hash this eventually!
- return (ino+ono) % NUM_REPLICA_GROUPS;
+ return ((ino+ono) % g_conf.osd_num_rg) +
+ (nrep * g_conf.osd_num_rg);
}
+ /* get nrep from rgid */
+ int repgroup_to_nrep(repgroup_t rg) {
+ return rg / g_conf.osd_num_rg;
+ }
/* map (repgroup) to a list of osds.
this is where we invoke RUSH. */
else {
ex = &object_extents[oid];
ex->oid = oid;
- ex->rg = file_to_repgroup( ino, objectno );
+ ex->rg = file_to_repgroup( ino, objectno, layout.num_rep );
ex->osd = get_rg_acting_primary( ex->rg );
}
//#include "messages/MOSDWriteReply.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
+#include "messages/MOSDMap.h"
#include "msg/Messenger.h"
}
+
+void Filer::handle_osd_map(MOSDMap *m)
+{
+ if (!osdmap ||
+ m->get_version() > osdmap->get_version()) {
+ if (osdmap) {
+ dout(3) << "handle_osd_map got osd map version " << m->get_version() << " > " << osdmap->get_version() << endl;
+ } else {
+ dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
+ }
+
+ osdmap->decode(m->get_osdmap());
+
+ // kick requests who might be timing out on the wrong osds
+ // ** FIXME **
+
+ } else {
+ dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
+ }
+}
+
+
+
+
/*
void Filer::queue_outgoing(Message *m, int osd)
{
void handle_osd_read_reply(class MOSDOpReply *m);
void handle_osd_write_reply(class MOSDOpReply *m);
void handle_osd_op_reply(class MOSDOpReply *m);
+
+ void handle_osd_map(class MOSDMap *m);
};