fakefuse: fakefuse.cc mds/allmds.o client/Client.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc msg/CheesySerializer.o ${COMMON_OBJS}
${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@
+testmpi: test/testmpi.cc msg/MPIMessenger.cc ${COMMON_OBJS}
+ ${MPICC} ${CFLAGS} ${LIBS} $^ -o $@
+
+
clean:
rm -f *.o */*.o ${TARGETS} ${TEST_TARGETS}
- log length versus cache size, workload
-notes and todos.
-- SyntheticClient
-
-- virtual mega-filesystem (metadata only)
finish HARD LINKS
mds_bal_replicate_threshold: 500,
mds_bal_unreplicate_threshold: 200,
- mds_bal_interval: 10000,
+ mds_bal_interval: 60, // seconds
mds_commit_on_shutdown: true,
#define dout(x) if (x <= g_conf.debug) cout << "cinode: "
-map<int, int> cinode_pins; // counts
+int cinode_pins[CINODE_NUM_PINS]; // counts
ostream& operator<<(ostream& out, CInode& in)
ostream& operator<<(ostream& out, CInode& in);
-extern map<int, int> cinode_pins; // counts
+extern int cinode_pins[CINODE_NUM_PINS]; // counts
// cached inode wrapper
if (g_conf.log_pins) {
// pin
- for (map<int,int>::iterator it = cinode_pins.begin();
- it != cinode_pins.end();
- it++) {
- //string s = "I";
- //s += cinode_pin_names[it->first];
- mds->logger2->set(//s,
- cinode_pin_names[it->first],
- it->second);
+ for (int i=0; i<CINODE_NUM_PINS; i++) {
+ mds->logger2->set(cinode_pin_names[i],
+ cinode_pins[i]);
}
/*
for (map<int,int>::iterator it = cdir_pins.begin();
mds_paused = false;
stat_ops = 0;
- last_heartbeat = 0;
+ last_balancer_heartbeat = g_clock.gettimepair();
// log
string name;
}
// balance?
+ timepair_t now = g_clock.gettimepair();
if (true && whoami == 0 &&
- stat_ops >= last_heartbeat + g_conf.mds_bal_interval) {
- last_heartbeat = stat_ops;
+ now.first - last_balancer_heartbeat.first >= g_conf.mds_bal_interval) {
+ last_balancer_heartbeat = now;
balancer->send_heartbeat();
}
protected:
__uint64_t stat_ops;
- __uint64_t last_heartbeat;
+ timepair_t last_balancer_heartbeat;
public:
MDS(MDCluster *mdc, int whoami, Messenger *m);
msg_addr_t asker;
object_t oid;
- repgroup_t rg;
+ repgroup_t rg, rg_role;
__uint64_t ocv;
int op;
object_t get_oid() { return st.oid; }
repgroup_t get_rg() { return st.rg; }
+ int get_rg_role() { return st.rg_role; } // who am i asking for?
__uint64_t get_ocv() { return st.ocv; }
int get_op() { return st.op; }
this->st.oid = oid;
this->st.rg = rg;
+ this->st.rg_role = 0;
this->st.ocv = ocv;
this->st.op = op;
}
MOSDOp() {}
+ void set_rg_role(int r) { st.rg_role = r; }
+
void set_length(size_t l) { st.length = l; }
void set_offset(size_t o) { st.offset = o; }
bool mpi_done = false; // set this flag to stop the event loop
-#define FUNNEL_MPI // if we want to funnel mpi through a single thread
+#define FUNNEL_MPI // if we want to funnel mpi through a single thread
#define TAG_UNSOLICITED 0
#define DBLVL 18
if (op->get_ocv() < osdcluster->get_version()) {
// op's cluster version is old
dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+ }
+
+
+ // am i the right rg_role?
+ repgroup_t rg = op->get_rg();
+ if (op->get_rg_role() == 0) {
+ // PRIMARY
+
// verify that we are primary, or acting primary
int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
if (acting_primary != whoami) {
logger->inc("fwd");
return;
}
+ } else {
+ // REPLICA
+ int my_role = osdcluster->get_rg_role(rg, whoami);
+
+ dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
+
+ if (my_role != op->get_rg_role()) {
+ assert(0);
+ }
}
+
+
// do the op
switch (op->get_op()) {
class ReplicaGroup {
public:
repgroup_t rg;
- int role; // 1 = primary, 2 = secondary, etc. 0=undef.
+ // NOTE(review): 0 used to mean "undefined" but now means primary; get_rg_role() reports "none" as -1 -- confirm which sentinel applies to this field.
+ int role; // 0 = primary, 1 = secondary, etc.
int state;
map<object_t, int> dirty_map; // dirty objects
void enumerate_objects(list<object_t>& ls);
};
+
+
class OSD : public Dispatcher {
protected:
Messenger *messenger;
return -1; // we fail!
}
+ /* Replica rank of `osd` within replica group `rg`.
+    Members listed in failed_osds are skipped and do not consume a rank,
+    so the first non-failed member is rank 0 (primary).
+    Returns -1 when `osd` is not a non-failed member of the group. */
+ int get_rg_role(repgroup_t rg, int osd) {
+   int members[NUM_RUSH_REPLICAS];
+   repgroup_to_osds(rg, members, NUM_RUSH_REPLICAS);
+
+   int rank = 0;
+   for (int idx = 0; idx < NUM_RUSH_REPLICAS; ++idx) {
+     int candidate = members[idx];
+     if (failed_osds.count(candidate) == 0) {
+       // live member: either it is the one we want, or it takes the next rank
+       if (candidate == osd)
+         return rank;
+       ++rank;
+     }
+   }
+   return -1; // none
+ }
+
/* map (ino, offset, len) to a (list of) OSDExtents
(byte ranges in objects on osds) */
void file_to_extents(inodeno_t ino,
double myWeight;
RushRNG rng;
+ // There may not be more replicas than servers!
+ assert (nReplicas <= totalServers);
+
for (cluster = nClusters-1; (cluster >= 0) && (replicasLeft > 0); cluster--) {
if (serversInPrevious[cluster] < replicasLeft) {
mustAssign = replicasLeft - serversInPrevious[cluster];
rng.Seed (myhash (key)^cluster, cluster^0xb90738);
numberAssigned = mustAssign +
rng.HyperGeometricWeighted (toDraw, myWeight,
- totalWeightBefore[cluster] + myWeight,
- clusterWeight[cluster]);
+ totalWeightBefore[cluster] + myWeight,
+ clusterWeight[cluster]);
if (numberAssigned > 0) {
rng.Seed (myhash (key)^cluster ^ 11, cluster^0xfea937);
rng.DrawKofN (srv, numberAssigned, clusterSize[cluster]);
for (i = 0; i < numberAssigned; i++) {
- srv[i] += serversInPrevious[cluster];
+ srv[i] += serversInPrevious[cluster];
}
replicasLeft -= numberAssigned;
srv += numberAssigned;
--- /dev/null
+#include <sys/stat.h>
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "include/config.h"
+#include "messages/MPing.h"
+
+#include "msg/MPIMessenger.h"
+
+// Dispatcher for the MPI messenger test: registers itself on the messenger
+// it is constructed with, and prints then frees each message handed to it.
+class Pinger : public Dispatcher {
+public:
+  Messenger *messenger;  // messenger this dispatcher is registered on
+
+  Pinger(Messenger *m)
+    : messenger(m)
+  {
+    messenger->set_dispatcher(this);
+  }
+
+  // Stream `m` to stdout, then delete it.
+  void dispatch(Message *m)
+  {
+    std::cout << "got incoming " << m << std::endl;
+    delete m;
+  }
+};
+
+// Test driver: initialise the MPI messenger layer, attach a Pinger to a new
+// MPIMessenger, then send MPing messages to randomly chosen destinations.
+int main(int argc, char **argv) {
+  int rank = mpimessenger_init(argc, argv);
+  int nranks = mpimessenger_world();
+
+  Pinger *pinger = new Pinger( new MPIMessenger(rank) );
+
+  mpimessenger_start();
+
+  // ping random nodes
+  for (;;) {
+    int target = rand() % nranks;
+    pinger->messenger->send_message(new MPing(), target);
+  }
+
+  // NOTE(review): the loop above never exits, so the two calls below are
+  // unreachable as written -- confirm whether the test is meant to run forever.
+  mpimessenger_wait();
+  mpimessenger_shutdown();  // shutdown MPI
+}