From: sage Date: Fri, 29 Jul 2005 05:54:28 +0000 (+0000) Subject: OSDCluster -> OSDMap X-Git-Tag: v0.1~1904 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1b6f27008ac80eb0edb4b7c527804666940aadce;p=ceph.git OSDCluster -> OSDMap git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@477 29311d96-e01e-0410-9327-a35deaab8ce9 --- diff --git a/ceph/Makefile b/ceph/Makefile index 9009d445877..d8ead3bd68c 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -44,7 +44,7 @@ COMMON_OBJS= \ msg/HostMonitor.o\ osd/FakeStore.o\ osd/Filer.o\ - osd/OSDCluster.o\ + osd/OSDMap.o\ osd/rush.o\ common/Logger.o\ common/Clock.o\ diff --git a/ceph/TODO b/ceph/TODO index e665e04be3b..ed738631335 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -1,13 +1,8 @@ -/- finish hashed_subset notify business -- hashed readdir +- test hashed readdir +- interactive hash/unhash interface - carefully define/document frozen wrt dir_auth vs hashing - -/- lacc -/- streakwave -/- mapreduce - - make logstream.flush align itself to stipes @@ -36,6 +31,7 @@ finish HARD LINKS MDS TODO +- fix hashed readdir: should (optionally) do a lock on dir namespace - fix hard links - they mostly work, but they're fragile - sync clients on stat diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index 27f02d4bd29..753d041c3b5 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -58,8 +58,8 @@ Client::Client(MDCluster *mdc, int id, Messenger *m) messenger->set_dispatcher(this); // osd interfaces - osdcluster = new OSDCluster(); // initially blank.. see mount() - filer = new Filer(messenger, osdcluster); + osdmap = new OSDMap(); // initially blank.. see mount() + filer = new Filer(messenger, osdmap); } @@ -67,7 +67,7 @@ Client::~Client() { if (messenger) { delete messenger; messenger = 0; } if (filer) { delete filer; filer = 0; } - if (osdcluster) { delete osdcluster; osdcluster = 0; } + if (osdmap) { delete osdmap; osdmap = 0; } tear_down_cache(); } @@ -647,8 +647,8 @@ int Client::mount(int mkfs) client_lock.Lock(); assert(reply); - // we got osdcluster! - osdcluster->decode(reply->get_osd_cluster_state()); + // we got osdmap! + osdmap->decode(reply->get_osd_map_state()); dout(2) << "mounted" << endl; mounted = true; diff --git a/ceph/client/Client.h b/ceph/client/Client.h index 047d0895eca..f8bedef886a 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -4,7 +4,7 @@ #include "Buffercache.h" #include "mds/MDCluster.h" -#include "osd/OSDCluster.h" +#include "osd/OSDMap.h" #include "msg/Message.h" #include "msg/Dispatcher.h" @@ -190,7 +190,7 @@ class Client : public Dispatcher { // cluster descriptors MDCluster *mdcluster; - OSDCluster *osdcluster; + OSDMap *osdmap; bool mounted; Filer *filer; // (non-blocking) osd interface diff --git a/ceph/config.cc b/ceph/config.cc index 2db42d4be75..39290850d34 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -1,6 +1,6 @@ #include "config.h" -#include "osd/OSDCluster.h" +#include "osd/OSDMap.h" //#define MDS_CACHE_SIZE 4*10000 -> <20mb diff --git a/ceph/mds/CDir.h b/ceph/mds/CDir.h index cdffc6e1d61..78a7fd43978 100644 --- a/ceph/mds/CDir.h +++ b/ceph/mds/CDir.h @@ -147,7 +147,9 @@ class Context; // waiter export_dir // trigger handel_export_dir_prep_ack -#define CDIR_WAIT_HASHED (1<<19) // hash finish +#define CDIR_WAIT_HASHED (1<<17) // hash finish +#define CDIR_WAIT_THISHASHEDREADDIR (1<<18) // current readdir lock +#define CDIR_WAIT_NEXTHASHEDREADDIR (1<<19) // after current readdir lock finishes #define CDIR_WAIT_DNREAD (1<<20) #define CDIR_WAIT_DNLOCK (1<<21) @@ -208,6 +210,10 @@ class CDir { // hashed dirs set hashed_subset; // HASHING: subset of mds's that are hashed + public: + // for class MDS + map > hashed_readdir; + protected: // context MDS *mds; diff --git a/ceph/mds/IdAllocator.cc b/ceph/mds/IdAllocator.cc index c5355dd73c0..e742109e9c9 100644 --- a/ceph/mds/IdAllocator.cc +++ b/ceph/mds/IdAllocator.cc @@ -7,7 +7,7 @@ #include "events/EAlloc.h" #include "osd/Filer.h" -#include "osd/OSDCluster.h" +#include "osd/OSDMap.h" #include "include/types.h" diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index ba9f25cca1a..6f1fe9dd308 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -4,7 +4,7 @@ #include "msg/Messenger.h" -#include "osd/OSDCluster.h" +#include "osd/OSDMap.h" #include "osd/Filer.h" #include "MDS.h" @@ -26,12 +26,14 @@ #include "messages/MPingAck.h" #include "messages/MGenericMessage.h" -#include "messages/MOSDGetClusterAck.h" +#include "messages/MOSDGetMapAck.h" #include "messages/MClientMount.h" #include "messages/MClientMountAck.h" #include "messages/MClientRequest.h" #include "messages/MClientReply.h" +#include "messages/MHashReaddir.h" +#include "messages/MHashReaddirReply.h" #include "messages/MLock.h" @@ -88,17 +90,17 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) { osdmonitor = new OSDMonitor(this); - // - osdcluster = new OSDCluster(); + // + osdmap = new OSDMap(); OSDGroup osdg; osdg.num_osds = g_conf.num_osd; for (int i=0; iadd_group(osdg); + osdmap->add_group(osdg); // - filer = new Filer(messenger, osdcluster); + filer = new Filer(messenger, osdmap); mdlog->set_max_events(g_conf.mds_log_max_len); @@ -166,7 +168,7 @@ MDS::~MDS() { if (osdmonitor) { delete osdmonitor; osdmonitor = 0; } if (idalloc) { delete idalloc; idalloc = NULL; } if (anchormgr) { delete anchormgr; anchormgr = NULL; } - if (osdcluster) { delete osdcluster; osdcluster = 0; } + if (osdmap) { delete osdmap; osdmap = 0; } if (filer) { delete filer; filer = 0; } if (messenger) { delete messenger; messenger = NULL; } @@ -272,9 +274,6 @@ mds_load_t MDS::get_load() l.root_pop = mdcache->get_root()->popularity[MDS_POP_ANYDOM].get(); } else l.root_pop = 0; - l.req_rate = stat_req.get(); - l.rd_rate = stat_read.get(); - l.wr_rate = stat_write.get(); return l; } @@ -301,8 +300,8 @@ void MDS::proc_message(Message *m) filer->handle_osd_op_reply((class MOSDOpReply*)m); return; - case MSG_OSD_GETCLUSTER: - handle_osd_getcluster(m); + case MSG_OSD_GETMAP: + handle_osd_getmap(m); return; // MDS @@ -355,6 +354,14 @@ void MDS::proc_message(Message *m) case MSG_CLIENT_REQUEST: handle_client_request((MClientRequest*)m); return; + + case MSG_MDS_HASHREADDIR: + handle_hash_readdir((MHashReaddir*)m); + return; + case MSG_MDS_HASHREADDIRREPLY: + handle_hash_readdir_reply((MHashReaddirReply*)m); + return; + } dout(1) << " main unknown message " << m->get_type() << endl; @@ -478,28 +485,31 @@ void MDS::my_dispatch(Message *m) num_bal_times--; } - static map didhash; - if (0 && elapsed.sec() > 15 && !didhash[whoami]) { - CInode *in = mdcache->get_inode(100000010); - if (in && in->dir) { - if (in->dir->is_auth()) - mdcache->hash_dir(in->dir); - didhash[whoami] = 1; + + // HACK to test hashing stuff + if (1) { + static map didhash; + if (elapsed.sec() > 15 && !didhash[whoami]) { + CInode *in = mdcache->get_inode(100000010); + if (in && in->dir) { + if (in->dir->is_auth()) + mdcache->hash_dir(in->dir); + didhash[whoami] = 1; + } } - } - if (0 && elapsed.sec() > 25 && didhash[whoami] == 1) { - CInode *in = mdcache->get_inode(100000010); - if (in && in->dir) { - if (in->dir->is_auth()) - mdcache->unhash_dir(in->dir); - didhash[whoami] = 2; + if (0 && elapsed.sec() > 25 && didhash[whoami] == 1) { + CInode *in = mdcache->get_inode(100000010); + if (in && in->dir) { + if (in->dir->is_auth() && in->dir->is_hashed()) + mdcache->unhash_dir(in->dir); + didhash[whoami] = 2; + } } } - } - // hack + // HACK to force export to test foreign renames if (false && whoami == 0) { static bool didit = false; @@ -515,6 +525,8 @@ void MDS::my_dispatch(Message *m) } } + + // shut down? if (shutting_down && !shut_down) { if (mdcache->shutdown_pass()) { @@ -528,11 +540,11 @@ void MDS::my_dispatch(Message *m) } -void MDS::handle_osd_getcluster(Message *m) +void MDS::handle_osd_getmap(Message *m) { - dout(7) << "osd_getcluster from " << MSG_ADDR_NICE(m->get_source()) << endl; + dout(7) << "osd_getmap from " << MSG_ADDR_NICE(m->get_source()) << endl; - messenger->send_message(new MOSDGetClusterAck(osdcluster), + messenger->send_message(new MOSDGetMapAck(osdmap), m->get_source()); delete m; } @@ -581,7 +593,7 @@ void MDS::handle_client_mount(MClientMount *m) // ack - messenger->send_message(new MClientMountAck(m, osdcluster), + messenger->send_message(new MClientMountAck(m, osdmap), m->get_source(), m->get_source_port()); delete m; } @@ -971,9 +983,6 @@ void MDS::handle_client_stat(MClientRequest *req, mdcache->inode_soft_read_finish(ref); - stat_read.hit(); - stat_req.hit(); - balancer->hit_inode(ref); // reply @@ -1096,9 +1105,12 @@ bool MDS::try_open_dir(CInode *in, MClientRequest *req) // DIRECTORY and NAMESPACE OPS +// READDIR -void MDS::encode_dir_contents(CDir *dir, list& items, int& numfiles) +int MDS::encode_dir_contents(CDir *dir, list& items) { + int numfiles = 0; + for (CDir_map_t::iterator it = dir->begin(); it != dir->end(); it++) { @@ -1125,6 +1137,133 @@ void MDS::encode_dir_contents(CDir *dir, list& items, int& numfil items.push_back( new c_inode_info(in, whoami, it->first) ); numfiles++; } + return numfiles; +} + + +/* + * note: this is pretty sloppy, but should work just fine i think... + */ +void MDS::handle_hash_readdir(MHashReaddir *m) +{ + CInode *cur = mdcache->get_inode(m->get_ino()); + assert(cur); + + if (!cur->dir || + !cur->dir->is_hashed()) { + assert(0); + dout(7) << "handle_hash_readdir don't have dir open, or not hashed. giving up!" << endl; + delete m; + return; + } + CDir *dir = cur->dir; + assert(dir); + assert(dir->is_hashed()); + + // complete? + if (!dir->is_complete()) { + dout(10) << " incomplete dir contents for readdir on " << *dir << ", fetching" << endl; + mdstore->fetch_dir(dir, new C_MDS_RetryMessage(this, m)); + return; + } + + // get content + list items; + encode_dir_contents(dir, items); + + // sent it back! + messenger->send_message(new MHashReaddirReply(dir->ino(), items), + m->get_source(), MDS_PORT_CACHE, MDS_PORT_CACHE); +} + + +void MDS::handle_hash_readdir_reply(MHashReaddirReply *m) +{ + CInode *cur = mdcache->get_inode(m->get_ino()); + assert(cur); + + if (!cur->dir || + !cur->dir->is_hashed()) { + assert(0); + dout(7) << "handle_hash_readdir don't have dir open, or not hashed. giving up!" << endl; + delete m; + return; + } + CDir *dir = cur->dir; + assert(dir); + assert(dir->is_hashed()); + + // move items to hashed_readdir gather + int from = m->get_source(); + assert(dir->hashed_readdir.count(from) == 0); + dir->hashed_readdir[from].splice(dir->hashed_readdir[from].begin(), + m->get_items()); + delete m; + + // gather finished? + if (dir->hashed_readdir.size() < (unsigned)get_cluster()->get_num_mds()) { + dout(7) << "still waiting for more hashed readdir bits" << endl; + return; + } + + dout(7) << "got last bit! finishing waiters" << endl; + + // do these finishers. they'll copy the results. + list finished; + dir->take_waiting(CDIR_WAIT_THISHASHEDREADDIR, finished); + finish_contexts(finished); + + // now discard these results + for (map >::iterator it = dir->hashed_readdir.begin(); + it != dir->hashed_readdir.end(); + it++) { + for (list::iterator ci = it->second.begin(); + ci != it->second.end(); + ci++) + delete *ci; + } + dir->hashed_readdir.clear(); + + // unpin dir (we're done!) + dir->auth_unpin(); + + // trigger any waiters for next hashed readdir cycle + dir->take_waiting(CDIR_WAIT_NEXTHASHEDREADDIR, finished_queue); +} + + +class C_MDS_HashReaddir : public Context { + MDS *mds; + MClientRequest *req; + CDir *dir; +public: + C_MDS_HashReaddir(MDS *mds, MClientRequest *req, CDir *dir) { + this->mds = mds; + this->req = req; + this->dir = dir; + } + void finish(int r) { + mds->finish_hash_readdir(req, dir); + } +}; + +void MDS::finish_hash_readdir(MClientRequest *req, CDir *dir) +{ + dout(7) << "finish_hash_readdir on " << *dir << endl; + + assert(dir->is_hashed()); + assert(dir->hashed_readdir.size() == (unsigned)get_cluster()->get_num_mds()); + + // reply! + MClientReply *reply = new MClientReply(req); + reply->set_result(0); + + for (int i=0; iget_num_mds(); i++) { + reply->copy_dir_items(dir->hashed_readdir[i]); + } + + // ok! + reply_request(req, reply, dir->inode); } @@ -1139,7 +1278,6 @@ void MDS::handle_client_readdir(MClientRequest *req, return; } - // auth? if (!cur->dir_is_auth()) { int dirauth = cur->authority(); @@ -1158,23 +1296,71 @@ void MDS::handle_client_readdir(MClientRequest *req, return; assert(cur->dir->is_auth()); + // unhashing? wait! + if (cur->dir->is_hashed() && + cur->dir->is_unhashing()) { + dout(10) << "unhashing, waiting" << endl; + cur->dir->add_waiter(CDIR_WAIT_UNFREEZE, + new C_MDS_RetryRequest(this, req, cur)); + return; + } + // check perm if (!mdcache->inode_hard_read_start(cur,req)) return; mdcache->inode_hard_read_finish(cur); + CDir *dir = cur->dir; + assert(dir); - if (!cur->dir->is_complete()) { + if (!dir->is_complete()) { // fetch dout(10) << " incomplete dir contents for readdir on " << *cur->dir << ", fetching" << endl; - mdstore->fetch_dir(cur->dir, new C_MDS_RetryRequest(this, req, cur)); + mdstore->fetch_dir(dir, new C_MDS_RetryRequest(this, req, cur)); return; } - + + if (dir->is_hashed()) { + // HASHED + dout(7) << "hashed dir" << endl; + if (!dir->can_auth_pin()) { + dout(7) << "can't auth_pin dir " << *dir << " waiting" << endl; + dir->add_waiter(CDIR_WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(this, req, cur)); + return; + } + + if (!dir->hashed_readdir.empty()) { + dout(7) << "another readdir gather in progres, waiting" << endl; + dir->add_waiter(CDIR_WAIT_NEXTHASHEDREADDIR, new C_MDS_RetryRequest(this, req, cur)); + return; + } + + // start new readdir gather + dout(7) << "staring new hashed readdir gather" << endl; + + // pin auth for process! + dir->auth_pin(); + + // get local bits + encode_dir_contents(cur->dir, dir->hashed_readdir[whoami]); + + // request other bits + for (int i=0; iget_num_mds(); i++) { + if (i == get_nodeid()) continue; + messenger->send_message(new MHashReaddir(dir->ino()), + MSG_ADDR_MDS(i), MDS_PORT_SERVER, MDS_PORT_SERVER); + } + + // wait + dir->add_waiter(CDIR_WAIT_THISHASHEDREADDIR, + new C_MDS_HashReaddir(this, req, dir)); + return; + } + + // NON-HASHED // build dir contents list items; - int numfiles = 0; - encode_dir_contents(cur->dir, items, numfiles); + int numfiles = encode_dir_contents(cur->dir, items); // yay, reply MClientReply *reply = new MClientReply(req); @@ -1183,9 +1369,6 @@ void MDS::handle_client_readdir(MClientRequest *req, dout(10) << "reply to " << *req << " readdir " << numfiles << " files" << endl; reply->set_result(0); - stat_read.hit(); - stat_req.hit(); - balancer->hit_dir(cur->dir); // reply @@ -1309,6 +1492,9 @@ CInode *MDS::mknod(MClientRequest *req, CInode *diri, bool okexist) dn->mark_dirty(); newi->mark_dirty(); + // journal it + mdlog->submit_entry(new EInodeUpdate(newi)); // FIXME WRONG EVENT + // ok! return newi; } diff --git a/ceph/mds/MDS.h b/ceph/mds/MDS.h index 94975650579..bc859ed8a17 100644 --- a/ceph/mds/MDS.h +++ b/ceph/mds/MDS.h @@ -52,7 +52,7 @@ typedef __uint64_t object_t; class filepath; -class OSDCluster; +class OSDMap; class Filer; class AnchorTable; @@ -68,6 +68,8 @@ class Messenger; class Message; class MClientRequest; class MClientReply; +class MHashReaddir; +class MHashReaddirReply; class MDBalancer; class LogEvent; class IdAllocator; @@ -89,7 +91,7 @@ class MDS : public Dispatcher { MDCluster *mdcluster; public: - OSDCluster *osdcluster; + OSDMap *osdmap; Filer *filer; // for reading/writing to/from osds AnchorTable *anchormgr; OSDMonitor *osdmonitor; @@ -113,10 +115,6 @@ class MDS : public Dispatcher { friend class MDStore; // stats - DecayCounter stat_req; - DecayCounter stat_read; - DecayCounter stat_write; - set mounted_clients; @@ -146,7 +144,7 @@ class MDS : public Dispatcher { int get_nodeid() { return whoami; } MDCluster *get_cluster() { return mdcluster; } MDCluster *get_mds_cluster() { return mdcluster; } - OSDCluster *get_osd_cluster() { return osdcluster; } + OSDMap *get_osd_map() { return osdmap; } mds_load_t get_load(); @@ -185,7 +183,7 @@ class MDS : public Dispatcher { void handle_shutdown_finish(Message *m); // osds - void handle_osd_getcluster(Message *m); + void handle_osd_getmap(Message *m); // clients void handle_client_mount(class MClientMount *m); @@ -214,9 +212,14 @@ class MDS : public Dispatcher { MClientReply *reply, CInode *ref); - // namespace - void encode_dir_contents(CDir *dir, list& items, int& numfiles); + // readdir void handle_client_readdir(MClientRequest *req, CInode *ref); + int encode_dir_contents(CDir *dir, list& items); + void handle_hash_readdir(MHashReaddir *m); + void handle_hash_readdir_reply(MHashReaddirReply *m); + void finish_hash_readdir(MClientRequest *req, CDir *dir); + + // namespace changes void handle_client_mknod(MClientRequest *req, CInode *ref); void handle_client_link(MClientRequest *req, CInode *ref); void handle_client_link_2(int r, MClientRequest *req, CInode *ref, vector& trace); diff --git a/ceph/mds/MDStore.cc b/ceph/mds/MDStore.cc index b55fe0158c7..ec9b0dccab8 100644 --- a/ceph/mds/MDStore.cc +++ b/ceph/mds/MDStore.cc @@ -8,7 +8,7 @@ #include "MDCluster.h" #include "osd/Filer.h" -#include "osd/OSDCluster.h" +#include "osd/OSDMap.h" #include "msg/Message.h" diff --git a/ceph/mds/OSDMonitor.cc b/ceph/mds/OSDMonitor.cc index 023a87a176f..3837ef2206c 100644 --- a/ceph/mds/OSDMonitor.cc +++ b/ceph/mds/OSDMonitor.cc @@ -1,7 +1,7 @@ #include "OSDMonitor.h" #include "MDS.h" -#include "osd/OSDCluster.h" +#include "osd/OSDMap.h" #include "msg/Message.h" #include "msg/Messenger.h" diff --git a/ceph/messages/MOSDMap.h b/ceph/messages/MOSDMap.h index ffbb9b9a2f7..c47ba8e8069 100644 --- a/ceph/messages/MOSDMap.h +++ b/ceph/messages/MOSDMap.h @@ -1,11 +1,11 @@ -#ifndef __MOSDGETCLUSTERACK_H -#define __MOSDGETCLUSTERACK_H +#ifndef __MOSDGETMAPACK_H +#define __MOSDGETMAPACK_H #include "msg/Message.h" #include "osd/OSDMap.h" -class MOSDGetClusterAck : public Message { +class MOSDGetMapAck : public Message { bufferlist osdmap; public: @@ -14,11 +14,11 @@ class MOSDGetClusterAck : public Message { return osdmap; } - MOSDGetClusterAck(OSDMap *oc) : - Message(MSG_OSD_GETCLUSTERACK) { + MOSDGetMapAck(OSDMap *oc) : + Message(MSG_OSD_GETMAPACK) { oc->encode(osdmap); } - MOSDGetClusterAck() {} + MOSDGetMapAck() {} // marshalling @@ -29,7 +29,7 @@ class MOSDGetClusterAck : public Message { payload.claim(osdmap); } - virtual char *get_type_name() { return "ogca"; } + virtual char *get_type_name() { return "ogma"; } }; #endif diff --git a/ceph/messages/MOSDOpReply.h b/ceph/messages/MOSDOpReply.h index ac12e40deeb..8250874556c 100644 --- a/ceph/messages/MOSDOpReply.h +++ b/ceph/messages/MOSDOpReply.h @@ -2,7 +2,7 @@ #define __MOSDOPREPLY_H #include "msg/Message.h" -#include "osd/OSDCluster.h" +#include "osd/OSDMap.h" #include "MOSDOp.h" @@ -37,7 +37,7 @@ typedef struct { class MOSDOpReply : public Message { MOSDOpReply_st st; bufferlist data; - bufferlist osdcluster; + bufferlist osdmap; public: long get_tid() { return st.tid; } @@ -63,17 +63,17 @@ class MOSDOpReply : public Message { return data; } - // osdcluster + // osdmap __uint64_t get_ocv() { return st._new_ocv; } - bufferlist& get_osdcluster() { - return osdcluster; + bufferlist& get_osdmap() { + return osdmap; } // keep a pcid (procedure call id) to match up request+reply void set_pcid(long pcid) { this->st.pcid = pcid; } long get_pcid() { return st.pcid; } - MOSDOpReply(MOSDOp *req, int result, OSDCluster *oc) : + MOSDOpReply(MOSDOp *req, int result, OSDMap *oc) : Message(MSG_OSD_OPREPLY) { memset(&st, 0, sizeof(st)); this->st.pcid = req->st.pcid; @@ -88,9 +88,9 @@ class MOSDOpReply : public Message { // attach updated cluster spec? if (req->get_ocv() < oc->get_version()) { - oc->encode(osdcluster); + oc->encode(osdmap); st._new_ocv = oc->get_version(); - st._oc_len = osdcluster.length(); + st._oc_len = osdmap.length(); } } MOSDOpReply() {} @@ -101,12 +101,12 @@ class MOSDOpReply : public Message { payload.copy(0, sizeof(st), (char*)&st); payload.splice(0, sizeof(st)); if (st._data_len) payload.splice(0, st._data_len, &data); - if (st._oc_len) payload.splice(0, st._oc_len, &osdcluster); + if (st._oc_len) payload.splice(0, st._oc_len, &osdmap); } virtual void encode_payload() { payload.push_back( new buffer((char*)&st, sizeof(st)) ); payload.claim_append( data ); - payload.claim_append( osdcluster ); + payload.claim_append( osdmap ); } virtual char *get_type_name() { return "oopr"; } diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index 5817636d215..a08592cea04 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -14,8 +14,8 @@ #define MSG_OSD_OPREPLY 15 // delete, etc. #define MSG_OSD_PING 16 -#define MSG_OSD_GETCLUSTER 17 -#define MSG_OSD_GETCLUSTERACK 18 +#define MSG_OSD_GETMAP 17 +#define MSG_OSD_GETMAPACK 18 #define MSG_CLIENT_REQUEST 20 #define MSG_CLIENT_REPLY 21 @@ -69,7 +69,9 @@ #define MSG_MDS_HASHDIR 164 #define MSG_MDS_HASHDIRACK 165 #define MSG_MDS_HASHDIRNOTIFY 166 -#define MSG_MDS_HASHDIRFINISH 167 + +#define MSG_MDS_HASHREADDIR 168 +#define MSG_MDS_HASHREADDIRREPLY 169 #define MSG_MDS_UNHASHDIRPREP 170 #define MSG_MDS_UNHASHDIRPREPACK 171 diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index 44663a63e51..6a687802e91 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -20,7 +20,7 @@ using namespace std; #include "messages/MOSDPing.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" -#include "messages/MOSDGetClusterAck.h" +#include "messages/MOSDGetMapAck.h" #include "messages/MClientMount.h" #include "messages/MClientMountAck.h" @@ -218,8 +218,8 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_OSD_OPREPLY: m = new MOSDOpReply(); break; - case MSG_OSD_GETCLUSTERACK: - m = new MOSDGetClusterAck(); + case MSG_OSD_GETMAPACK: + m = new MOSDGetMapAck(); break; // clients @@ -399,7 +399,7 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_MDS_SHUTDOWNFINISH: case MSG_SHUTDOWN: case MSG_CLIENT_UNMOUNT: - case MSG_OSD_GETCLUSTER: + case MSG_OSD_GETMAP: m = new MGenericMessage(env.type); break; diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 8bb6dc152c1..a45711dc664 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -2,7 +2,7 @@ #include "include/types.h" #include "OSD.h" -#include "OSDCluster.h" +#include "OSDMap.h" #ifdef USE_OBFS # include "OBFSStore.h" @@ -23,7 +23,7 @@ #include "messages/MPingAck.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" -#include "messages/MOSDGetClusterAck.h" +#include "messages/MOSDGetMapAck.h" #include "common/Logger.h" #include "common/LogType.h" @@ -56,7 +56,7 @@ OSD::OSD(int id, Messenger *m) messenger = m; messenger->set_dispatcher(this); - osdcluster = 0; + osdmap = 0; last_tid = 0; @@ -106,7 +106,7 @@ OSD::OSD(int id, Messenger *m) OSD::~OSD() { if (threadpool) { delete threadpool; threadpool = 0; } - if (osdcluster) { delete osdcluster; osdcluster = 0; } + if (osdmap) { delete osdmap; osdmap = 0; } if (monitor) { delete monitor; monitor = 0; } if (messenger) { delete messenger; messenger = 0; } if (logger) { delete logger; logger = 0; } @@ -164,8 +164,8 @@ void OSD::dispatch(Message *m) delete m; break; - case MSG_OSD_GETCLUSTERACK: - handle_getcluster_ack((MOSDGetClusterAck*)m); + case MSG_OSD_GETMAPACK: + handle_getmap_ack((MOSDGetMapAck*)m); break; case MSG_PING: @@ -224,19 +224,19 @@ void OSD::handle_ping(MPing *m) } -void OSD::handle_getcluster_ack(MOSDGetClusterAck *m) +void OSD::handle_getmap_ack(MOSDGetMapAck *m) { // SAB osd_lock.Lock(); - if (!osdcluster) osdcluster = new OSDCluster(); - osdcluster->decode(m->get_osdcluster()); - dout(7) << "got OSDCluster version " << osdcluster->get_version() << endl; + if (!osdmap) osdmap = new OSDMap(); + osdmap->decode(m->get_osdmap()); + dout(7) << "got OSDMap version " << osdmap->get_version() << endl; delete m; // process waiters list waiting; - waiting.splice(waiting.begin(), waiting_for_osdcluster); + waiting.splice(waiting.begin(), waiting_for_osdmap); for (list::iterator it = waiting.begin(); it != waiting.end(); @@ -252,15 +252,15 @@ void OSD::handle_op(MOSDOp *op) { // starting up? - if (!osdcluster) { + if (!osdmap) { // SAB osd_lock.Lock(); - dout(7) << "no OSDCluster, starting up" << endl; - if (waiting_for_osdcluster.empty()) - messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), + dout(7) << "no OSDMap, starting up" << endl; + if (waiting_for_osdmap.empty()) + messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), MSG_ADDR_MDS(0), MDS_PORT_MAIN); - waiting_for_osdcluster.push_back(op); + waiting_for_osdmap.push_back(op); // SAB osd_lock.Unlock(); @@ -270,20 +270,20 @@ void OSD::handle_op(MOSDOp *op) // check cluster version - if (op->get_ocv() > osdcluster->get_version()) { + if (op->get_ocv() > osdmap->get_version()) { // op's is newer - dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl; + dout(7) << "op cluster " << op->get_ocv() << " > " << osdmap->get_version() << endl; // query MDS dout(7) << "querying MDS" << endl; - messenger->send_message(new MGenericMessage(MSG_OSD_GETCLUSTER), + messenger->send_message(new MGenericMessage(MSG_OSD_GETMAP), MSG_ADDR_MDS(0), MDS_PORT_MAIN); assert(0); // SAB osd_lock.Lock(); - waiting_for_osdcluster.push_back(op); + waiting_for_osdmap.push_back(op); // SAB osd_lock.Unlock(); @@ -291,9 +291,9 @@ void OSD::handle_op(MOSDOp *op) return; } - if (op->get_ocv() < osdcluster->get_version()) { + if (op->get_ocv() < osdmap->get_version()) { // op's is old - dout(7) << "op cluster " << op->get_ocv() << " > " << osdcluster->get_version() << endl; + dout(7) << "op cluster " << op->get_ocv() << " > " << osdmap->get_version() << endl; } @@ -305,7 +305,7 @@ void OSD::handle_op(MOSDOp *op) // PRIMARY // verify that we are primary, or acting primary - int acting_primary = osdcluster->get_rg_acting_primary( op->get_rg() ); + int acting_primary = osdmap->get_rg_acting_primary( op->get_rg() ); if (acting_primary != whoami) { dout(7) << " acting primary is " << acting_primary << ", forwarding" << endl; messenger->send_message(op, MSG_ADDR_OSD(acting_primary), 0); @@ -314,7 +314,7 @@ void OSD::handle_op(MOSDOp *op) } } else { // REPLICA - int my_role = osdcluster->get_rg_role(rg, whoami); + int my_role = osdmap->get_rg_role(rg, whoami); dout(7) << "rg " << rg << " my_role " << my_role << " wants " << op->get_rg_role() << endl; @@ -377,7 +377,7 @@ void OSD::op_read(MOSDOp *r) r->get_length(), r->get_offset(), bptr.c_str()); // set up reply - MOSDOpReply *reply = new MOSDOpReply(r, 0, osdcluster); + MOSDOpReply *reply = new MOSDOpReply(r, 0, osdmap); if (got >= 0) { bptr.set_length(got); // properly size the buffer @@ -418,7 +418,7 @@ void OSD::op_write(MOSDOp *op) if (op->get_rg_nrep() > 1) { dout(7) << "op_write nrep=" << op->get_rg_nrep() << endl; int reps[op->get_rg_nrep()]; - osdcluster->repgroup_to_osds(op->get_rg(), + osdmap->repgroup_to_osds(op->get_rg(), reps, op->get_rg_nrep()); @@ -432,7 +432,7 @@ void OSD::op_write(MOSDOp *op) messenger->get_myaddr(), op->get_oid(), op->get_rg(), - osdcluster->get_version(), + osdmap->get_version(), op->get_op()); wr->get_data() = op->get_data(); // copy bufferlist messenger->send_message(wr, MSG_ADDR_OSD(reps[i])); @@ -501,7 +501,7 @@ void OSD::op_write(MOSDOp *op) } // reply - MOSDOpReply *reply = new MOSDOpReply(op, 0, osdcluster); + MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap); messenger->send_message(reply, op->get_asker()); delete op; @@ -512,7 +512,7 @@ void OSD::op_mkfs(MOSDOp *op) dout(3) << "MKFS" << endl; { int r = store->mkfs(); - messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker()); } delete op; } @@ -523,7 +523,7 @@ void OSD::op_delete(MOSDOp *op) dout(3) << "delete on " << op->get_oid() << " r = " << r << endl; // "ack" - messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker()); logger->inc("rm"); delete op; @@ -535,7 +535,7 @@ void OSD::op_truncate(MOSDOp *op) dout(3) << "truncate on " << op->get_oid() << " at " << op->get_offset() << " r = " << r << endl; // "ack" - messenger->send_message(new MOSDOpReply(op, r, osdcluster), op->get_asker()); + messenger->send_message(new MOSDOpReply(op, r, osdmap), op->get_asker()); logger->inc("trunc"); @@ -550,7 +550,7 @@ void OSD::op_stat(MOSDOp *op) dout(3) << "stat on " << op->get_oid() << " r = " << r << " size = " << st.st_size << endl; - MOSDOpReply *reply = new MOSDOpReply(op, r, osdcluster); + MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap); reply->set_object_size(st.st_size); messenger->send_message(reply, op->get_asker()); diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 547a5826880..b525ff6e78a 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -14,41 +14,119 @@ using namespace std; class Messenger; class Message; +typedef __uint64_t version_t; -// ways to be dirty -#define RG_DIRTY_LOCAL_LOG 1 -#define RG_DIRTY_LOCAL_SYNC 2 -#define RG_DIRTY_REPLICA_MEM 4 -#define RG_DIRTY_REPLICA_SYNC 8 +/** RGImport + * state associated with import of RG contents from another + * OSD. + */ +#define RG_IMPORT_STATE_STARTING 1 // fetching object, delete lists. +#define RG_IMPORT_STATE_IMPORTING 2 // fetching replicas. +#define RG_IMPORT_STATE_FINISHING 3 // got everything; telling old guy to hose residual state -class ReplicaGroup { +struct RGImport { + int peer; // the peer + int peer_role; // peer's role. if <0, we should delete strays. + int import_state; + + map remaining_objects; // remote object list + map stray_objects; // imported but not deleted. + + // FIXME: add destructive vs non-destructive. maybe peer is a replica! +}; + + +/** RGPeer + * state associated with (possibly old) RG peers. + * only used by primary? + * + */ + +// by primary +#define RG_PEER_STATE_ACTIVE 1 // active peer +#define RG_PEER_STATE_COMPLETE 2 // peer has everything replicated + +struct RGPeer { + int peer; + int role; // 0 primary, 1+ replica, -1 residual + int state; + + // used by primary for syncing (old) replicas + map objects; // remote object list + map deleted; // remote delete list + map fetching; // objects i'm reading from replica + map stray; // objects that need to be deleted + + // used by primary for normal replication stuff + map writing; // objects i've written to replica + map flushing; // objects i've written to remote buffer cache only +}; + + + +/** RG - Replica Group + * + */ + +// bits used on any +#define RG_STATE_COMPLETE 1 // i have full RG contents locally. +#define RG_STATE_PEERED 2 // i have contacted prior primary and all + // replica osds and/or fetched their + // content lists, and thus know what's up. + // or, i have check in w/ new primary (on replica) + +// on primary or old-primary only +#define RG_STATE_CLEAN 4 // i am fully replicated + +class RG { public: repgroup_t rg; - int role; // 0 = primary, 1 = secondary, etc. 0=undef. + int role; // 0 = primary, 1 = secondary, etc. -1=undef/none. int state; - - map dirty_map; // dirty objects - ReplicaGroup(repgroup_t rg); + map peers; + // for unstable states, + map deleted_objects; // locally + + public: + RG(repgroup_t rg); + + repgroup_t get_rg() { return rg; } + int get_role() { return role; } + int get_state() { return state; } + void enumerate_objects(list& ls); }; +/** Onode + * per-object OSD metadata + */ +class Onode { + object_t oid; + version_t version; + + map stray_replicas; // osds w/ stray replicas. + + public: + +}; + class OSD : public Dispatcher { protected: Messenger *messenger; int whoami; - class OSDCluster *osdcluster; + class OSDMap *osdmap; class ObjectStore *store; class HostMonitor *monitor; class Logger *logger; class ThreadPool *threadpool; - list waiting_for_osdcluster; + list waiting_for_osdmap; // replica hack __uint64_t last_tid; @@ -69,8 +147,8 @@ class OSD : public Dispatcher { int init(); int shutdown(); - // OSDCluster - void update_osd_cluster(__uint64_t ocv, bufferlist& blist); + // OSDMap + void update_osd_map(__uint64_t ocv, bufferlist& blist); void queue_op(class MOSDOp *m); void do_op(class MOSDOp *m); @@ -83,7 +161,7 @@ class OSD : public Dispatcher { virtual void dispatch(Message *m); void handle_ping(class MPing *m); - void handle_getcluster_ack(class MOSDGetClusterAck *m); + void handle_getmap_ack(class MOSDGetMapAck *m); void handle_op(class MOSDOp *m); void op_read(class MOSDOp *m); void op_write(class MOSDOp *m); diff --git a/ceph/osd/OSDMap.cc b/ceph/osd/OSDMap.cc index 2aef9b3fee7..f76fba184af 100644 --- a/ceph/osd/OSDMap.cc +++ b/ceph/osd/OSDMap.cc @@ -1,12 +1,12 @@ -#include "OSDCluster.h" +#include "OSDMap.h" // serialize/unserialize -void OSDCluster::encode(bufferlist& blist) +void OSDMap::encode(bufferlist& blist) { blist.append((char*)&version, sizeof(version)); @@ -20,7 +20,7 @@ void OSDCluster::encode(bufferlist& blist) _encode(failed_osds, blist); } -void OSDCluster::decode(bufferlist& blist) +void OSDMap::decode(bufferlist& blist) { int off = 0; blist.copy(off, sizeof(version), (char*)&version); @@ -40,3 +40,4 @@ void OSDCluster::decode(bufferlist& blist) init_rush(); } + diff --git a/ceph/osd/OSDMap.h b/ceph/osd/OSDMap.h index ecb57463c02..858a2f92e21 100644 --- a/ceph/osd/OSDMap.h +++ b/ceph/osd/OSDMap.h @@ -1,5 +1,5 @@ -#ifndef __OSDCLUSTER_H -#define __OSDCLUSTER_H +#ifndef __OSDMAP_H +#define __OSDMAP_H /* * describe properties of the OSD cluster. @@ -94,9 +94,9 @@ class OSDExtent { }; -/** OSDCluster +/** OSDMap */ -class OSDCluster { +class OSDMap { __uint64_t version; // what version of the osd cluster descriptor is this // RUSH disk groups @@ -129,8 +129,8 @@ class OSDCluster { public: - OSDCluster() : version(0), rush(0) { } - ~OSDCluster() { + OSDMap() : version(0), rush(0) { } + ~OSDMap() { if (rush) { delete rush; rush = 0; } } diff --git a/ceph/osdc/Filer.cc b/ceph/osdc/Filer.cc index 2073b945556..f1cb5318c3b 100644 --- a/ceph/osdc/Filer.cc +++ b/ceph/osdc/Filer.cc @@ -2,7 +2,7 @@ #include #include "Filer.h" -#include "OSDCluster.h" +#include "OSDMap.h" //#include "messages/MOSDRead.h" //#include "messages/MOSDReadReply.h" @@ -23,11 +23,11 @@ -Filer::Filer(Messenger *m, OSDCluster *o) +Filer::Filer(Messenger *m, OSDMap *o) { last_tid = 0; messenger = m; - osdcluster = o; + osdmap = o; } Filer::~Filer() @@ -84,7 +84,7 @@ Filer::read(inodeno_t ino, p->onfinish = onfinish; // map buffer into OSD extents - osdcluster->file_to_extents(ino, layout, len, offset, p->extents); + osdmap->file_to_extents(ino, layout, len, offset, p->extents); dout(7) << "osd read ino " << ino << " len " << len << " off " << offset << " in " << p->extents.size() << " object extents" << endl; @@ -96,7 +96,7 @@ Filer::read(inodeno_t ino, // issue read MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->rg, osdcluster->get_version(), + it->oid, it->rg, osdmap->get_version(), OSD_OP_READ); m->set_length(it->len); m->set_offset(it->offset); @@ -274,7 +274,7 @@ Filer::write(inodeno_t ino, // find data list extents; - osdcluster->file_to_extents(ino, layout, len, offset, extents); + osdmap->file_to_extents(ino, layout, len, offset, extents); dout(7) << "osd write ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl; @@ -287,7 +287,7 @@ Filer::write(inodeno_t ino, // issue write MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->rg, osdcluster->get_version(), + it->oid, it->rg, osdmap->get_version(), OSD_OP_WRITE); m->set_length(it->len); m->set_offset(it->offset); @@ -358,9 +358,9 @@ Filer::handle_osd_op_reply(MOSDOpReply *m) { // updated cluster info? if (m->get_ocv() && - m->get_ocv() > osdcluster->get_version()) { - dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdcluster->get_version() << endl; - osdcluster->decode( m->get_osdcluster() ); + m->get_ocv() > osdmap->get_version()) { + dout(3) << "op reply has newer cluster " << m->get_ocv() << " > " << osdmap->get_version() << endl; + osdmap->decode( m->get_osdmap() ); } @@ -429,7 +429,7 @@ int Filer::truncate(inodeno_t ino, // find data list extents; - osdcluster->file_to_extents(ino, layout, old_size, new_size, extents); + osdmap->file_to_extents(ino, layout, old_size, new_size, extents); dout(7) << "osd truncate ino " << ino << " to new size " << new_size << " from old_size " << old_size << " in " << extents.size() << " extents" << endl; @@ -443,12 +443,12 @@ int Filer::truncate(inodeno_t ino, if (it->offset == 0) { // issue delete m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->rg, osdcluster->get_version(), + it->oid, it->rg, osdmap->get_version(), OSD_OP_DELETE); } else { // issue a truncate m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->rg, osdcluster->get_version(), + it->oid, it->rg, osdmap->get_version(), OSD_OP_TRUNCATE); m->set_length( it->offset ); } @@ -507,7 +507,7 @@ int Filer::mkfs(Context *onfinish) // send MKFS to osds set ls; - osdcluster->get_all_osds(ls); + osdmap->get_all_osds(ls); for (set::iterator it = ls.begin(); it != ls.end(); @@ -517,7 +517,7 @@ int Filer::mkfs(Context *onfinish) // issue mkfs MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(), - 0, 0, osdcluster->get_version(), + 0, 0, osdmap->get_version(), OSD_OP_MKFS); messenger->send_message(m, MSG_ADDR_OSD(*it), 0); @@ -557,7 +557,7 @@ int Filer::zero(inodeno_t ino, // find data list extents; - osdcluster->file_to_extents(ino, len, offset, num_rep, extents); + osdmap->file_to_extents(ino, len, offset, num_rep, extents); dout(7) << "osd zero ino " << ino << " len " << len << " off " << offset << " in " << extents.size() << " extents on " << num_rep << " replicas" << endl; @@ -571,7 +571,7 @@ int Filer::zero(inodeno_t ino, MOSDOp *m; //if (it->len == m = new MOSDOp(last_tid, messenger->get_myaddr(), - it->oid, it->rg, osdcluster->get_version(), + it->oid, it->rg, osdmap->get_version(), OSD_OP_DELETE); it->len, it->offset); messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0); diff --git a/ceph/osdc/Filer.h b/ceph/osdc/Filer.h index ff55a663072..cf2a5a70637 100644 --- a/ceph/osdc/Filer.h +++ b/ceph/osdc/Filer.h @@ -6,7 +6,7 @@ * client/mds interface to access "files" in OSD cluster. * * generic non-blocking interface for reading/writing to osds, using - * the file-to-object mappings defined by OSDCluster. + * the file-to-object mappings defined by OSDMap. * * Filer also handles details of replication on OSDs (to the extent that * it affects OSD clients) @@ -24,11 +24,11 @@ using namespace __gnu_cxx; #include "include/types.h" #include "msg/Dispatcher.h" -#include "OSDCluster.h" +#include "OSDMap.h" class Context; class Messenger; -class OSDCluster; +class OSDMap; /*** types ***/ typedef __uint64_t tid_t; @@ -64,7 +64,7 @@ typedef struct { /**** Filer interface ***/ class Filer : public Dispatcher { - OSDCluster *osdcluster; // what osds am i dealing with? + OSDMap *osdmap; // what osds am i dealing with? Messenger *messenger; __uint64_t last_tid; @@ -76,7 +76,7 @@ class Filer : public Dispatcher { hash_map op_mkfs; public: - Filer(Messenger *m, OSDCluster *o); + Filer(Messenger *m, OSDMap *o); ~Filer(); void dispatch(Message *m);