msg/HostMonitor.o\
osd/FakeStore.o\
osd/Filer.o\
- osd/OSDCluster.o\
+ osd/OSDMap.o\
osd/rush.o\
common/Logger.o\
common/Clock.o\
-/- finish hashed_subset notify business
-- hashed readdir
+- test hashed readdir
+- interactive hash/unhash interface
- carefully define/document frozen wrt dir_auth vs hashing
-
-/- lacc
-/- streakwave
-/- mapreduce
-
- make logstream.flush align itself to stripes
MDS TODO
+- fix hashed readdir: should (optionally) do a lock on dir namespace
- fix hard links
- they mostly work, but they're fragile
- sync clients on stat
messenger->set_dispatcher(this);
// osd interfaces
- osdcluster = new OSDCluster(); // initially blank.. see mount()
- filer = new Filer(messenger, osdcluster);
+ osdmap = new OSDMap(); // initially blank.. see mount()
+ filer = new Filer(messenger, osdmap);
}
{
if (messenger) { delete messenger; messenger = 0; }
if (filer) { delete filer; filer = 0; }
- if (osdcluster) { delete osdcluster; osdcluster = 0; }
+ if (osdmap) { delete osdmap; osdmap = 0; }
tear_down_cache();
}
client_lock.Lock();
assert(reply);
- // we got osdcluster!
- osdcluster->decode(reply->get_osd_cluster_state());
+ // we got osdmap!
+ osdmap->decode(reply->get_osd_map_state());
dout(2) << "mounted" << endl;
mounted = true;
#include "Buffercache.h"
#include "mds/MDCluster.h"
-#include "osd/OSDCluster.h"
+#include "osd/OSDMap.h"
#include "msg/Message.h"
#include "msg/Dispatcher.h"
// cluster descriptors
MDCluster *mdcluster;
- OSDCluster *osdcluster;
+ OSDMap *osdmap;
bool mounted;
Filer *filer; // (non-blocking) osd interface
#include "config.h"
-#include "osd/OSDCluster.h"
+#include "osd/OSDMap.h"
//#define MDS_CACHE_SIZE 4*10000 -> <20mb
// waiter export_dir
// trigger handle_export_dir_prep_ack
-#define CDIR_WAIT_HASHED (1<<19) // hash finish
+#define CDIR_WAIT_HASHED (1<<17) // hash finish
+#define CDIR_WAIT_THISHASHEDREADDIR (1<<18) // current readdir lock
+#define CDIR_WAIT_NEXTHASHEDREADDIR (1<<19) // after current readdir lock finishes
#define CDIR_WAIT_DNREAD (1<<20)
#define CDIR_WAIT_DNLOCK (1<<21)
// hashed dirs
set<int> hashed_subset; // HASHING: subset of mds's that are hashed
+ public:
+ // for class MDS
+ map<int, list<class c_inode_info*> > hashed_readdir;
+ protected:
// context
MDS *mds;
#include "events/EAlloc.h"
#include "osd/Filer.h"
-#include "osd/OSDCluster.h"
+#include "osd/OSDMap.h"
#include "include/types.h"
#include "msg/Messenger.h"
-#include "osd/OSDCluster.h"
+#include "osd/OSDMap.h"
#include "osd/Filer.h"
#include "MDS.h"
#include "messages/MPingAck.h"
#include "messages/MGenericMessage.h"
-#include "messages/MOSDGetClusterAck.h"
+#include "messages/MOSDGetMapAck.h"
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
#include "messages/MClientRequest.h"
#include "messages/MClientReply.h"
+#include "messages/MHashReaddir.h"
+#include "messages/MHashReaddirReply.h"
#include "messages/MLock.h"
osdmonitor = new OSDMonitor(this);
- // <HACK set up OSDCluster from g_conf>
- osdcluster = new OSDCluster();
+ // <HACK set up OSDMap from g_conf>
+ osdmap = new OSDMap();
OSDGroup osdg;
osdg.num_osds = g_conf.num_osd;
for (int i=0; i<osdg.num_osds; i++) osdg.osds.push_back(i);
osdg.weight = 100;
osdg.osd_size = 100; // not used yet?
- osdcluster->add_group(osdg);
+ osdmap->add_group(osdg);
// </HACK>
- filer = new Filer(messenger, osdcluster);
+ filer = new Filer(messenger, osdmap);
mdlog->set_max_events(g_conf.mds_log_max_len);
if (osdmonitor) { delete osdmonitor; osdmonitor = 0; }
if (idalloc) { delete idalloc; idalloc = NULL; }
if (anchormgr) { delete anchormgr; anchormgr = NULL; }
- if (osdcluster) { delete osdcluster; osdcluster = 0; }
+ if (osdmap) { delete osdmap; osdmap = 0; }
if (filer) { delete filer; filer = 0; }
if (messenger) { delete messenger; messenger = NULL; }
l.root_pop = mdcache->get_root()->popularity[MDS_POP_ANYDOM].get();
} else
l.root_pop = 0;
- l.req_rate = stat_req.get();
- l.rd_rate = stat_read.get();
- l.wr_rate = stat_write.get();
return l;
}
filer->handle_osd_op_reply((class MOSDOpReply*)m);
return;
- case MSG_OSD_GETCLUSTER:
- handle_osd_getcluster(m);
+ case MSG_OSD_GETMAP:
+ handle_osd_getmap(m);
return;
// MDS
case MSG_CLIENT_REQUEST:
handle_client_request((MClientRequest*)m);
return;
+
+ case MSG_MDS_HASHREADDIR:
+ handle_hash_readdir((MHashReaddir*)m);
+ return;
+ case MSG_MDS_HASHREADDIRREPLY:
+ handle_hash_readdir_reply((MHashReaddirReply*)m);
+ return;
+
}
dout(1) << " main unknown message " << m->get_type() << endl;
num_bal_times--;
}
- static map<int,int> didhash;
- if (0 && elapsed.sec() > 15 && !didhash[whoami]) {
- CInode *in = mdcache->get_inode(100000010);
- if (in && in->dir) {
- if (in->dir->is_auth())
- mdcache->hash_dir(in->dir);
- didhash[whoami] = 1;
+
+ // HACK to test hashing stuff
+ if (1) {
+ static map<int,int> didhash;
+ if (elapsed.sec() > 15 && !didhash[whoami]) {
+ CInode *in = mdcache->get_inode(100000010);
+ if (in && in->dir) {
+ if (in->dir->is_auth())
+ mdcache->hash_dir(in->dir);
+ didhash[whoami] = 1;
+ }
}
- }
- if (0 && elapsed.sec() > 25 && didhash[whoami] == 1) {
- CInode *in = mdcache->get_inode(100000010);
- if (in && in->dir) {
- if (in->dir->is_auth())
- mdcache->unhash_dir(in->dir);
- didhash[whoami] = 2;
+ if (0 && elapsed.sec() > 25 && didhash[whoami] == 1) {
+ CInode *in = mdcache->get_inode(100000010);
+ if (in && in->dir) {
+ if (in->dir->is_auth() && in->dir->is_hashed())
+ mdcache->unhash_dir(in->dir);
+ didhash[whoami] = 2;
+ }
}
}
-
}
- // hack
+ // HACK to force export to test foreign renames
if (false && whoami == 0) {
static bool didit = false;
}
}
+
+
// shut down?
if (shutting_down && !shut_down) {
if (mdcache->shutdown_pass()) {
}
-void MDS::handle_osd_getcluster(Message *m)
+void MDS::handle_osd_getmap(Message *m)
{
- dout(7) << "osd_getcluster from " << MSG_ADDR_NICE(m->get_source()) << endl;
+ dout(7) << "osd_getmap from " << MSG_ADDR_NICE(m->get_source()) << endl;
- messenger->send_message(new MOSDGetClusterAck(osdcluster),
+ messenger->send_message(new MOSDGetMapAck(osdmap),
m->get_source());
delete m;
}
// ack
- messenger->send_message(new MClientMountAck(m, osdcluster),
+ messenger->send_message(new MClientMountAck(m, osdmap),
m->get_source(), m->get_source_port());
delete m;
}
mdcache->inode_soft_read_finish(ref);
- stat_read.hit();
- stat_req.hit();
-
balancer->hit_inode(ref);
// reply
// DIRECTORY and NAMESPACE OPS
+// READDIR
-void MDS::encode_dir_contents(CDir *dir, list<c_inode_info*>& items, int& numfiles)
+int MDS::encode_dir_contents(CDir *dir, list<c_inode_info*>& items)
{
+ int numfiles = 0;
+
for (CDir_map_t::iterator it = dir->begin();
it != dir->end();
it++) {
items.push_back( new c_inode_info(in, whoami, it->first) );
numfiles++;
}
+ return numfiles;
+}
+
+
+/*
+ * handle_hash_readdir
+ *
+ * Handle an MHashReaddir request: gather this node's entries for the
+ * hashed directory identified by m->get_ino() and send them back to the
+ * requester in an MHashReaddirReply.
+ *
+ * If the directory is not yet complete, a fetch is started and the message
+ * is handed to C_MDS_RetryMessage so it is retried later.  In every other
+ * path the message is consumed (deleted) here.
+ *
+ * note: this is pretty sloppy, but should work just fine i think...
+ */
+void MDS::handle_hash_readdir(MHashReaddir *m)
+{
+  CInode *cur = mdcache->get_inode(m->get_ino());
+  assert(cur);
+
+  // we should only be asked about dirs we have open and hashed.
+  if (!cur->dir ||
+      !cur->dir->is_hashed()) {
+    assert(0);
+    dout(7) << "handle_hash_readdir don't have dir open, or not hashed. giving up!" << endl;
+    delete m;
+    return;
+  }
+  CDir *dir = cur->dir;
+  assert(dir);
+  assert(dir->is_hashed());
+
+  // complete?
+  if (!dir->is_complete()) {
+    dout(10) << " incomplete dir contents for readdir on " << *dir << ", fetching" << endl;
+    // m stays alive; the retry context re-dispatches it after the fetch.
+    mdstore->fetch_dir(dir, new C_MDS_RetryMessage(this, m));
+    return;
+  }
+
+  // get content
+  // NOTE(review): presumably MHashReaddirReply takes over the c_inode_info
+  // pointers in items (the reply handler splices them out) -- confirm.
+  list<c_inode_info*> items;
+  encode_dir_contents(dir, items);
+
+  // send it back!
+  // NOTE(review): the request is sent on MDS_PORT_SERVER but this reply
+  // targets MDS_PORT_CACHE -- confirm the reply reaches the dispatcher
+  // that handles MSG_MDS_HASHREADDIRREPLY.
+  messenger->send_message(new MHashReaddirReply(dir->ino(), items),
+                          m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE);
+
+  // done with the request; nobody else frees it.
+  delete m;
+}
+
+
+/*
+ * handle_hash_readdir_reply
+ *
+ * Handle one peer's MHashReaddirReply for a hashed-directory readdir
+ * gather.  The peer's items are moved into dir->hashed_readdir[from].
+ * Once there is one entry per MDS in the cluster, the
+ * CDIR_WAIT_THISHASHEDREADDIR waiters are run (they copy the results),
+ * the gathered items are freed, the dir is auth-unpinned, and any
+ * CDIR_WAIT_NEXTHASHEDREADDIR waiters are queued.
+ *
+ * The message is always consumed (deleted) here.
+ */
+void MDS::handle_hash_readdir_reply(MHashReaddirReply *m)
+{
+  CInode *cur = mdcache->get_inode(m->get_ino());
+  assert(cur);
+
+  if (!cur->dir ||
+      !cur->dir->is_hashed()) {
+    assert(0);
+    dout(7) << "handle_hash_readdir_reply don't have dir open, or not hashed. giving up!" << endl;
+    delete m;
+    return;
+  }
+  CDir *dir = cur->dir;
+  assert(dir);
+  assert(dir->is_hashed());
+
+  // move items to hashed_readdir gather
+  // NOTE(review): assumes an MDS's source address equals its node id,
+  // since the finisher reads hashed_readdir[0..num_mds) -- confirm.
+  int from = m->get_source();
+  assert(dir->hashed_readdir.count(from) == 0);   // one reply per peer
+  dir->hashed_readdir[from].splice(dir->hashed_readdir[from].begin(),
+                                   m->get_items());
+  delete m;
+
+  // gather finished?
+  if (dir->hashed_readdir.size() < (unsigned)get_cluster()->get_num_mds()) {
+    dout(7) << "still waiting for more hashed readdir bits" << endl;
+    return;
+  }
+
+  dout(7) << "got last bit! finishing waiters" << endl;
+
+  // do these finishers. they'll copy the results.
+  list<Context*> finished;
+  dir->take_waiting(CDIR_WAIT_THISHASHEDREADDIR, finished);
+  finish_contexts(finished);
+
+  // now discard these results
+  for (map<int, list<c_inode_info*> >::iterator it = dir->hashed_readdir.begin();
+       it != dir->hashed_readdir.end();
+       it++) {
+    for (list<c_inode_info*>::iterator ci = it->second.begin();
+         ci != it->second.end();
+         ci++)
+      delete *ci;
+  }
+  // an empty map is what lets the next readdir start a new gather
+  dir->hashed_readdir.clear();
+
+  // unpin dir (we're done!)
+  dir->auth_unpin();
+
+  // trigger any waiters for next hashed readdir cycle
+  dir->take_waiting(CDIR_WAIT_NEXTHASHEDREADDIR, finished_queue);
+}
+
+
+/*
+ * C_MDS_HashReaddir
+ * Context that, when finished, calls MDS::finish_hash_readdir() with the
+ * client request and directory it was constructed with.
+ */
+class C_MDS_HashReaddir : public Context {
+  MDS *mds;              // mds whose finish_hash_readdir() we call
+  MClientRequest *req;   // client request passed through to the finisher
+  CDir *dir;             // directory passed through to the finisher
+public:
+  C_MDS_HashReaddir(MDS *mds, MClientRequest *req, CDir *dir)
+    : mds(mds), req(req), dir(dir) { }
+
+  // r is ignored.
+  void finish(int r) {
+    mds->finish_hash_readdir(req, dir);
+  }
+};
+
+/*
+ * finish_hash_readdir
+ * Build the client reply for a readdir on a hashed dir by copying the
+ * items gathered in dir->hashed_readdir for each MDS, then send it via
+ * reply_request().  Expects one gathered entry per MDS in the cluster.
+ */
+void MDS::finish_hash_readdir(MClientRequest *req, CDir *dir)
+{
+  dout(7) << "finish_hash_readdir on " << *dir << endl;
+
+  assert(dir->is_hashed());
+
+  // one gathered list per mds, or we shouldn't be here.
+  const int num_mds = get_cluster()->get_num_mds();
+  assert(dir->hashed_readdir.size() == (unsigned)num_mds);
+
+  // assemble the reply from every mds's piece, in mds order.
+  MClientReply *reply = new MClientReply(req);
+  reply->set_result(0);
+  for (int who = 0; who < num_mds; who++)
+    reply->copy_dir_items(dir->hashed_readdir[who]);
+
+  // send it off.
+  reply_request(req, reply, dir->inode);
}
return;
}
-
// auth?
if (!cur->dir_is_auth()) {
int dirauth = cur->authority();
return;
assert(cur->dir->is_auth());
+ // unhashing? wait!
+ if (cur->dir->is_hashed() &&
+ cur->dir->is_unhashing()) {
+ dout(10) << "unhashing, waiting" << endl;
+ cur->dir->add_waiter(CDIR_WAIT_UNFREEZE,
+ new C_MDS_RetryRequest(this, req, cur));
+ return;
+ }
+
// check perm
if (!mdcache->inode_hard_read_start(cur,req))
return;
mdcache->inode_hard_read_finish(cur);
+ CDir *dir = cur->dir;
+ assert(dir);
- if (!cur->dir->is_complete()) {
+ if (!dir->is_complete()) {
// fetch
dout(10) << " incomplete dir contents for readdir on " << *cur->dir << ", fetching" << endl;
- mdstore->fetch_dir(cur->dir, new C_MDS_RetryRequest(this, req, cur));
+ mdstore->fetch_dir(dir, new C_MDS_RetryRequest(this, req, cur));
return;
}
-
+
+ if (dir->is_hashed()) {
+ // HASHED
+ dout(7) << "hashed dir" << endl;
+ if (!dir->can_auth_pin()) {
+ dout(7) << "can't auth_pin dir " << *dir << " waiting" << endl;
+ dir->add_waiter(CDIR_WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(this, req, cur));
+ return;
+ }
+
+ if (!dir->hashed_readdir.empty()) {
+ dout(7) << "another readdir gather in progres, waiting" << endl;
+ dir->add_waiter(CDIR_WAIT_NEXTHASHEDREADDIR, new C_MDS_RetryRequest(this, req, cur));
+ return;
+ }
+
+ // start new readdir gather
+ dout(7) << "staring new hashed readdir gather" << endl;
+
+ // pin auth for process!
+ dir->auth_pin();
+
+ // get local bits
+ encode_dir_contents(cur->dir, dir->hashed_readdir[whoami]);
+
+ // request other bits
+ for (int i=0; i<get_cluster()->get_num_mds(); i++) {
+ if (i == get_nodeid()) continue;
+ messenger->send_message(new MHashReaddir(dir->ino()),
+ MSG_ADDR_MDS(i), MDS_PORT_SERVER, MDS_PORT_SERVER);
+ }
+
+ // wait
+ dir->add_waiter(CDIR_WAIT_THISHASHEDREADDIR,
+ new C_MDS_HashReaddir(this, req, dir));
+ return;
+ }
+
+ // NON-HASHED
// build dir contents
list<c_inode_info*> items;
- int numfiles = 0;
- encode_dir_contents(cur->dir, items, numfiles);
+ int numfiles = encode_dir_contents(cur->dir, items);
// yay, reply
MClientReply *reply = new MClientReply(req);
dout(10) << "reply to " << *req << " readdir " << numfiles << " files" << endl;
reply->set_result(0);
- stat_read.hit();
- stat_req.hit();
-
balancer->hit_dir(cur->dir);
// reply
dn->mark_dirty();
newi->mark_dirty();
+ // journal it
+ mdlog->submit_entry(new EInodeUpdate(newi)); // FIXME WRONG EVENT
+
// ok!
return newi;
}
class filepath;
-class OSDCluster;
+class OSDMap;
class Filer;
class AnchorTable;
class Message;
class MClientRequest;
class MClientReply;
+class MHashReaddir;
+class MHashReaddirReply;
class MDBalancer;
class LogEvent;
class IdAllocator;
MDCluster *mdcluster;
public:
- OSDCluster *osdcluster;
+ OSDMap *osdmap;
Filer *filer; // for reading/writing to/from osds
AnchorTable *anchormgr;
OSDMonitor *osdmonitor;
friend class MDStore;
// stats
- DecayCounter stat_req;
- DecayCounter stat_read;
- DecayCounter stat_write;
-
set<int> mounted_clients;
int get_nodeid() { return whoami; }
MDCluster *get_cluster() { return mdcluster; }
MDCluster *get_mds_cluster() { return mdcluster; }
- OSDCluster *get_osd_cluster() { return osdcluster; }
+ OSDMap *get_osd_map() { return osdmap; }
mds_load_t get_load();
void handle_shutdown_finish(Message *m);
// osds
- void handle_osd_getcluster(Message *m);
+ void handle_osd_getmap(Message *m);
// clients
void handle_client_mount(class MClientMount *m);
MClientReply *reply,
CInode *ref);
- // namespace
- void encode_dir_contents(CDir *dir, list<class c_inode_info*>& items, int& numfiles);
+ // readdir
void handle_client_readdir(MClientRequest *req, CInode *ref);
+ int encode_dir_contents(CDir *dir, list<class c_inode_info*>& items);
+ void handle_hash_readdir(MHashReaddir *m);
+ void handle_hash_readdir_reply(MHashReaddirReply *m);
+ void finish_hash_readdir(MClientRequest *req, CDir *dir);
+
+ // namespace changes
void handle_client_mknod(MClientRequest *req, CInode *ref);
void handle_client_link(MClientRequest *req, CInode *ref);
void handle_client_link_2(int r, MClientRequest *req, CInode *ref, vector<CDentry*>& trace);
#include "MDCluster.h"
#include "osd/Filer.h"
-#include "osd/OSDCluster.h"
+#include "osd/OSDMap.h"
#include "msg/Message.h"
#include "OSDMonitor.h"
#include "MDS.h"
-#include "osd/OSDCluster.h"
+#include "osd/OSDMap.h"
#include "msg/Message.h"
#include "msg/Messenger.h"
-#ifndef __MOSDGETCLUSTERACK_H
-#define __MOSDGETCLUSTERACK_H
+#ifndef __MOSDGETMAPACK_H
+#define __MOSDGETMAPACK_H
#include "msg/Message.h"
#include "osd/OSDMap.h"
-class MOSDGetClusterAck : public Message {
+class MOSDGetMapAck : public Message {
bufferlist osdmap;
public:
return osdmap;
}
- MOSDGetClusterAck(OSDMap *oc) :
- Message(MSG_OSD_GETCLUSTERACK) {
+ MOSDGetMapAck(OSDMap *oc) :
+ Message(MSG_OSD_GETMAPACK) {
oc->encode(osdmap);
}
- MOSDGetClusterAck() {}
+ MOSDGetMapAck() {}
// marshalling
payload.claim(osdmap);
}
- virtual char *get_type_name() { return "ogca"; }
+ virtual char *get_type_name() { return "ogma"; }
};
#endif
#define __MOSDOPREPLY_H
#include "msg/Message.h"
-#include "osd/OSDCluster.h"
+#include "osd/OSDMap.h"
#include "MOSDOp.h"
class MOSDOpReply : public Message {
MOSDOpReply_st st;
bufferlist data;
- bufferlist osdcluster;
+ bufferlist osdmap;
public:
long get_tid() { return st.tid; }
return data;
}
- // osdcluster
+ // osdmap
__uint64_t get_ocv() { return st._new_ocv; }
- bufferlist& get_osdcluster() {
- return osdcluster;
+ bufferlist& get_osdmap() {
+ return osdmap;
}
// keep a pcid (procedure call id) to match up request+reply
void set_pcid(long pcid) { this->st.pcid = pcid; }
long get_pcid() { return st.pcid; }
- MOSDOpReply(MOSDOp *req, int result, OSDCluster *oc) :
+ MOSDOpReply(MOSDOp *req, int result, OSDMap *oc) :
Message(MSG_OSD_OPREPLY) {
memset(&st, 0, sizeof(st));
this->st.pcid = req->st.pcid;
// attach updated cluster spec?
if (req->get_ocv() < oc->get_version()) {
- oc->encode(osdcluster);
+ oc->encode(osdmap);
st._new_ocv = oc->get_version();
- st._oc_len = osdcluster.length();
+ st._oc_len = osdmap.length();
}
}
MOSDOpReply() {}
payload.copy(0, sizeof(st), (char*)&st);
payload.splice(0, sizeof(st));
if (st._data_len) payload.splice(0, st._data_len, &data);
- if (st._oc_len) payload.splice(0, st._oc_len, &osdcluster);
+ if (st._oc_len) payload.splice(0, st._oc_len, &osdmap);
}
virtual void encode_payload() {
payload.push_back( new buffer((char*)&st, sizeof(st)) );
payload.claim_append( data );
- payload.claim_append( osdcluster );
+ payload.claim_append( osdmap );
}
virtual char *get_type_name() { return "oopr"; }
#define MSG_OSD_OPREPLY 15 // delete, etc.
#define MSG_OSD_PING 16
-#define MSG_OSD_GETCLUSTER 17
-#define MSG_OSD_GETCLUSTERACK 18
+#define MSG_OSD_GETMAP 17
+#define MSG_OSD_GETMAPACK 18
#define MSG_CLIENT_REQUEST 20
#define MSG_CLIENT_REPLY 21
#define MSG_MDS_HASHDIR 164
#define MSG_MDS_HASHDIRACK 165
#define MSG_MDS_HASHDIRNOTIFY 166
-#define MSG_MDS_HASHDIRFINISH 167
+
+#define MSG_MDS_HASHREADDIR 168
+#define MSG_MDS_HASHREADDIRREPLY 169
#define MSG_MDS_UNHASHDIRPREP 170
#define MSG_MDS_UNHASHDIRPREPACK 171
#include "messages/MOSDPing.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
-#include "messages/MOSDGetClusterAck.h"
+#include "messages/MOSDGetMapAck.h"
#include "messages/MClientMount.h"
#include "messages/MClientMountAck.h"
case MSG_OSD_OPREPLY:
m = new MOSDOpReply();
break;
- case MSG_OSD_GETCLUSTERACK:
- m = new MOSDGetClusterAck();
+ case MSG_OSD_GETMAPACK:
+ m = new MOSDGetMapAck();
break;
// clients
case MSG_MDS_SHUTDOWNFINISH:
case MSG_SHUTDOWN:
case MSG_CLIENT_UNMOUNT:
- case MSG_OSD_GETCLUSTER:
+ case MSG_OSD_GETMAP:
m = new MGenericMessage(env.type);
break;
#include "include/types.h"
#include "OSD.h"
-#include "OSDCluster.h"
+#include "OSDMap.h"
#ifdef USE_OBFS
# include "OBFSStore.h"
#include "messages/MPingAck.h"
#include "messages/MOSDOp.h"
#include "messages/MOSDOpReply.h"
-#include "messages/MOSDGetClusterAck.h"
+#include "messages/MOSDGetMapAck.h"
#include "common/Logger.h"
#include "common/LogType.h"
messenger = m;
messenger->set_dispatcher(this);
- osdcluster = 0;
+ osdmap = 0;
last_tid = 0;
OSD::~OSD()
{
if (threadpool) { delete threadpool; threadpool = 0; }
- if (osdcluster) { delete osdcluster; osdcluster = 0; }
+ if (osdmap) { delete osdmap; osdmap = 0; }
if (monitor) { delete monitor; monitor = 0; }
if (messenger) { delete messenger; messenger = 0; }
if (logger) { delete logger; logger = 0; }
delete m;
break;
- case MSG_OSD_GETCLUSTERACK:
- handle_getcluster_ack((MOSDGetClusterAck*)m);
+ case MSG_OSD_GETMAPACK:
+ handle_getmap_ack((MOSDGetMapAck*)m);
break;
case MSG_PING:
}
-void OSD::handle_getcluster_ack(MOSDGetClusterAck *m)
+void OSD::handle_getmap_ack(MOSDGetMapAck *m)
{
// SAB
osd_lock.Lock();
- if (!osdcluster) osdcluster = new OSDCluster();
- osdcluster->decode(m->get_osdcluster());
- dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl;
+ if (!osdmap) osdmap = new OSDMap();
+ osdmap->decode(m->get_osdmap());
+ dout(7) << "got OSDMap version " << osdmap->get_version() << endl;
delete m;
// process waiters
list<MOSDOp*> waiting;
- waiting.splice(waiting.begin(), waiting_for_osdcluster);
+ waiting.splice(waiting.begin(), waiting_for_osdmap);
for (list<MOSDOp*>::iterator it = waiting.begin();
it != waiting.end();
{
// starting up?
- if (!osdcluster) {
+ if (!osdmap) {
// SAB
osd_lock.Lock();
- dout(7) << "no OSDCluster, starting up" << endl;
- if (waiting_for_osdcluster.empty())
- messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
+ dout(7) << "no OSDMap, starting up" << endl;
+ if (waiting_for_osdmap.empty())
+ messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
- waiting_for_osdcluster.push_back(op);
+ waiting_for_osdmap.push_back(op);
// SAB
osd_lock.Unlock();
// check cluster version
- if (op->get_ocv() > osdcluster->get_version()) {
+ if (op->get_ocv() > osdmap->get_version()) {
// op's is newer
- dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+ dout(7) << "op cluster " << op->get_ocv() << " > " << osdmap->get_version() << endl;
// query MDS
dout(7) << "querying MDS" << endl;
- messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER),
+ messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP),
MSG_ADDR_MDS(0), MDS_PORT_MAIN);
assert(0);
// SAB
osd_lock.Lock();
- waiting_for_osdcluster.push_back(op);
+ waiting_for_osdmap.push_back(op);
// SAB
osd_lock.Unlock();
return;
}
- if (op->get_ocv() < osdcluster->get_version()) {
+ if (op->get_ocv() < osdmap->get_version()) {
// op's is old
- dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl;
+ dout(7) << "op cluster " << op->get_ocv() << " > " << osdmap->get_version() << endl;
}
// PRIMARY
// verify that we are primary, or acting primary
- int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() );
+ int acting_primary = osdmap->get_rg_acting_primary( op->get_rg() );
if (acting_primary != whoami) {
dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl;
messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0);
}
} else {
// REPLICA
- int my_role = osdcluster->get_rg_role(rg, whoami);
+ int my_role = osdmap->get_rg_role(rg, whoami);
dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl;
r->get_length(), r->get_offset(),
bptr.c_str());
// set up reply
- MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster);
+ MOSDOpReply *reply = new MOSDOpReply(r, 0, osdmap);
if (got >= 0) {
bptr.set_length(got); // properly size the buffer
if (op->get_rg_nrep() > 1) {
dout(7) << "op_write nrep=" << op->get_rg_nrep() << endl;
int reps[op->get_rg_nrep()];
- osdcluster->repgroup_to_osds(op->get_rg(),
+ osdmap->repgroup_to_osds(op->get_rg(),
reps,
op->get_rg_nrep());
messenger->get_myaddr(),
op->get_oid(),
op->get_rg(),
- osdcluster->get_version(),
+ osdmap->get_version(),
op->get_op());
wr->get_data() = op->get_data(); // copy bufferlist
messenger->send_message(wr, MSG_ADDR_OSD(reps[i]));
}
// reply
- MOSDOpReply *reply = new MOSDOpReply(op, 0, osdcluster);
+ MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap);
messenger->send_message(reply, op->get_asker());
delete op;
dout(3) << "MKFS" << endl;
{
int r = store->mkfs();
- messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+ messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker());
}
delete op;
}
dout(3) << "delete on " << op->get_oid() << " r = " << r << endl;
// "ack"
- messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+ messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker());
logger->inc("rm");
delete op;
dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl;
// "ack"
- messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker());
+ messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker());
logger->inc("trunc");
dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl;
- MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster);
+ MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap);
reply->set_object_size(st.st_size);
messenger->send_message(reply, op->get_asker());
class Messenger;
class Message;
+typedef __uint64_t version_t;
-// ways to be dirty
-#define RG_DIRTY_LOCAL_LOG 1
-#define RG_DIRTY_LOCAL_SYNC 2
-#define RG_DIRTY_REPLICA_MEM 4
-#define RG_DIRTY_REPLICA_SYNC 8
+/** RGImport
+ * state associated with import of RG contents from another
+ * OSD.
+ */
+#define RG_IMPORT_STATE_STARTING 1 // fetching object, delete lists.
+#define RG_IMPORT_STATE_IMPORTING 2 // fetching replicas.
+#define RG_IMPORT_STATE_FINISHING 3 // got everything; telling old guy to hose residual state
-class ReplicaGroup {
+struct RGImport {
+ int peer; // the peer we are importing from (presumably an OSD id -- confirm)
+ int peer_role; // peer's role. if <0, we should delete strays.
+ int import_state; // presumably one of RG_IMPORT_STATE_* -- confirm
+
+ map<object_t,version_t> remaining_objects; // remote object list (object -> version)
+ map<object_t,version_t> stray_objects; // imported but not deleted (object -> version)
+
+ // FIXME: add destructive vs non-destructive. maybe peer is a replica!
+};
+
+
+/** RGPeer
+ * state associated with (possibly old) RG peers.
+ * only used by primary?
+ *
+ * each map below is keyed by object and holds a version for it.
+ */
+
+// by primary
+#define RG_PEER_STATE_ACTIVE 1 // active peer
+#define RG_PEER_STATE_COMPLETE 2 // peer has everything replicated
+
+struct RGPeer {
+ int peer; // which peer this is (presumably an OSD id -- confirm)
+ int role; // 0 primary, 1+ replica, -1 residual
+ int state; // presumably RG_PEER_STATE_* -- confirm
+
+ // used by primary for syncing (old) replicas
+ map<object_t,version_t> objects; // remote object list
+ map<object_t,version_t> deleted; // remote delete list
+ map<object_t,version_t> fetching; // objects i'm reading from replica
+ map<object_t,version_t> stray; // objects that need to be deleted
+
+ // used by primary for normal replication stuff
+ map<object_t,version_t> writing; // objects i've written to replica
+ map<object_t,version_t> flushing; // objects i've written to remote buffer cache only
+};
+
+
+
+/** RG - Replica Group
+ *
+ */
+
+// bits used on any
+#define RG_STATE_COMPLETE 1 // i have full RG contents locally.
+#define RG_STATE_PEERED 2 // i have contacted prior primary and all
+ // replica osds and/or fetched their
+ // content lists, and thus know what's up.
+ // or, i have check in w/ new primary (on replica)
+
+// on primary or old-primary only
+#define RG_STATE_CLEAN 4 // i am fully replicated
+
+class RG {
public:
repgroup_t rg;
- int role; // 0 = primary, 1 = secondary, etc. 0=undef.
+ int role; // 0 = primary, 1 = secondary, etc. -1=undef/none.
int state;
-
- map<object_t, int> dirty_map; // dirty objects
- ReplicaGroup(repgroup_t rg);
+ map<int, RGPeer> peers;
+ // for unstable states,
+ map<object_t, version_t> deleted_objects; // locally
+
+ public:
+ RG(repgroup_t rg);
+
+ repgroup_t get_rg() { return rg; }
+ int get_role() { return role; }
+ int get_state() { return state; }
+
void enumerate_objects(list<object_t>& ls);
};
+/** Onode
+ * per-object OSD metadata
+ *
+ * all fields are private and the public section is still empty, so
+ * nothing outside the class can read or modify them yet.
+ */
+class Onode {
+ object_t oid; // the object this metadata is for (presumably -- confirm)
+ version_t version; // presumably the object's current version -- confirm
+
+ map<int, version_t> stray_replicas; // osds w/ stray replicas.
+
+ public:
+
+};
+
class OSD : public Dispatcher {
protected:
Messenger *messenger;
int whoami;
- class OSDCluster *osdcluster;
+ class OSDMap *osdmap;
class ObjectStore *store;
class HostMonitor *monitor;
class Logger *logger;
class ThreadPool<class OSD, class MOSDOp> *threadpool;
- list<class MOSDOp*> waiting_for_osdcluster;
+ list<class MOSDOp*> waiting_for_osdmap;
// replica hack
__uint64_t last_tid;
int init();
int shutdown();
- // OSDCluster
- void update_osd_cluster(__uint64_t ocv, bufferlist& blist);
+ // OSDMap
+ void update_osd_map(__uint64_t ocv, bufferlist& blist);
void queue_op(class MOSDOp *m);
void do_op(class MOSDOp *m);
virtual void dispatch(Message *m);
void handle_ping(class MPing *m);
- void handle_getcluster_ack(class MOSDGetClusterAck *m);
+ void handle_getmap_ack(class MOSDGetMapAck *m);
void handle_op(class MOSDOp *m);
void op_read(class MOSDOp *m);
void op_write(class MOSDOp *m);
-#include "OSDCluster.h"
+#include "OSDMap.h"
// serialize/unserialize
-void OSDCluster::encode(bufferlist& blist)
+void OSDMap::encode(bufferlist& blist)
{
blist.append((char*)&version, sizeof(version));
_encode(failed_osds, blist);
}
-void OSDCluster::decode(bufferlist& blist)
+void OSDMap::decode(bufferlist& blist)
{
int off = 0;
blist.copy(off, sizeof(version), (char*)&version);
init_rush();
}
+
-#ifndef __OSDCLUSTER_H
-#define __OSDCLUSTER_H
+#ifndef __OSDMAP_H
+#define __OSDMAP_H
/*
* describe properties of the OSD cluster.
};
-/** OSDCluster
+/** OSDMap
*/
-class OSDCluster {
+class OSDMap {
__uint64_t version; // what version of the osd cluster descriptor is this
// RUSH disk groups
public:
- OSDCluster() : version(0), rush(0) { }
- ~OSDCluster() {
+ OSDMap() : version(0), rush(0) { }
+ ~OSDMap() {
if (rush) { delete rush; rush = 0; }
}
#include <assert.h>
#include "Filer.h"
-#include "OSDCluster.h"
+#include "OSDMap.h"
//#include "messages/MOSDRead.h"
//#include "messages/MOSDReadReply.h"
-Filer::Filer(Messenger *m, OSDCluster *o)
+Filer::Filer(Messenger *m, OSDMap *o)
{
last_tid = 0;
messenger = m;
- osdcluster = o;
+ osdmap = o;
}
Filer::~Filer()
p->onfinish = onfinish;
// map buffer into OSD extents
- osdcluster->file_to_extents(ino, layout, len, offset, p->extents);
+ osdmap->file_to_extents(ino, layout, len, offset, p->extents);
dout(7) << "osd read ino " << ino << " len " << len << " off " << offset << " in " << p->extents.size() << " object extents" << endl;
// issue read
MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
- it->oid, it->rg, osdcluster->get_version(),
+ it->oid, it->rg, osdmap->get_version(),
OSD_OP_READ);
m->set_length(it->len);
m->set_offset(it->offset);
// find data
list<OSDExtent> extents;
- osdcluster->file_to_extents(ino, layout, len, offset, extents);
+ osdmap->file_to_extents(ino, layout, len, offset, extents);
dout(7) << "osd write ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
// issue write
MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
- it->oid, it->rg, osdcluster->get_version(),
+ it->oid, it->rg, osdmap->get_version(),
OSD_OP_WRITE);
m->set_length(it->len);
m->set_offset(it->offset);
{
// updated cluster info?
if (m->get_ocv() &&
- m->get_ocv() > osdcluster->get_version()) {
- dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdcluster->get_version() << endl;
- osdcluster->decode( m->get_osdcluster() );
+ m->get_ocv() > osdmap->get_version()) {
+ dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdmap->get_version() << endl;
+ osdmap->decode( m->get_osdmap() );
}
// find data
list<OSDExtent> extents;
- osdcluster->file_to_extents(ino, layout, old_size, new_size, extents);
+ osdmap->file_to_extents(ino, layout, old_size, new_size, extents);
dout(7) << "osd truncate ino " << ino << " to new size " << new_size << " from old_size " << old_size << " in " << extents.size() << " extents" << endl;
if (it->offset == 0) {
// issue delete
m = new MOSDOp(last_tid, messenger->get_myaddr(),
- it->oid, it->rg, osdcluster->get_version(),
+ it->oid, it->rg, osdmap->get_version(),
OSD_OP_DELETE);
} else {
// issue a truncate
m = new MOSDOp(last_tid, messenger->get_myaddr(),
- it->oid, it->rg, osdcluster->get_version(),
+ it->oid, it->rg, osdmap->get_version(),
OSD_OP_TRUNCATE);
m->set_length( it->offset );
}
// send MKFS to osds
set<int> ls;
- osdcluster->get_all_osds(ls);
+ osdmap->get_all_osds(ls);
for (set<int>::iterator it = ls.begin();
it != ls.end();
// issue mkfs
MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
- 0, 0, osdcluster->get_version(),
+ 0, 0, osdmap->get_version(),
OSD_OP_MKFS);
messenger->send_message(m, MSG_ADDR_OSD(*it), 0);
// find data
list<OSDExtent> extents;
- osdcluster->file_to_extents(ino, len, offset, num_rep, extents);
+ osdmap->file_to_extents(ino, len, offset, num_rep, extents);
dout(7) << "osd zero ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl;
MOSDOp *m;
//if (it->len ==
m = new MOSDOp(last_tid, messenger->get_myaddr(),
- it->oid, it->rg, osdcluster->get_version(),
+ it->oid, it->rg, osdmap->get_version(),
OSD_OP_DELETE);
it->len, it->offset);
messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
* client/mds interface to access "files" in OSD cluster.
*
* generic non-blocking interface for reading/writing to osds, using
- * the file-to-object mappings defined by OSDCluster.
+ * the file-to-object mappings defined by OSDMap.
*
* Filer also handles details of replication on OSDs (to the extent that
* it affects OSD clients)
#include "include/types.h"
#include "msg/Dispatcher.h"
-#include "OSDCluster.h"
+#include "OSDMap.h"
class Context;
class Messenger;
-class OSDCluster;
+class OSDMap;
/*** types ***/
typedef __uint64_t tid_t;
/**** Filer interface ***/
class Filer : public Dispatcher {
- OSDCluster *osdcluster; // what osds am i dealing with?
+ OSDMap *osdmap; // what osds am i dealing with?
Messenger *messenger;
__uint64_t last_tid;
hash_map<tid_t,PendingOSDOp_t*> op_mkfs;
public:
- Filer(Messenger *m, OSDCluster *o);
+ Filer(Messenger *m, OSDMap *o);
~Filer();
void dispatch(Message *m);