From 88086b83b7dcb0eb5c092e30fde8570475173f5e Mon Sep 17 00:00:00 2001 From: sage Date: Sat, 6 Aug 2005 19:05:49 +0000 Subject: [PATCH] lots of OSD peering stuff (still not complete) TCPMessenger rewrite!!! git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@485 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/Makefile | 14 +- ceph/TODO | 38 +- ceph/client/Client.cc | 12 +- ceph/client/Client.h | 6 +- ceph/client/SyntheticClient.cc | 78 ++++ ceph/client/SyntheticClient.h | 9 +- ceph/common/Mutex.h | 7 +- ceph/config.cc | 215 +++++----- ceph/config.h | 12 +- ceph/fakefuse.cc | 29 +- ceph/fakesyn.cc | 85 +--- ceph/include/bufferlist.h | 34 ++ ceph/mds/MDLog.cc | 25 +- ceph/mds/MDS.cc | 5 +- ceph/mds/MDS.h | 10 +- ceph/messages/MNSConnect.h | 31 ++ ceph/messages/MNSConnectAck.h | 31 ++ ceph/messages/MNSLookup.h | 32 ++ ceph/messages/MNSLookupReply.h | 80 ++++ ceph/messages/MNSRegister.h | 45 ++ ceph/messages/MNSRegisterAck.h | 39 ++ ceph/messages/MOSDRGNotify.h | 4 +- ceph/messages/MOSDRGPeer.h | 36 ++ ceph/messages/MOSDRGPeerAck.h | 55 +++ ceph/messages/MOSDRGPeerRequest.h | 36 ++ ceph/msg/Message.h | 72 ++-- ceph/msg/Messenger.cc | 38 +- ceph/msg/Messenger.h | 4 +- ceph/msg/TCPDirectory.cc | 162 ++++++++ ceph/msg/TCPDirectory.h | 81 ++++ ceph/msg/TCPMessenger.cc | 665 +++++++++++++++++++++--------- ceph/msg/TCPMessenger.h | 44 +- ceph/msg/mpistarter.cc | 45 ++ ceph/osd/OSD.cc | 156 ++++++- ceph/osd/OSD.h | 71 +++- ceph/osdc/Filer.cc | 10 +- ceph/tcpfuse.cc | 188 +++------ ceph/tcpsyn.cc | 238 ++++------- 38 files changed, 1965 insertions(+), 777 deletions(-) create mode 100644 ceph/messages/MNSConnect.h create mode 100644 ceph/messages/MNSConnectAck.h create mode 100644 ceph/messages/MNSLookup.h create mode 100644 ceph/messages/MNSLookupReply.h create mode 100644 ceph/messages/MNSRegister.h create mode 100644 ceph/messages/MNSRegisterAck.h create mode 100644 ceph/messages/MOSDRGPeer.h create mode 100644 ceph/messages/MOSDRGPeerAck.h create mode 100644 ceph/messages/MOSDRGPeerRequest.h create mode 100644 ceph/msg/TCPDirectory.cc create mode 100644 ceph/msg/TCPDirectory.h create mode 100644 ceph/msg/mpistarter.cc diff --git a/ceph/Makefile b/ceph/Makefile index d8ead3bd68ce9..64ffe0af7b94f 100644 --- a/ceph/Makefile +++ b/ceph/Makefile @@ -55,8 +55,12 @@ SYN_OBJS = \ client/SyntheticClient.o\ client/Trace.o -TEST_TARGETS = fakemds mpitest -TARGETS = import singleclient mpifuse fakefuse mpisyn fakesyn tcpsyn +TCP_OBJS = \ + msg/TCPMessenger.o\ + msg/TCPDirectory.o + +TEST_TARGETS = fakemds +TARGETS = fakefuse fakesyn tcpsyn tcpfuse SRCS=*.cc */*.cc @@ -97,10 +101,10 @@ mttest: test/mttest.cc msg/MTMessenger.cc ${COMMON_OBJS} fakefuse: fakefuse.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o client/fuse.o msg/FakeMessenger.cc ${COMMON_OBJS} ${CC} -pg ${CFLAGS} ${LIBS} -lfuse $^ -o $@ -tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o client/fuse.o msg/TCPMessenger.cc ${COMMON_OBJS} +tcpfuse: tcpfuse.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o client/fuse.o ${TCP_OBJS} ${COMMON_OBJS} ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ -mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o client/fuse.o msg/MPIMessenger.cc ${COMMON_OBJS} +mpifuse: mpifuse.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o client/fuse.o ${TCP_OBJS} ${COMMON_OBJS} ${MPICC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@ @@ -111,7 +115,7 @@ fakesyn: fakesyn.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o mpisyn: mpisyn.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o msg/MPIMessenger.cc ${COMMON_OBJS} ${SYN_OBJS} ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ -tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o msg/TCPMessenger.cc ${COMMON_OBJS} ${SYN_OBJS} +tcpsyn: tcpsyn.cc mds/allmds.o client/Client.o client/Buffercache.o osd/OSD.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS} ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@ # obfs + synthetic diff --git a/ceph/TODO b/ceph/TODO index 7cd44ee93c040..aefcaf999e71c 100644 --- a/ceph/TODO +++ b/ceph/TODO @@ -1,20 +1,30 @@ -I think the OSD stuff breaks down info a few different areas: +IPDPS -- Lay out OSD in-memory data structures. I've started this based on the stuff I posted to the list. +map/reduce refs +12 huston: diamond + - early discard .. distributed filters to reduce data... ultra-parallel grep +13 fischer: parallel prefix coputation.. N records in log N time? -- The object store needs to be able to store the metadata for objects and replica groups, via the ObjectStore interface. This boils down to collections and extended attributes (on both objects and collections). The beginnings of both are are implemented in FakeStore, but untested and unfinished. -- OSD::op_write needs to put new objects in a rg collection. +15 riedel:fast01 active disks for large scale data processing + - nearest neighbor .. sort of a map only, no distributed reduce + - association rule .. successive passives over map +16 condor - cluster mgmt +17 bulk synchronous -- Map the actual metadata we're keeping to object and collection xattrs. Probably via fetch/store methods on the in-memory data structures. -- When an OSD gets a new version of the map it needs to go through the process of checking it's active replica groups, seeing if its role has changed, etc. -- The migration process itself. Needs to be tunable somehow +o locality aware scheduling +o fine partitioning -> balanced scheduling +o redundant execution and re-execution -- OSD::op_read needs to do the proxying sort of thing, where if it's is primary but !COMPLETE it needs to wait and fetch the object from the (old) replica it currently resides on, or block until old replicas are scanned. -- OSD::op_write needs to do the replication thing. I think replication should be implemented last, it's probably easier to add once the role changing stuff already works. + +REPLICATION + +requirements +- We should support a fast succession of map updates, even when intermediate reorganizations are not allowed to complete before the next starts. +- Reorganizations can be arbitrary, potentially involving a completely disparate set of OSDs in a RG between epochs. (We will of course seek to minimize movement in practice.) @@ -31,21 +41,15 @@ I think the OSD stuff breaks down info a few different areas: KNOWN BUGS to fix after fast -- hard links! +- fix softlock, stat - implement truncate() for real +- hard links! UPCOMING TODOS: -/- redo client capability stuff -- finish buffer cache - -- plan out osd replication, recovery structures - - redo CDir hash_map in terms of const char * in CDentry? - or try google's hash library!! -- what to do about fuse direct_io and mmap()? - finish HARD LINKS - reclaim danglers from inode file on discover... diff --git a/ceph/client/Client.cc b/ceph/client/Client.cc index a24daf0bbbebf..f53e7517bcb67 100644 --- a/ceph/client/Client.cc +++ b/ceph/client/Client.cc @@ -38,10 +38,12 @@ Logger *client_logger = 0; // cons/des -Client::Client(MDCluster *mdc, int id, Messenger *m) +Client::Client(Messenger *m) { - mdcluster = mdc; - whoami = id; + // which client am i? + whoami = MSG_ADDR_NUM(m->get_myaddr()); + cout << "i am client " << whoami << " " << MSG_ADDR_NICE(m->get_myaddr()) << endl; + mounted = false; @@ -431,6 +433,10 @@ void Client::dispatch(Message *m) case MSG_OSD_OPREPLY: filer->handle_osd_op_reply((MOSDOpReply*)m); break; + + case MSG_OSD_MAP: + filer->handle_osd_map((class MOSDMap*)m); + break; // client case MSG_CLIENT_FILECAPS: diff --git a/ceph/client/Client.h b/ceph/client/Client.h index d6c509692ff87..21f72ad52edd3 100644 --- a/ceph/client/Client.h +++ b/ceph/client/Client.h @@ -1,7 +1,7 @@ #ifndef __CLIENT_H #define __CLIENT_H -#include "mds/MDCluster.h" +//#include "mds/MDCluster.h" #include "osd/OSDMap.h" #include "msg/Message.h" @@ -189,7 +189,7 @@ class Client : public Dispatcher { bool all_files_closed; // cluster descriptors - MDCluster *mdcluster; + //MDCluster *mdcluster; OSDMap *osdmap; bool mounted; @@ -332,7 +332,7 @@ class Client : public Dispatcher { friend class SyntheticClient; public: - Client(MDCluster *mdc, int id, Messenger *m); + Client(Messenger *m); ~Client(); void tear_down_cache(); diff --git a/ceph/client/SyntheticClient.cc b/ceph/client/SyntheticClient.cc index 2cd13566248ba..2c0a4dc8f7f8a 100644 --- a/ceph/client/SyntheticClient.cc +++ b/ceph/client/SyntheticClient.cc @@ -21,6 +21,84 @@ //void trace_openssh(SyntheticClient *syn, Client *cl, string& prefix); +list syn_modes; +list syn_iargs; +list syn_sargs; + +void parse_syn_options(vector& args) +{ + vector nargs; + + for (unsigned i=0; iclient = client; + thread_id = 0; + + did_readdir = false; + + this->modes = syn_modes; + this->iargs = syn_iargs; + this->sargs = syn_sargs; +} + + + + #define DBL 2 void *synthetic_client_thread_entry(void *ptr) diff --git a/ceph/client/SyntheticClient.h b/ceph/client/SyntheticClient.h index ecc5b2de60b5c..01b05716a96d9 100644 --- a/ceph/client/SyntheticClient.h +++ b/ceph/client/SyntheticClient.h @@ -24,6 +24,8 @@ #define SYNCLIENT_MODE_RANDOMSLEEP 12 +void parse_syn_options(vector& args); + class SyntheticClient { Client *client; @@ -95,12 +97,7 @@ class SyntheticClient { } public: - SyntheticClient(Client *client) { - this->client = client; - thread_id = 0; - - did_readdir = false; - } + SyntheticClient(Client *client); int start_thread(); int join_thread(); diff --git a/ceph/common/Mutex.h b/ceph/common/Mutex.h index 964a2dacdf835..a2d3098d8e0e9 100755 --- a/ceph/common/Mutex.h +++ b/ceph/common/Mutex.h @@ -22,7 +22,7 @@ class Mutex public: - Mutex() + Mutex() : tag(false) { pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); @@ -30,18 +30,17 @@ class Mutex pthread_mutex_init(&M,&attr); //cout << this << " mutex init = " << r << endl; pthread_mutexattr_destroy(&attr); - this->tag = false; } - Mutex(bool tag) + Mutex(bool t) : tag(t) { + assert(0); pthread_mutexattr_t attr; pthread_mutexattr_init(&attr); pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE); pthread_mutex_init(&M,&attr); //cout << this << " mutex init = " << r << endl; pthread_mutexattr_destroy(&attr); - this->tag = tag; } virtual ~Mutex() diff --git a/ceph/config.cc b/ceph/config.cc index 7e3834c6bedcd..00e8189a66ba5 100644 --- a/ceph/config.cc +++ b/ceph/config.cc @@ -149,118 +149,119 @@ md_config_t g_conf = { using namespace std; +void argv_to_vec(int argc, char **argv, + vector& args) +{ + for (int i=1; i& args, + int& argc, char **&argv) { - // alloc new argc - nargv = (char**)malloc(sizeof(char*) * argc); - nargc = 0; - nargv[nargc++] = argv[0]; - - int extras = 0; - - for (int i=1; i& args) +{ + vector nargs; + + for (unsigned i=0; i +using namespace std; + struct md_config_t { int num_mds; int num_osd; @@ -116,8 +119,11 @@ extern md_config_t g_conf; #define dout(x) if ((x) <= g_conf.debug) cout #define dout2(x) if ((x) <= g_conf.debug) cout -void parse_config_options(int argc, char **argv, - int& nargc, char**&nargv, - bool barf_on_extras=false); +void argv_to_vec(int argc, char **argv, + vector& args); +void vec_to_argv(vector& args, + int& argc, char **&argv); + +void parse_config_options(vector& args); #endif diff --git a/ceph/fakefuse.cc b/ceph/fakefuse.cc index 39ec93f5208d0..d94c5c34744f3 100644 --- a/ceph/fakefuse.cc +++ b/ceph/fakefuse.cc @@ -41,13 +41,12 @@ public: -int main(int oargc, char **oargv) { +int main(int argc, char **argv) { cerr << "fakefuse starting" << endl; - int argc; - char **argv; - parse_config_options(oargc, oargv, - argc, argv); + vector args; + argv_to_vec(argc, argv, args); + parse_config_options(args); MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD); @@ -57,21 +56,19 @@ int main(int oargc, char **oargv) { //g_timer.add_event_after(5.0, new C_Test2); //g_timer.add_event_after(10.0, new C_Test); + vector nargs; int mkfs = 0; - for (int i=1; iinit(); diff --git a/ceph/fakesyn.cc b/ceph/fakesyn.cc index cc2f0e79317bb..649dd518922a5 100644 --- a/ceph/fakesyn.cc +++ b/ceph/fakesyn.cc @@ -29,88 +29,38 @@ public: }; -int main(int oargc, char **oargv) +int main(int argc, char **argv) { cerr << "fakesyn starting" << endl; - int argc; - char **argv; - parse_config_options(oargc, oargv, - argc, argv); + vector args; + argv_to_vec(argc, argv, args); + + parse_config_options(args); int start = 0; - // build new argc+argv for fuse - typedef char* pchar; - int nargc = 0; - char **nargv = new pchar[argc]; - nargv[nargc++] = argv[0]; + parse_syn_options(args); - list syn_modes; - list syn_iargs; - list syn_sargs; + vector nargs; int mkfs = 0; - for (int i=1; imodes = syn_modes; - syn[i]->sargs = syn_sargs; - syn[i]->iargs = syn_iargs; syn[i]->start_thread(); } for (int i=0; i +#include #include #include using namespace std; @@ -495,6 +496,39 @@ inline void _decode(list<__uint64_t>& s, bufferlist& bl, int& off) assert(s.size() == (unsigned)n); } +// map<__uint64_t, __uint64_t> +inline void _encode(map<__uint64_t,__uint64_t>& s, bufferlist& bl) +{ + int n = s.size(); + bl.append((char*)&n, sizeof(n)); + for (map<__uint64_t,__uint64_t>::iterator it = s.begin(); + it != s.end(); + it++) { + __uint64_t k = it->first; + __uint64_t v = it->second; + bl.append((char*)&k, sizeof(k)); + bl.append((char*)&v, sizeof(v)); + n--; + } + assert(n==0); +} +inline void _decode(map<__uint64_t,__uint64_t>& s, bufferlist& bl, int& off) +{ + s.clear(); + int n; + bl.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + for (int i=0; i max_events) { + + if ((int)trimming.size() >= g_conf.mds_log_max_trimming) { + dout(7) << " already trimming max, waiting" << endl; + return; + } + off_t gap = logstream->get_append_pos() - logstream->get_read_pos(); dout(5) << "trim: num_events " << num_events << " - trimming " << trimming.size() << " > max " << max_events << " .. gap " << gap << endl; @@ -146,17 +152,14 @@ void MDLog::trim(Context *c) delete le; logger->inc("obs"); } else { - if ((int)trimming.size() < g_conf.mds_log_max_trimming) { - // trim! - dout(7) << " trimming " << le << endl; - trimming.insert(le); - le->retire(mds, new C_MDL_Trimmed(this, le)); - logger->inc("retire"); - logger->set("trim", trimming.size()); - } else { - dout(7) << " already trimming max, waiting" << endl; - return; - } + assert ((int)trimming.size() < g_conf.mds_log_max_trimming); + + // trim! + dout(7) << " trimming " << le << endl; + trimming.insert(le); + le->retire(mds, new C_MDL_Trimmed(this, le)); + logger->inc("retire"); + logger->set("trim", trimming.size()); } } else { // need to read! diff --git a/ceph/mds/MDS.cc b/ceph/mds/MDS.cc index 1e0c5e1cb9b21..aeca08e77ac1d 100644 --- a/ceph/mds/MDS.cc +++ b/ceph/mds/MDS.cc @@ -80,7 +80,6 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) { mdcluster = mdc; messenger = m; - messenger->set_dispatcher(this); mdcache = new MDCache(this); mdstore = new MDStore(this); @@ -158,6 +157,8 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) { // alloc idalloc = new IdAllocator(this); + // i'm ready! + messenger->set_dispatcher(this); } MDS::~MDS() { @@ -256,6 +257,8 @@ void MDS::handle_shutdown_finish(Message *m) int MDS::shutdown_final() { + dout(1) << "shutdown" << endl; + // shut down cache mdcache->shutdown(); diff --git a/ceph/mds/MDS.h b/ceph/mds/MDS.h index 8b28c2d2b187d..1ae5fd33f801c 100644 --- a/ceph/mds/MDS.h +++ b/ceph/mds/MDS.h @@ -35,11 +35,11 @@ typedef __uint64_t object_t; #define MDS_PORT_OSDMON 300 #define MDS_INO_ROOT 1 -#define MDS_INO_LOG_OFFSET 100 -#define MDS_INO_IDS_OFFSET 200 -#define MDS_INO_INODEFILE_OFFSET 300 -#define MDS_INO_ANCHORTABLE 400 -#define MDS_INO_BASE 1000 +#define MDS_INO_LOG_OFFSET 0x100 +#define MDS_INO_IDS_OFFSET 0x200 +#define MDS_INO_INODEFILE_OFFSET 0x300 +#define MDS_INO_ANCHORTABLE 0x400 +#define MDS_INO_BASE 0x1000 #define MDS_MKFS_FAST 1 // fake new root inode+dir #define MDS_MKFS_FULL 2 // wipe osd's too diff --git a/ceph/messages/MNSConnect.h b/ceph/messages/MNSConnect.h new file mode 100644 index 0000000000000..dd9517b330e44 --- /dev/null +++ b/ceph/messages/MNSConnect.h @@ -0,0 +1,31 @@ +#ifndef __MNSCONNECT_H +#define __MNSCONNECT_H + +#include "msg/Message.h" +#include "msg/TCPMessenger.h" + +class MNSConnect : public Message { + tcpaddr_t tcpaddr; + + public: + MNSConnect() {} + MNSConnect(tcpaddr_t t) : + Message(MSG_NS_CONNECT) { + tcpaddr = t; + } + + char *get_type_name() { return "NSCon"; } + + tcpaddr_t& get_addr() { return tcpaddr; } + + void encode_payload() { + payload.append((char*)&tcpaddr, sizeof(tcpaddr)); + } + void decode_payload() { + payload.copy(0, sizeof(tcpaddr), (char*)&tcpaddr); + } +}; + + +#endif + diff --git a/ceph/messages/MNSConnectAck.h b/ceph/messages/MNSConnectAck.h new file mode 100644 index 0000000000000..af602b112f205 --- /dev/null +++ b/ceph/messages/MNSConnectAck.h @@ -0,0 +1,31 @@ +#ifndef __MNSCONNECTACK_H +#define __MNSCONNECTACK_H + +#include "msg/Message.h" +#include "msg/TCPMessenger.h" + +class MNSConnectAck : public Message { + int rank; + + public: + MNSConnectAck() {} + MNSConnectAck(int r) : + Message(MSG_NS_CONNECTACK) { + rank = r; + } + + char *get_type_name() { return "NSConA"; } + + int get_rank() { return rank; } + + void encode_payload() { + payload.append((char*)&rank, sizeof(rank)); + } + void decode_payload() { + payload.copy(0, sizeof(rank), (char*)&rank); + } +}; + + +#endif + diff --git a/ceph/messages/MNSLookup.h b/ceph/messages/MNSLookup.h new file mode 100644 index 0000000000000..325eafb24b8f8 --- /dev/null +++ b/ceph/messages/MNSLookup.h @@ -0,0 +1,32 @@ +#ifndef __MNSLOOKUP_H +#define __MNSLOOKUP_H + +#include "msg/Message.h" + +class MNSLookup : public Message { + msg_addr_t entity; + + public: + MNSLookup() {} + MNSLookup(msg_addr_t e) : + Message(MSG_NS_LOOKUP) { + entity = e; + } + + char *get_type_name() { return "NSLook"; } + + msg_addr_t get_entity() { return entity; } + + void encode_payload() { + payload.append((char*)&entity, sizeof(entity)); + } + void decode_payload() { + int off = 0; + payload.copy(off, sizeof(entity), (char*)&entity); + off += sizeof(entity); + } +}; + + +#endif + diff --git a/ceph/messages/MNSLookupReply.h b/ceph/messages/MNSLookupReply.h new file mode 100644 index 0000000000000..8e0daa59e27e4 --- /dev/null +++ b/ceph/messages/MNSLookupReply.h @@ -0,0 +1,80 @@ +#ifndef __MNSLOOKUPREPLY_H +#define __MNSLOOKUPREPLY_H + +#include "msg/Message.h" +#include "msg/TCPMessenger.h" + +class MNSLookupReply : public Message { + public: + map entity_map; // e -> rank + map rank_addr; // rank -> addr + + public: + MNSLookupReply() {} + MNSLookupReply(MNSLookup *m) : + Message(MSG_NS_LOOKUPREPLY) { + } + + char *get_type_name() { return "NSLookR"; } + + /* + void map_rank(int e, int r) { + entity_map[e] = r; + } + void map_addr(int r, tcpaddr_t& a) { + rank_addr[r] = a; + } + */ + + void encode_payload() { + int n = entity_map.size(); + payload.append((char*)&n, sizeof(n)); + for (map::iterator it = entity_map.begin(); + it != entity_map.end(); + it++) { + payload.append((char*)&it->first, sizeof(it->first)); + payload.append((char*)&it->second, sizeof(it->second)); + } + + n = rank_addr.size(); + payload.append((char*)&n, sizeof(n)); + for (map::iterator it = rank_addr.begin(); + it != rank_addr.end(); + it++) { + payload.append((char*)&it->first, sizeof(it->first)); + payload.append((char*)&it->second, sizeof(it->second)); + } + } + void decode_payload() { + int off = 0; + int n; + + payload.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + for (int i=0; i rg_list; + + public: + __uint64_t get_version() { return map_version; } + list& get_rg_list() { return rg_list; } + + MOSDRGPeer() {} + MOSDRGPeer(__uint64_t v, list& l) : + Message(MSG_OSD_RG_PEER) { + this->map_version = v; + rg_list.splice(rg_list.begin(), l); + } + + char *get_type_name() { return "RGPeer"; } + + void encode_payload() { + payload.append((char*)&map_version, sizeof(map_version)); + _encode(rg_list, payload); + } + void decode_payload() { + int off = 0; + payload.copy(off, sizeof(map_version), (char*)&map_version); + off += sizeof(map_version); + _decode(rg_list, payload, off); + } +}; + +#endif diff --git a/ceph/messages/MOSDRGPeerAck.h b/ceph/messages/MOSDRGPeerAck.h new file mode 100644 index 0000000000000..eb176d2283267 --- /dev/null +++ b/ceph/messages/MOSDRGPeerAck.h @@ -0,0 +1,55 @@ +#ifndef __MOSDRGPEERACK_H +#define __MOSDRGPEERACK_H + +#include "msg/Message.h" +#include "osd/OSD.h" + +class MOSDRGPeerAck : public Message { + __uint64_t map_version; + + public: + list rg_dne; // rg dne + map rg_state; // state, lists, etc. + + __uint64_t get_version() { return map_version; } + + MOSDRGPeerAck() {} + MOSDRGPeerAck(__uint64_t v) : + Message(MSG_OSD_RG_PEERACK) { + this->map_version = v; + } + + char *get_type_name() { return "RGPeer"; } + + void encode_payload() { + payload.append((char*)&map_version, sizeof(map_version)); + _encode(rg_dne, payload); + + int n = rg_state.size(); + payload.append((char*)&n, sizeof(n)); + for (map::iterator it = rg_state.begin(); + it != rg_state.end(); + it++) { + payload.append((char*)&it->first, sizeof(it->first)); + it->second._encode(payload); + } + } + void decode_payload() { + int off = 0; + payload.copy(off, sizeof(map_version), (char*)&map_version); + off += sizeof(map_version); + _decode(rg_dne, payload, off); + + int n; + payload.copy(off, sizeof(n), (char*)&n); + off += sizeof(n); + for (int i=0; i rg_list; + + public: + __uint64_t get_version() { return map_version; } + list& get_rg_list() { return rg_list; } + + MOSDRGPeerRequest() {} + MOSDRGPeerRequest(__uint64_t v, list& l) : + Message(MSG_OSD_RG_PEERREQUEST) { + this->map_version = v; + rg_list.splice(rg_list.begin(), l); + } + + char *get_type_name() { return "RGPR"; } + + void encode_payload() { + payload.append((char*)&map_version, sizeof(map_version)); + _encode(rg_list, payload); + } + void decode_payload() { + int off = 0; + payload.copy(off, sizeof(map_version), (char*)&map_version); + off += sizeof(map_version); + _decode(rg_list, payload, off); + } +}; + +#endif diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index 7f537dba20f8c..e2c4a8153f8ff 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -2,13 +2,24 @@ #ifndef __MESSAGE_H #define __MESSAGE_H -#define MSG_PING 2 -#define MSG_PING_ACK 3 +#define MSG_NS_CONNECT 1 +#define MSG_NS_CONNECTACK 2 +#define MSG_NS_REGISTER 3 +#define MSG_NS_REGISTERACK 4 +#define MSG_NS_STARTED 5 +#define MSG_NS_UNREGISTER 6 +#define MSG_NS_LOOKUP 7 +#define MSG_NS_LOOKUPREPLY 8 -#define MSG_FAILURE 4 -#define MSG_FAILURE_ACK 5 -#define MSG_SHUTDOWN 6 +#define MSG_PING 10 +#define MSG_PING_ACK 11 + +#define MSG_FAILURE 12 +#define MSG_FAILURE_ACK 13 + +#define MSG_SHUTDOWN 99999 + #define MSG_OSD_OP 14 // delete, etc. #define MSG_OSD_OPREPLY 15 // delete, etc. @@ -17,9 +28,9 @@ #define MSG_OSD_GETMAP 17 #define MSG_OSD_MAP 18 -#define MSG_OSD_RG_NOTIFY 50 -#define MSG_OSD_RG_PEER 51 -#define MSG_OSD_RG_PEERACK 52 +#define MSG_OSD_RG_NOTIFY 50 +#define MSG_OSD_RG_PEER 51 +#define MSG_OSD_RG_PEERACK 52 #define MSG_CLIENT_REQUEST 60 @@ -101,28 +112,41 @@ #define MSG_MDS_SHUTDOWNSTART 900 #define MSG_MDS_SHUTDOWNFINISH 901 -//#include "config.h" - -#include "include/bufferlist.h" -// mds's, client's share same (integer) namespace ?????? -// osd's could be separate. - - -/* sandwich mds's, then osd's, then clients */ -#define MSG_ADDR_MDS(x) (x) -#define MSG_ADDR_OSD(x) (g_conf.num_mds+(x)) -#define MSG_ADDR_CLIENT(x) (g_conf.num_mds+g_conf.num_osd+(x)) +#include "include/bufferlist.h" -#define MSG_ADDR_ISCLIENT(x) ((x) >= g_conf.num_mds+g_conf.num_osd) -#define MSG_ADDR_TYPE(x) ((x) logical addr mapping! +#define MSG_ADDR_RANK_BASE 0x10000000 // per-rank messenger services +#define MSG_ADDR_MDS_BASE 0x20000000 +#define MSG_ADDR_OSD_BASE 0x30000000 +#define MSG_ADDR_CLIENT_BASE 0x40000000 +#define MSG_ADDR_TYPE_MASK 0xf0000000 +#define MSG_ADDR_NUM_MASK 0x0fffffff +#define MSG_ADDR_NEW 0x0fffffff + +#define MSG_ADDR_RANK(x) (MSG_ADDR_RANK_BASE + (x)) +#define MSG_ADDR_MDS(x) (MSG_ADDR_MDS_BASE + (x)) +#define MSG_ADDR_OSD(x) (MSG_ADDR_OSD_BASE + (x)) +#define MSG_ADDR_CLIENT(x) (MSG_ADDR_CLIENT_BASE + (x)) + +#define MSG_ADDR_DIRECTORY 0 +#define MSG_ADDR_RANK_NEW MSG_ADDR_RANK(MSG_ADDR_NEW) +#define MSG_ADDR_MDS_NEW MSG_ADDR_MDS(MSG_ADDR_NEW) +#define MSG_ADDR_OSD_NEW MSG_ADDR_OSD(MSG_ADDR_NEW) +#define MSG_ADDR_CLIENT_NEW MSG_ADDR_CLIENT(MSG_ADDR_NEW) + +#define MSG_ADDR_ISCLIENT(x) ((x) >= MSG_ADDR_CLIENT_BASE) +#define MSG_ADDR_TYPE(x) (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_RANK_BASE ? "rank": \ + (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_CLIENT_BASE ? "client": \ + (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_OSD_BASE ? "osd": \ + (((x) & MSG_ADDR_TYPE_MASK) == MSG_ADDR_MDS_BASE ? "mds": \ + ((x) == MSG_ADDR_DIRECTORY ? "namer":"unknown"))))) +#define MSG_ADDR_NUM(x) ((x) & MSG_ADDR_NUM_MASK) #define MSG_ADDR_NICE(x) MSG_ADDR_TYPE(x) << MSG_ADDR_NUM(x) + #include #include diff --git a/ceph/msg/Messenger.cc b/ceph/msg/Messenger.cc index 735cbdfe46a62..17a025b348bb4 100644 --- a/ceph/msg/Messenger.cc +++ b/ceph/msg/Messenger.cc @@ -12,6 +12,13 @@ using namespace std; +#include "messages/MNSConnect.h" +#include "messages/MNSConnectAck.h" +#include "messages/MNSRegister.h" +#include "messages/MNSRegisterAck.h" +#include "messages/MNSLookup.h" +#include "messages/MNSLookupReply.h" + #include "messages/MPing.h" #include "messages/MPingAck.h" #include "messages/MFailure.h" @@ -22,6 +29,8 @@ using namespace std; #include "messages/MOSDOpReply.h" #include "messages/MOSDMap.h" #include "messages/MOSDRGNotify.h" +#include "messages/MOSDRGPeer.h" +#include "messages/MOSDRGPeerAck.h" #include "messages/MClientMount.h" #include "messages/MClientMountAck.h" @@ -197,6 +206,25 @@ decode_message(msg_envelope_t& env, bufferlist& payload) // -- with payload -- + case MSG_NS_CONNECT: + m = new MNSConnect(); + break; + case MSG_NS_CONNECTACK: + m = new MNSConnectAck(); + break; + case MSG_NS_REGISTER: + m = new MNSRegister(); + break; + case MSG_NS_REGISTERACK: + m = new MNSRegisterAck(); + break; + case MSG_NS_LOOKUP: + m = new MNSLookup(); + break; + case MSG_NS_LOOKUPREPLY: + m = new MNSLookupReply(); + break; + case MSG_PING: m = new MPing(); break; @@ -225,6 +253,12 @@ decode_message(msg_envelope_t& env, bufferlist& payload) case MSG_OSD_RG_NOTIFY: m = new MOSDRGNotify(); break; + case MSG_OSD_RG_PEER: + m = new MOSDRGPeer(); + break; + case MSG_OSD_RG_PEERACK: + m = new MOSDRGPeerAck(); + break; // clients case MSG_CLIENT_MOUNT: @@ -399,9 +433,11 @@ decode_message(msg_envelope_t& env, bufferlist& payload) // -- simple messages without payload -- + case MSG_NS_STARTED: + case MSG_NS_UNREGISTER: + case MSG_SHUTDOWN: case MSG_MDS_SHUTDOWNSTART: case MSG_MDS_SHUTDOWNFINISH: - case MSG_SHUTDOWN: case MSG_CLIENT_UNMOUNT: case MSG_OSD_GETMAP: m = new MGenericMessage(env.type); diff --git a/ceph/msg/Messenger.h b/ceph/msg/Messenger.h index 8f1026cf98993..c1895693d2f14 100644 --- a/ceph/msg/Messenger.h +++ b/ceph/msg/Messenger.h @@ -30,13 +30,15 @@ class Messenger { Messenger(msg_addr_t w) : dispatcher(0), _myaddr(w), _last_pcid(1) { } virtual ~Messenger() { } + void set_myaddr(msg_addr_t m) { _myaddr = m; } msg_addr_t get_myaddr() { return _myaddr; } virtual int shutdown() = 0; // setup - void set_dispatcher(Dispatcher *d) { dispatcher = d; } + void set_dispatcher(Dispatcher *d) { dispatcher = d; ready(); } Dispatcher *get_dispatcher() { return dispatcher; } + virtual void ready() { } // dispatch incoming messages virtual void dispatch(Message *m); diff --git a/ceph/msg/TCPDirectory.cc b/ceph/msg/TCPDirectory.cc new file mode 100644 index 0000000000000..a06897c69a781 --- /dev/null +++ b/ceph/msg/TCPDirectory.cc @@ -0,0 +1,162 @@ + +#include "TCPDirectory.h" + +#include "messages/MNSConnect.h" +#include "messages/MNSConnectAck.h" +#include "messages/MNSRegister.h" +#include "messages/MNSRegisterAck.h" +#include "messages/MNSLookup.h" +#include "messages/MNSLookupReply.h" +//#include "messages/MNSUnregister.h" + +#include "config.h" +#undef dout +#define dout(x) if (x <= g_conf.debug) cout << "nameserver: " + + +void TCPDirectory::handle_connect(MNSConnect *m) +{ + int rank = nrank++; + dout(2) << "connect from new rank " << rank << endl; + + dir[MSG_ADDR_RANK(rank)] = rank; + messenger->map_entity_rank(MSG_ADDR_RANK(rank), rank); + + rank_addr[rank] = m->get_addr(); + messenger->map_rank_addr(rank, m->get_addr()); + + messenger->send_message(new MNSConnectAck(rank), + MSG_ADDR_RANK(rank)); + delete m; +} + + + +void TCPDirectory::handle_register(MNSRegister *m) +{ + dout(10) << "register from rank " << m->get_rank() << " addr " << MSG_ADDR_NICE(m->get_entity()) << endl; + + // pick id + int rank = m->get_rank(); + int entity = m->get_entity(); + + if ((entity & MSG_ADDR_NUM_MASK) == MSG_ADDR_NEW) { + // make up a new address! + switch (entity) { + + case MSG_ADDR_RANK_NEW: // stupid client should be able to figure this out + entity = MSG_ADDR_RANK(rank); + break; + + case MSG_ADDR_MDS_NEW: + entity = MSG_ADDR_MDS(nmds++); + break; + + case MSG_ADDR_OSD_NEW: + entity = MSG_ADDR_OSD(nosd++); + break; + + case MSG_ADDR_CLIENT_NEW: + entity = MSG_ADDR_CLIENT(nclient++); + break; + + default: + assert(0); + } + } else { + // specific address! + assert(dir.count(entity) == 0); // make sure it doesn't exist yet. + } + + dout(2) << "registered " << MSG_ADDR_NICE(entity) << endl; + + // register + dir[entity] = rank; + + if (entity == MSG_ADDR_RANK(rank)) // map this locally now so we can reply + messenger->map_entity_rank(entity, rank); // otherwise wait until they send STARTED msg + + hold.insert(entity); + + ++version; + update_log[version] = entity; + + // reply w/ new id + messenger->send_message(new MNSRegisterAck(m->get_tid(), entity), + MSG_ADDR_RANK(rank)); + delete m; +} + +void TCPDirectory::handle_started(Message *m) +{ + msg_addr_t entity = m->get_source(); + + dout(3) << "start signal from " << MSG_ADDR_NICE(entity) << endl; + hold.erase(entity); + messenger->map_entity_rank(entity, dir[entity]); + + // waiters? + if (waiting.count(entity)) { + list ls; + ls.splice(ls.begin(), waiting[entity]); + waiting.erase(entity); + + dout(10) << "doing waiter on " << MSG_ADDR_NICE(entity) << endl; + for (list::iterator it = ls.begin(); + it != ls.end(); + it++) { + dispatch(*it); + } + } +} + +void TCPDirectory::handle_unregister(Message *m) +{ + int who = m->get_source(); + dout(2) << "unregister from entity " << MSG_ADDR_NICE(who) << endl; + + assert(dir.count(who)); + dir.erase(who); + + // shutdown? + if (dir.size() <= 2) { + dout(2) << "dir is empty except for me, shutting down" << endl; + tcpmessenger_stop_nameserver(); + } + else { + if (0) { + dout(10) << "dir size now " << dir.size() << endl; + for (hash_map::iterator it = dir.begin(); + it != dir.end(); + it++) { + dout(10) << " dir: " << MSG_ADDR_NICE(it->first) << " on rank " << it->second << endl; + } + } + } + +} + + +void TCPDirectory::handle_lookup(MNSLookup *m) +{ + // have it? + if (dir.count(m->get_entity()) == 0 || + hold.count(m->get_entity())) { + dout(2) << "lookup " << MSG_ADDR_NICE(m->get_entity()) << " by " << MSG_ADDR_NICE(m->get_source()) << " dne or on hold" << endl; + waiting[m->get_entity()].push_back(m); + return; + } + + // look it up! + MNSLookupReply *reply = new MNSLookupReply(m); + + int rank = dir[m->get_entity()]; + reply->entity_map[m->get_entity()] = rank; + reply->rank_addr[rank] = rank_addr[rank]; + + dout(2) << "lookup " << MSG_ADDR_NICE(m->get_entity()) << " by " << MSG_ADDR_NICE(m->get_source()) << " is on rank " << rank << endl; + + messenger->send_message(reply, + m->get_source(), m->get_source_port()); + delete m; +} diff --git a/ceph/msg/TCPDirectory.h b/ceph/msg/TCPDirectory.h new file mode 100644 index 0000000000000..318ac172cea22 --- /dev/null +++ b/ceph/msg/TCPDirectory.h @@ -0,0 +1,81 @@ +#ifndef __TCPDIRECTORY_H +#define __TCPDIRECTORY_H + +/* + * rank -- a process (listening on some host:port) + * entity -- a logical entity (osd123, mds3, client3245, etc.) + * + * multiple entities can coexist on a single rank. + */ + +#include "Dispatcher.h" +#include "TCPMessenger.h" + +#include +using namespace std; +#include +using namespace __gnu_cxx; + +class TCPDirectory : public Dispatcher { + protected: + // how i communicate + TCPMessenger *messenger; + + // directory + hash_map dir; // entity -> rank + hash_map rank_addr; // rank -> ADDR (e.g. host:port) + + __uint64_t version; + map<__uint64_t, int> update_log; + + int nrank; + int nclient, nmds, nosd; + + set hold; + map > waiting; + + // messages + void handle_connect(class MNSConnect*); + void handle_register(class MNSRegister *m); + void handle_started(Message *m); + void handle_lookup(class MNSLookup *m); + void handle_unregister(Message *m); + + public: + TCPDirectory(TCPMessenger *m) : + messenger(m), + version(0), + nrank(0), nclient(0), nmds(0), nosd(0) { + messenger->set_dispatcher(this); + // i am rank 0! + dir[MSG_ADDR_DIRECTORY] = 0; + rank_addr[0] = m->get_tcpaddr(); + cout << "export CEPH_NAMESERVER=" << m->get_tcpaddr() << endl; + ++nrank; + } + + void dispatch(Message *m) { + switch (m->get_type()) { + case MSG_NS_CONNECT: + handle_connect((class MNSConnect*)m); + break; + case MSG_NS_REGISTER: + handle_register((class MNSRegister*)m); + break; + case MSG_NS_STARTED: + handle_started(m); + break; + case MSG_NS_UNREGISTER: + handle_unregister(m); + break; + case MSG_NS_LOOKUP: + handle_lookup((class MNSLookup*)m); + break; + + default: + assert(0); + } + } +}; + +#endif diff --git a/ceph/msg/TCPMessenger.cc b/ceph/msg/TCPMessenger.cc index 400176e9ad036..d72dd141d083e 100644 --- a/ceph/msg/TCPMessenger.cc +++ b/ceph/msg/TCPMessenger.cc @@ -25,83 +25,236 @@ using namespace __gnu_cxx; #include #include -#include +#include "messages/MGenericMessage.h" +#include "messages/MNSConnect.h" +#include "messages/MNSConnectAck.h" +#include "messages/MNSRegister.h" +#include "messages/MNSRegisterAck.h" +#include "messages/MNSLookup.h" +#include "messages/MNSLookupReply.h" + +#include "TCPDirectory.h" #define DBL 18 -int tcp_port = 9876; -/* - * We make a directory, so that we can have multiple Messengers in the - * same process (rank). This is useful for benchmarking and creating lots of - * simulated clients, e.g. - */ +TCPMessenger *rankmessenger = 0; // + +TCPDirectory *nameserver = 0; // only defined on rank 0 +TCPMessenger *nsmessenger = 0; + + +// local directory hash_map directory; // local Mutex directory_lock; + +// connecting +struct sockaddr_in listen_addr; // my listen addr +int listen_sd = 0; +int my_rank = -1; +Cond waiting_for_rank; + +// register +long regid = 0; +map waiting_for_register_cond; +map waiting_for_register_result; + +// incoming messages list incoming; Mutex incoming_lock; Cond incoming_cond; + +// outgoing messages list outgoing; Mutex outgoing_lock; Cond outgoing_cond; -struct sockaddr_in *remote_addr; -int *in_sd; // incoming sockets -pthread_t *in_threads; -int *out_sd; // outgoing sockets - -struct sockaddr_in listen_addr; -int listen_sd = 0; - +Mutex lookup_lock; // +hash_map entity_rank; // entity -> rank +hash_map rank_sd; // outgoing sockets, rank -> sd +hash_map rank_addr; // rank -> tcpaddr +map > waiting_for_lookup; /* this process */ -int mpi_world; -int mpi_rank; bool tcp_done = false; // set this flag to stop the event loop + +// threads pthread_t dispatch_thread_id = 0; // thread id of the event loop. init value == nobody -pthread_t out_thread_id = 0; // thread id of the event loop. init value == nobody +pthread_t out_thread_id = 0; // thread id of the event loop. init value == nobody pthread_t listen_thread_id = 0; -Mutex sender_lock; +map in_threads; // sd -> threadid bool pending_timer = false; +// per-rank fun // debug #undef dout -#define dout(l) if (l<=g_conf.debug) cout << "[TCP " << mpi_rank << "/" << mpi_world << " " << getpid() << "." << pthread_self() << "] " +#define dout(l) if (l<=g_conf.debug) cout << "[TCP " << my_rank << " " << getpid() << "." << pthread_self() << "] " -ostream& operator<<(ostream& out, struct sockaddr_in &a) -{ - char addr[4]; - memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4); - out << (unsigned)addr[0] << "." - << (unsigned)addr[1] << "." - << (unsigned)addr[2] << "." - << (unsigned)addr[3] << ":" - << (int)a.sin_port; - return out; -} -/***** - * MPI global methods for process-wide startup, shutdown. + +// some declarations +void tcp_open(int rank); +int tcp_send(Message *m); +void tcpmessenger_kick_dispatch_loop(); + + + + +/** rankserver + * + * one per rank. handles entity->rank lookup replies. */ -int tcpmessenger_init(int& argc, char**& argv) +class RankServer : public Dispatcher { +public: + void dispatch(Message *m) { + lookup_lock.Lock(); + + dout(DBL) << "rankserver dispatching " << *m << endl; + + switch (m->get_type()) { + case MSG_NS_CONNECTACK: + handle_connect_ack((MNSConnectAck*)m); + break; + + case MSG_NS_REGISTERACK: + handle_register_ack((MNSRegisterAck*)m); + break; + + case MSG_NS_LOOKUPREPLY: + handle_lookup_reply((MNSLookupReply*)m); + break; + + default: + assert(0); + } + + lookup_lock.Unlock(); + } + + void handle_connect_ack(MNSConnectAck *m) { + dout(DBL) << "my rank is " << m->get_rank(); + my_rank = m->get_rank(); + + // now that i know my rank, + entity_rank[MSG_ADDR_RANK(my_rank)] = my_rank; + rank_addr[my_rank] = listen_addr; + + waiting_for_rank.Signal(); + + delete m; + } + + void handle_register_ack(MNSRegisterAck *m) { + long tid = m->get_tid(); + waiting_for_register_result[tid] = m->get_entity(); + waiting_for_register_cond[tid]->Signal(); + delete m; + } + + void handle_lookup_reply(MNSLookupReply *m) { + list waiting; + dout(DBL) << "got lookup reply" << endl; + + for (map::iterator it = m->entity_map.begin(); + it != m->entity_map.end(); + it++) { + dout(DBL) << "lookup got " << MSG_ADDR_NICE(it->first) << " on rank " << it->second << endl; + entity_rank[it->first] = it->second; + + // take waiters + waiting.splice(waiting.begin(), waiting_for_lookup[it->first]); + waiting_for_lookup.erase(it->first); + } + + for (map::iterator it = m->rank_addr.begin(); + it != m->rank_addr.end(); + it++) { + dout(DBL) << "lookup got rank " << it->first << " addr " << it->second << endl; + rank_addr[it->first] = it->second; + + // open it now + tcp_open(it->first); + } + + // send waiting messages + for (list::iterator it = waiting.begin(); + it != waiting.end(); + it++) { + tcp_send(*it); + } + + delete m; + } + +} rankserver; + + +class C_TCPKicker : public Context { + void finish(int r) { + dout(DBL) << "timer kick" << endl; + tcpmessenger_kick_dispatch_loop(); + } +}; + + +extern int tcpmessenger_lookup(char *str, tcpaddr_t& ta) { - // exhcnage addresses with other nodes - MPI_Init(&argc, &argv); + char *host = str; + char *port = 0; - MPI_Comm_size(MPI_COMM_WORLD, &mpi_world); - MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + for (int i=0; str[i]; i++) { + if (str[i] == ':') { + port = str+i+1; + str[i] = 0; + break; + } + } + if (!port) { + cerr << "addr '" << str << "' doesn't look like 'host:port'" << endl; + return -1; + } + //cout << "host '" << host << "' port '" << port << "'" << endl; + + int iport = atoi(port); + + struct hostent *myhostname = gethostbyname( host ); + if (!myhostname) { + cerr << "host " << host << " not found" << endl; + return -1; + } + + memset(&ta, 0, sizeof(ta)); - dout(DBL) << "rank is " << mpi_rank << " / " << mpi_world << endl; + //cout << "addrtype " << myhostname->h_addrtype << " len " << myhostname->h_length << endl; + ta.sin_family = myhostname->h_addrtype; + memcpy((char *)&ta.sin_addr, + myhostname->h_addr, + myhostname->h_length); + ta.sin_port = iport; + + cout << "lookup '" << host << ":" << port << "' -> " << ta << endl; + + return 0; +} + + + +/***** + * global methods for process-wide startup, shutdown. + */ + +int tcpmessenger_init() +{ // LISTEN dout(DBL) << "binding to listen " << endl; @@ -124,81 +277,113 @@ int tcpmessenger_init(int& argc, char**& argv) int myport = listen_addr.sin_port; // listen! - rc = ::listen(listen_sd, 2*mpi_world); + rc = ::listen(listen_sd, 1000); assert(rc >= 0); dout(DBL) << "listening on " << myport << endl; - remote_addr = new sockaddr_in[mpi_world]; - memset(remote_addr, 0, sizeof(sockaddr_in)*mpi_world); - // my address is... char host[100]; gethostname(host, 100); dout(DBL) << "my hostname is " << host << endl; struct hostent *myhostname = gethostbyname( host ); - - struct sockaddr_in myaddr; - memset(&myaddr, 0, sizeof(myaddr)); - myaddr.sin_family = myhostname->h_addrtype; - memcpy((char *) &myaddr.sin_addr.s_addr, + struct sockaddr_in my_addr; + memset(&my_addr, 0, sizeof(my_addr)); + + my_addr.sin_family = myhostname->h_addrtype; + memcpy((char *) &my_addr.sin_addr.s_addr, myhostname->h_addr_list[0], myhostname->h_length); - myaddr.sin_port = myport; + my_addr.sin_port = myport; - dout(DBL) << "my ip is " << myaddr << endl; + listen_addr = my_addr; - remote_addr[mpi_rank] = myaddr; + dout(DBL) << "listen addr is " << listen_addr << endl; - dout(DBL) << "MPI_Allgathering addrs" << endl; - MPI_Allgather( &myaddr, sizeof(struct sockaddr_in), MPI_CHAR, - remote_addr, sizeof(struct sockaddr_in), MPI_CHAR, - MPI_COMM_WORLD); + // register to execute timer events + g_timer.set_messenger_kicker(new C_TCPKicker()); - // for (int i=0; ishutdown(); + delete m; + } } +// on all ranks +void tcpmessenger_start_rankserver(tcpaddr_t& ns) +{ + // connect to nameserver + entity_rank[MSG_ADDR_DIRECTORY] = 0; + rank_addr[0] = ns; + tcp_open(0); + + if (my_rank >= 0) { + // i know my rank + rankmessenger = new TCPMessenger(MSG_ADDR_RANK(my_rank)); + } else { + // start rank messenger, and discover my rank. + rankmessenger = new TCPMessenger(MSG_ADDR_RANK_NEW); + } +} +void tcpmessenger_stop_rankserver() +{ + if (rankmessenger) { + dout(DBL) << "shutting down rankmessenger" << endl; + rankmessenger->shutdown(); + delete rankmessenger; + rankmessenger = 0; + } +} + + + + int tcpmessenger_shutdown() { dout(DBL) << "tcpmessenger_shutdown barrier" << endl; - MPI_Barrier (MPI_COMM_WORLD); - MPI_Finalize(); dout(2) << "tcpmessenger_shutdown closing all sockets etc" << endl; // bleh - for (int i=0; i::iterator it = rank_sd.begin(); + it != rank_sd.end(); + it++) { + ::close(it->second); } - delete[] remote_addr; - delete[] in_sd; - delete[] out_sd; - return 0; } -int tcpmessenger_world() -{ - return mpi_world; -} @@ -243,39 +428,14 @@ void tcp_write(int sd, char *buf, int len) */ -/* -void tcp_wait() -{ - fd_set fds; - FD_ZERO(&fds); - - int n = 0; - for (int i=0; iget_source()) << endl; + return m; } -void tcp_open(int who) +void tcp_open(int rank) { - //dout(DBL) << "tcp_open " << who << " to " << remote_addr[who] << endl; + dout(DBL) << "tcp_open to rank " << rank << " at " << rank_addr[rank] << endl; // create socket? int sd = socket(AF_INET,SOCK_STREAM,0); @@ -328,17 +490,42 @@ void tcp_open(int who) assert(rc>=0); // connect! - int r = connect(sd, (sockaddr*)&remote_addr[who], sizeof(myAddr)); + int r = connect(sd, (sockaddr*)&rank_addr[rank], sizeof(myAddr)); assert(r >= 0); //dout(DBL) << "tcp_open connected to " << who << endl; + rank_sd[rank] = sd; +} - int me = mpi_rank; - tcp_write(sd, (char*)&me, sizeof(me)); - out_sd[who] = sd; +void tcp_marshall(Message *m) +{ + // marshall + if (m->empty_payload()) + m->encode_payload(); } +bool tcp_lookup(Message *m) +{ + msg_addr_t addr = m->get_dest(); + + if (!entity_rank.count(m->get_dest())) { + // lookup and wait. + if (waiting_for_lookup.count(addr)) { + dout(DBL) << "already looking up " << MSG_ADDR_NICE(addr) << endl; + } else { + dout(DBL) << "lookup on " << MSG_ADDR_NICE(addr) << endl; + MNSLookup *r = new MNSLookup(addr); + rankmessenger->send_message(r, MSG_ADDR_DIRECTORY); + } + + // add waiter + waiting_for_lookup[addr].push_back(m); + return false; + } + + return true; +} /* @@ -346,11 +533,13 @@ void tcp_open(int who) */ int tcp_send(Message *m) { - int rank = MPI_DEST_TO_RANK(m->get_dest(), mpi_world); + int rank = entity_rank[m->get_dest()]; + if (rank_sd.count(rank) == 0) tcp_open(rank); - // marshall - if (m->empty_payload()) - m->encode_payload(); + int sd = rank_sd[rank]; + assert(sd); + + // get envelope, buffers msg_envelope_t *env = &m->get_envelope(); bufferlist blist; blist.claim( m->get_payload() ); @@ -361,15 +550,10 @@ int tcp_send(Message *m) env->nchunks = 1; #endif - dout(7) << "sending " << *m << " to " << MSG_ADDR_NICE(env->dest) << " (rank " << rank << ")" << endl; + dout(7) << "sending " << *m << " to " << MSG_ADDR_NICE(m->get_dest()) << " rank " << rank << endl;//" sd " << sd << ")" << endl; - sender_lock.Lock(); - - // open first? - if (out_sd[rank] == 0) tcp_open(rank); - // send envelope - tcp_write( out_sd[rank], (char*)env, sizeof(*env) ); + tcp_write( sd, (char*)env, sizeof(*env) ); // payload #ifdef TCP_KEEP_CHUNKS @@ -380,23 +564,21 @@ int tcp_send(Message *m) it++) { dout(DBL) << "tcp_sending frag " << i << " len " << (*it).length() << endl; int size = (*it).length(); - tcp_write( out_sd[rank], (char*)&size, sizeof(size) ); - tcp_write( out_sd[rank], (*it).c_str(), size ); + tcp_write( sd, (char*)&size, sizeof(size) ); + tcp_write( sd, (*it).c_str(), size ); i++; } #else // one big chunk int size = blist.length(); - tcp_write( out_sd[rank], (char*)&size, sizeof(size) ); + tcp_write( sd, (char*)&size, sizeof(size) ); for (list::iterator it = blist.buffers().begin(); it != blist.buffers().end(); it++) { - tcp_write( out_sd[rank], (*it).c_str(), (*it).length() ); + tcp_write( sd, (*it).c_str(), (*it).length() ); } #endif - sender_lock.Unlock(); - // hose message delete m; return 0; @@ -425,6 +607,8 @@ void* tcp_outthread(void*) while (!out.empty()) { Message *m = out.front(); out.pop_front(); + + tcp_marshall(m); tcp_send(m); } @@ -446,22 +630,15 @@ void* tcp_outthread(void*) */ void *tcp_inthread(void *r) { - int who = (int)r; + int sd = (int)r; + int who = -1; - dout(DBL) << "tcp_inthread reading for " << who << endl; + dout(DBL) << "tcp_inthread reading on sd " << sd << " who is " << who << endl; while (!tcp_done) { - /* - fd_set fds; - FD_ZERO(&fds); - FD_SET(in_sd[who], &fds); - - dout(DBL) << "tcp_inthread waiting on socket" << endl; - ::select(1, &fds, 0, &fds, 0); - */ - - Message *m = tcp_recv(who); + Message *m = tcp_recv(sd); if (!m) break; + who = m->get_source(); // give to dispatch loop incoming_lock.Lock(); @@ -470,11 +647,9 @@ void *tcp_inthread(void *r) incoming_lock.Unlock(); } - dout(DBL) << "tcp_inthread closing " << who << endl; - - //::close(in_sd[who]); - //in_sd[who] = 0; + dout(DBL) << "tcp_inthread closing " << sd << endl; + //::close(sd); return 0; } @@ -486,34 +661,26 @@ void *tcp_acceptthread(void *) { dout(DBL) << "tcp_acceptthread starting" << endl; - int left = mpi_world; - while (left > 0) { + while (!tcp_done) { //dout(DBL) << "accepting, left = " << left << endl; struct sockaddr_in addr; socklen_t slen = sizeof(addr); int sd = ::accept(listen_sd, (sockaddr*)&addr, &slen); if (sd > 0) { - - //dout(DBL) << "accepted incoming, reading who it is " << endl; - - int who; - tcp_read(sd, (char*)&who, sizeof(who)); - - in_sd[who] = sd; - left--; + dout(DBL) << "accepted incoming on sd " << sd << endl; - dout(DBL) << "accepted incoming from " << who << ", left = " << left << endl; - - pthread_create(&in_threads[who], + pthread_t th; + pthread_create(&th, NULL, tcp_inthread, - (void*)who); + (void*)sd); + in_threads[sd] = th; } else { dout(DBL) << "no incoming connection?" << endl; + break; } } - dout(DBL) << "got incoming from everyone!" << endl; return 0; } @@ -562,7 +729,18 @@ void* tcp_dispatchthread(void*) while (in.size()) { Message *m = in.front(); in.pop_front(); + + dout(DBL) << "dispatch doing " << *m << endl; + // for rankserver? + if (m->get_type() == MSG_NS_CONNECTACK || // i just connected + m->get_dest() == MSG_ADDR_RANK(my_rank)) { + dout(DBL) << " giving to rankserver" << endl; + rankserver.dispatch(m); + continue; + } + + // ok int dest = m->get_dest(); directory_lock.Lock(); if (directory.count(dest)) { @@ -578,7 +756,7 @@ void* tcp_dispatchthread(void*) who->dispatch(m); } else { directory_lock.Unlock(); - dout (1) << "---- i don't know who " << dest << " is." << endl; + dout (1) << "---- i don't know who " << MSG_ADDR_NICE(dest) << " " << dest << " is." << endl; assert(0); } } @@ -658,21 +836,71 @@ void tcpmessenger_wait() +msg_addr_t register_entity(msg_addr_t addr) +{ + lookup_lock.Lock(); + + // prepare to wait + long id = ++regid; + Cond cond; + waiting_for_register_cond[id] = &cond; + + if (my_rank < 0) { + dout(DBL) << "register_entity don't know my rank, connecting" << endl; + + // connect to nameserver; discover my rank. + Message *m = new MNSConnect(listen_addr); + m->set_dest(MSG_ADDR_DIRECTORY, 0); + tcp_marshall(m); + tcp_send(m); + + // wait for reply + waiting_for_rank.Wait(lookup_lock); + assert(my_rank > 0); + } + + // send req + dout(DBL) << "register_entity " << MSG_ADDR_NICE(addr) << endl; + Message *m = new MNSRegister(addr, my_rank, id); + m->set_dest(MSG_ADDR_DIRECTORY, 0); + tcp_marshall(m); + tcp_send(m); + + // wait? + if (waiting_for_register_result.count(id)) { + // already here? + } else + cond.Wait(lookup_lock); + + // get result, clean up + int entity = waiting_for_register_result[id]; + waiting_for_register_result.erase(id); + waiting_for_register_cond.erase(id); + + dout(DBL) << "register_entity got " << MSG_ADDR_NICE(entity) << endl; + + lookup_lock.Unlock(); + + // ok! + return entity; +} + + + /*********** * Tcpmessenger class implementation */ -class C_TCPKicker : public Context { - void finish(int r) { - dout(DBL) << "timer kick" << endl; - tcpmessenger_kick_dispatch_loop(); - } -}; TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr) { + if (myaddr != MSG_ADDR_DIRECTORY) { + // register! + myaddr = register_entity(myaddr); + } + // my address - this->myaddr = myaddr; + set_myaddr( myaddr ); // register myself in the messenger directory directory_lock.Lock(); @@ -682,6 +910,7 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr) // register to execute timer events g_timer.set_messenger_kicker(new C_TCPKicker()); + // logger /* string name; @@ -696,25 +925,80 @@ TCPMessenger::TCPMessenger(msg_addr_t myaddr) : Messenger(myaddr) logger = new Logger(name, (LogType*)&mpimsg_logtype); loggers[ whoami ] = logger; */ + +} + + +void TCPMessenger::ready() +{ + if (get_myaddr() != MSG_ADDR_DIRECTORY) { + // started! tell namer we are up and running. + lookup_lock.Lock(); + Message *m = new MGenericMessage(MSG_NS_STARTED); + m->set_source(get_myaddr(), 0); + m->set_dest(MSG_ADDR_DIRECTORY, 0); + tcp_marshall(m); + tcp_send(m); + lookup_lock.Unlock(); + } } + TCPMessenger::~TCPMessenger() { //delete logger; } +tcpaddr_t& TCPMessenger::get_tcpaddr() +{ + return listen_addr; +} + +void TCPMessenger::map_entity_rank(msg_addr_t e, int r) +{ + lookup_lock.Lock(); + entity_rank[e] = r; + lookup_lock.Unlock(); +} + +void TCPMessenger::map_rank_addr(int r, tcpaddr_t a) +{ + lookup_lock.Lock(); + rank_addr[r] = a; + lookup_lock.Unlock(); +} + + int TCPMessenger::shutdown() { + // dont' send unregistery from nsmessenger shutdown! + if (this != nsmessenger && + (my_rank > 0 || nsmessenger)) { + dout(DBL) << "sending unregister from " << MSG_ADDR_NICE(get_myaddr()) << " to ns" << endl; + send_message(new MGenericMessage(MSG_NS_UNREGISTER), + MSG_ADDR_DIRECTORY); + } + // remove me from the directory directory_lock.Lock(); - directory.erase(myaddr); - bool lastone = directory.empty(); + directory.erase(get_myaddr()); + + // last one? + bool lastone = directory.empty(); + + // or almost last one? + if (rankmessenger && directory.size() == 1) { + directory_lock.Unlock(); + tcpmessenger_stop_rankserver(); + directory_lock.Lock(); + } + directory_lock.Unlock(); // last one? - if (lastone) { - dout(2) << "shutdown last tcpmessenger on rank " << mpi_rank << " shut down" << endl; + if (lastone) { + dout(2) << "shutdown last tcpmessenger on rank " << my_rank << " shut down" << endl; //pthread_t whoami = pthread_self(); // no more timer events @@ -723,12 +1007,12 @@ int TCPMessenger::shutdown() // close incoming sockets //void *r; - for (int i=0; i::iterator it = in_threads.begin(); + it != in_threads.end(); + it++) { + dout(DBL) << "closing reader on sd " << it->first << endl; + ::close(it->first); + //pthread_join(it->second, &r); } dout(DBL) << "setting tcp_done" << endl; @@ -750,9 +1034,7 @@ int TCPMessenger::shutdown() tcp_done = true; } */ - } else { - dout(10) << "shutdown still" /*<< directory.size()*/ << " other messengers on rank " << mpi_rank << endl; - } + } return 0; } @@ -768,14 +1050,19 @@ int TCPMessenger::shutdown() int TCPMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport) { // set envelope - m->set_source(myaddr, fromport); + m->set_source(get_myaddr(), fromport); m->set_dest(dest, port); if (1) { - // der - tcp_send(m); + // serialize all output + tcp_marshall(m); + + lookup_lock.Lock(); + if (tcp_lookup(m)) + tcp_send(m); + lookup_lock.Unlock(); } else { - // good way + // good way (that's actually similarly lame?) outgoing_lock.Lock(); outgoing.push_back(m); outgoing_cond.Signal(); diff --git a/ceph/msg/TCPMessenger.h b/ceph/msg/TCPMessenger.h index 22a5b5355a040..583edf07f4e4e 100644 --- a/ceph/msg/TCPMessenger.h +++ b/ceph/msg/TCPMessenger.h @@ -4,24 +4,31 @@ #include "Messenger.h" #include "Dispatcher.h" -#define NUMMDS g_conf.num_mds -#define NUMOSD g_conf.num_osd -#define MPI_DEST_TO_RANK(dest,world) ((dest)<(NUMMDS+NUMOSD) ? \ - (dest) : \ - ((NUMMDS+NUMOSD)+(((dest)-NUMMDS-NUMOSD) % ((world)-NUMMDS-NUMOSD)))) + +# include +# include +# include class Timer; +typedef struct sockaddr_in tcpaddr_t; + + class TCPMessenger : public Messenger { protected: - msg_addr_t myaddr; // my address - + //class Logger *logger; // for logging public: TCPMessenger(msg_addr_t myaddr); ~TCPMessenger(); + void ready(); + + tcpaddr_t& get_tcpaddr(); + void map_entity_rank(msg_addr_t e, int r); + void map_rank_addr(int r, tcpaddr_t a); + // init, shutdown MPI and associated event loop thread. virtual int shutdown(); @@ -33,11 +40,30 @@ class TCPMessenger : public Messenger { * these are all ONE per process. */ -extern int tcpmessenger_world(); -extern int tcpmessenger_init(int& argc, char**& argv); // init mpi +extern int tcpmessenger_lookup(char *str, tcpaddr_t& ta); + +extern int tcpmessenger_init(); extern int tcpmessenger_start(); // start thread extern void tcpmessenger_wait(); // wait for thread to finish. extern int tcpmessenger_shutdown(); // finalize MPI +extern void tcpmessenger_start_nameserver(tcpaddr_t& ta); // on rank 0 +extern void tcpmessenger_stop_nameserver(); // on rank 0 +extern void tcpmessenger_start_rankserver(tcpaddr_t& ta); // on all ranks +extern void tcpmessenger_stop_rankserver(); // on all ranks + + +inline ostream& operator<<(ostream& out, struct sockaddr_in &a) +{ + unsigned char addr[4]; + memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4); + out << (unsigned)addr[0] << "." + << (unsigned)addr[1] << "." + << (unsigned)addr[2] << "." + << (unsigned)addr[3] << ":" + << (int)a.sin_port; + return out; +} + #endif diff --git a/ceph/msg/mpistarter.cc b/ceph/msg/mpistarter.cc new file mode 100644 index 0000000000000..c72bdb960bd3f --- /dev/null +++ b/ceph/msg/mpistarter.cc @@ -0,0 +1,45 @@ + +#include + +#include "TCPMessenger.h" + +/* + * start up TCPMessenger via MPI. + */ + +pair mpi_bootstrap_tcp(int& argc, char**& argv) +{ + tcpmessenger_init(); + tcpmessenger_start(); + + // exchnage addresses with other nodes + MPI_Init(&argc, &argv); + + int mpi_world; + int mpi_rank; + MPI_Comm_size(MPI_COMM_WORLD, &mpi_world); + MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank); + + //dout(1) << "i am " << mpi_rank << " of " << mpi_world << endl; + + // start up directory? + tcpaddr_t ta; + if (mpi_rank == 0) { + dout(30) << "i am rank 0, starting ns directory" << endl; + tcpmessenger_start_nameserver(ta); + } else { + memset(&ta, 0, sizeof(ta)); + } + + // distribute tcpaddr + int r = MPI_Bcast(&ta, sizeof(ta), MPI_CHAR, + 0, MPI_COMM_WORLD); + + dout(30) << "r = " << r << " ns tcpaddr is " << ta << endl; + tcpmessenger_start_rankserver(ta); + + MPI_Barrier(MPI_COMM_WORLD); + MPI_Finalize(); + + return pair(mpi_rank, mpi_world); +} diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 1f0b5699c7c5e..9fbc81cc23ff7 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -23,8 +23,11 @@ #include "messages/MPingAck.h" #include "messages/MOSDOp.h" #include "messages/MOSDOpReply.h" + #include "messages/MOSDMap.h" #include "messages/MOSDRGNotify.h" +#include "messages/MOSDRGPeer.h" +#include "messages/MOSDRGPeerAck.h" #include "common/Logger.h" #include "common/LogType.h" @@ -55,7 +58,6 @@ OSD::OSD(int id, Messenger *m) whoami = id; messenger = m; - messenger->set_dispatcher(this); osdmap = 0; @@ -123,6 +125,10 @@ int OSD::init() monitor->init(); osd_lock.Unlock(); + + // i'm ready! + messenger->set_dispatcher(this); + return r; } @@ -171,6 +177,12 @@ void OSD::dispatch(Message *m) case MSG_OSD_RG_NOTIFY: handle_rg_notify((MOSDRGNotify*)m); break; + case MSG_OSD_RG_PEER: + handle_rg_peer((MOSDRGPeer*)m); + break; + case MSG_OSD_RG_PEERACK: + handle_rg_peer_ack((MOSDRGPeerAck*)m); + break; // osd case MSG_SHUTDOWN: @@ -482,7 +494,7 @@ void OSD::peer_start(int replica, map& rg_map) { dout(7) << "peer_start with osd" << replica << " on " << rg_map.size() << " RGs" << endl; - //MOSDRGPeerRequest *m = new MOSDRGPeerRequest(osdmap->get_version()); + list rgids; for (map::iterator it = rg_map.begin(); it != rg_map.end(); @@ -497,10 +509,16 @@ void OSD::peer_start(int replica, map& rg_map) assert(rgp->get_role() == role); } - - + // set last_request stamp + //rgp->last + // add to list + rgids.push_back(rg->get_rgid()); } + + MOSDRGPeer *m = new MOSDRGPeer(osdmap->get_version(), rgids); + messenger->send_message(m, + MSG_ADDR_OSD(replica)); } @@ -577,6 +595,136 @@ void OSD::handle_rg_notify(MOSDRGNotify *m) delete m; } +void OSD::handle_rg_peer(MOSDRGPeer *m) +{ + int from = MSG_ADDR_NUM(m->get_source()); + dout(7) << "handle_rg_peer from osd" << from << endl; + + // older map? + if (m->get_version() < osdmap->get_version()) { + dout(7) << " from old map version " << m->get_version() << " < " << osdmap->get_version() << endl; + delete m; // discard and ignore.* + return; + } + + // newer map? + if (m->get_version() > osdmap->get_version()) { + dout(7) << " for newer map version " << m->get_version() << " > " << osdmap->get_version() << endl; + wait_for_new_map(m); + return; + } + + assert(m->get_version() == osdmap->get_version()); + + // go + MOSDRGPeerAck *ack = new MOSDRGPeerAck(osdmap->get_version()); + + for (list::iterator it = m->get_rg_list().begin(); + it != m->get_rg_list().end(); + it++) { + repgroup_t rgid = *it; + + // dne? + if (!rg_exists(rgid)) { + dout(10) << " rg " << rgid << " dne" << endl; + ack->rg_dne.push_back(rgid); + continue; + } + + // get/open RG + RG *rg = open_rg(rgid); + + // report back state and rg content + ack->rg_state[rgid].state = rg->get_state(); + ack->rg_state[rgid].deleted = rg->get_deleted_objects(); + + // list objects + list olist; + rg->list_objects(store,olist); + + dout(10) << " rg " << rgid << " has state " << rg->get_state() << ", " << olist.size() << " objects" << endl; + + for (list::iterator it = olist.begin(); + it != olist.end(); + it++) { + version_t v = 0; + store->getattr(*it, + "version", + &v, sizeof(v)); + ack->rg_state[rgid].objects[*it] = v; + } + } + + // reply + messenger->send_message(ack, + MSG_ADDR_OSD(from)); + + delete m; +} + +void OSD::handle_rg_peer_ack(MOSDRGPeerAck *m) +{ + int from = MSG_ADDR_NUM(m->get_source()); + dout(7) << "handle_rg_peer_ack from osd" << from << endl; + + // older map? + if (m->get_version() < osdmap->get_version()) { + dout(7) << " from old map version " << m->get_version() << " < " << osdmap->get_version() << endl; + delete m; // discard and ignore.* + return; + } + + // newer map? + if (m->get_version() > osdmap->get_version()) { + dout(7) << " for newer map version " << m->get_version() << " > " << osdmap->get_version() << endl; + wait_for_new_map(m); + return; + } + + assert(m->get_version() == osdmap->get_version()); + + // + //list rg_dne; // rg dne + //map rg_state; // state, lists, etc. + + // rg_dne first + for (list::iterator it = m->rg_dne.begin(); + it != m->rg_dne.end(); + it++) { + dout(10) << " rg " << *it << " dne on osd" << from << endl; + + RG *rg = open_rg(*it); + assert(rg); + RGPeer *rgp = rg->get_peer(from); + if (rgp) { + rg->remove_peer(from); + } else { + dout(10) << " weird, i didn't have it!" << endl; // multiple lagged peer requests? + } + } + + // rg_state + for (map::iterator it = m->rg_state.begin(); + it != m->rg_state.end(); + it++) { + dout(10) << " rg " << it->first << " got state " << it->second.state + << " " << it->second.objects.size() << " objects, " + << it->second.deleted.size() << " deleted" << endl; + + RG *rg = open_rg(it->first); + assert(rg); + RGPeer *rgp = rg->get_peer(from); + assert(rgp); + + rgp->peer_state = it->second; + } + + // done + delete m; +} + + + diff --git a/ceph/osd/OSD.h b/ceph/osd/OSD.h index 75991a0197519..80c2d54ef05f8 100644 --- a/ceph/osd/OSD.h +++ b/ceph/osd/OSD.h @@ -21,6 +21,26 @@ class Message; typedef __uint64_t version_t; +struct RGReplicaInfo { + int state; + map objects; // remote object list + map deleted; // remote delete list + + void _encode(bufferlist& blist) { + blist.append((char*)&state, sizeof(state)); + ::_encode(objects, blist); + ::_encode(deleted, blist); + } + void _decode(bufferlist& blist, int& off) { + blist.copy(off, sizeof(state), (char*)&state); + off += sizeof(state); + ::_decode(objects, blist, off); + ::_decode(deleted, blist, off); + } + + RGReplicaInfo() : state(0) { } +}; + /** RGPeer * state associated with non-primary OSDS with RG content. @@ -37,9 +57,12 @@ class RGPeer { int role; // 0 primary, 1+ replica, -1 residual int state; + // peer state + public: + RGReplicaInfo peer_state; + + protected: // active|residual: used by primary for syncing (old) replicas - map objects; // remote object list - map deleted; // remote delete list map fetching; // objects i'm reading from replica map stray; // objects that need to be deleted @@ -60,12 +83,14 @@ class RGPeer { bool is_complete() { return state_test(RG_PEER_STATE_COMPLETE); } bool is_residual() { return role < 0; } - bool is_empty() { return is_active() && objects.empty(); } // *** && peer_state & COMPLETE + bool is_empty() { return is_active() && peer_state.objects.empty(); } // *** && peer_state & COMPLETE }; + + /** RG - Replica Group * */ @@ -82,7 +107,7 @@ class RGPeer { class RG { protected: - repgroup_t rg; + repgroup_t rgid; int role; // 0 = primary, 1 = secondary, etc. -1=undef/none. int state; // see bit defns above @@ -95,11 +120,12 @@ class RG { map deleted_objects; // locally deleted objects public: - RG(repgroup_t r) : rg(r), + RG(repgroup_t r) : rgid(r), role(0), - state(0) { } + state(0), + primary(-1) { } - repgroup_t get_rg() { return rg; } + repgroup_t get_rgid() { return rgid; } int get_role() { return role; } int get_primary() { return primary; } @@ -114,8 +140,15 @@ class RG { RGPeer* new_peer(int p, int r) { return peers[p] = new RGPeer(p, r); } + void remove_peer(int p) { + assert(peers.count(p)); + delete peers[p]; + peers.erase(p); + } + + set& get_old_replica_set() { return old_replica_set; } + map& get_deleted_objects() { return deleted_objects; } - set get_old_replica_set() { return old_replica_set; } int get_state() { return state; } bool state_test(int m) { return (state & m) != 0; } @@ -124,26 +157,26 @@ class RG { void state_clear(int m) { state &= ~m; } void store(ObjectStore *store) { - if (!store->collection_exists(rg)) - store->collection_create(rg); - store->collection_setattr(rg, "role", &role, sizeof(role)); - store->collection_setattr(rg, "primary", &primary, sizeof(primary)); - store->collection_setattr(rg, "state", &state, sizeof(state)); + if (!store->collection_exists(rgid)) + store->collection_create(rgid); + store->collection_setattr(rgid, "role", &role, sizeof(role)); + store->collection_setattr(rgid, "primary", &primary, sizeof(primary)); + store->collection_setattr(rgid, "state", &state, sizeof(state)); } void fetch(ObjectStore *store) { - store->collection_getattr(rg, "role", &role, sizeof(role)); - store->collection_getattr(rg, "primary", &primary, sizeof(primary)); - store->collection_getattr(rg, "state", &state, sizeof(state)); + store->collection_getattr(rgid, "role", &role, sizeof(role)); + store->collection_getattr(rgid, "primary", &primary, sizeof(primary)); + store->collection_getattr(rgid, "state", &state, sizeof(state)); } void add_object(ObjectStore *store, object_t oid) { - store->collection_add(rg, oid); + store->collection_add(rgid, oid); } void remove_object(ObjectStore *store, object_t oid) { - store->collection_remove(rg, oid); + store->collection_remove(rgid, oid); } void list_objects(ObjectStore *store, list& ls) { - store->collection_list(rg, ls); + store->collection_list(rgid, ls); } }; diff --git a/ceph/osdc/Filer.cc b/ceph/osdc/Filer.cc index 4c51991fc9013..8d1793de0079b 100644 --- a/ceph/osdc/Filer.cc +++ b/ceph/osdc/Filer.cc @@ -188,7 +188,7 @@ Filer::handle_osd_read_reply(MOSDOpReply *m) for (map::iterator bit = eit->buffer_extents.begin(); bit != eit->buffer_extents.end(); bit++) { - dout(10) << "object " << eit->oid << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl; + dout(21) << "object " << eit->oid << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl; by_off[bit->first] = new bufferlist; if (ox_off + bit->second <= ox_len) { @@ -203,14 +203,14 @@ Filer::handle_osd_read_reply(MOSDOpReply *m) p->bytes_read = bit->first + ox_len-ox_off; // zero end of bx - dout(10) << " adding some zeros to the end " << ox_off + bit->second-ox_len << endl; + dout(21) << " adding some zeros to the end " << ox_off + bit->second-ox_len << endl; bufferptr z = new buffer(ox_off + bit->second - ox_len); memset(z.c_str(), 0, z.length()); by_off[bit->first]->append( z ); } else { // we got none of this bx. zero whole thing. assert(ox_off >= ox_len); - dout(10) << " adding all zeros for this bit " << bit->second << endl; + dout(21) << " adding all zeros for this bit " << bit->second << endl; bufferptr z = new buffer(bit->second); assert(z.length() == bit->second); memset(z.c_str(), 0, z.length()); @@ -227,10 +227,10 @@ Filer::handle_osd_read_reply(MOSDOpReply *m) it++) { assert(it->second->length()); if (it->first < p->bytes_read) { - dout(10) << " concat buffer frag off " << it->first << " len " << it->second->length() << endl; + dout(21) << " concat buffer frag off " << it->first << " len " << it->second->length() << endl; p->read_result->claim_append(*(it->second)); } else { - dout(10) << " NO concat zero buffer frag off " << it->first << " len " << it->second->length() << endl; + dout(21) << " NO concat zero buffer frag off " << it->first << " len " << it->second->length() << endl; } delete it->second; } diff --git a/ceph/tcpfuse.cc b/ceph/tcpfuse.cc index a727a2d77c5f0..954815b4e4ee8 100644 --- a/ceph/tcpfuse.cc +++ b/ceph/tcpfuse.cc @@ -16,152 +16,94 @@ using namespace std; #include "msg/TCPMessenger.h" #include "common/Timer.h" + +#include -#define NUMMDS g_conf.num_mds -#define NUMOSD g_conf.num_osd -#define NUMCLIENT g_conf.num_client -class C_Test : public Context { -public: - void finish(int r) { - cout << "C_Test->finish(" << r << ")" << endl; - } -}; - - -int main(int oargc, char **oargv) { +int main(int argc, char **argv, char *envp[]) { //cerr << "tcpfuse starting " << myrank << "/" << world << endl; - int argc; - char **argv; - parse_config_options(oargc, oargv, - argc, argv); - - int start = 0; - - // build new argc+argv for fuse - typedef char* pchar; - int nargc = 0; - char **nargv = new pchar[argc]; - nargv[nargc++] = argv[0]; + vector args; + argv_to_vec(argc, argv, args); + parse_config_options(args); int mkfs = 0; - for (int i=1; i nargs; + + char *nsaddr = 0; + + for (unsigned i=0; iinit(); - } - - for (int i=0; iinit(); - } - - // create client - for (int i=0; iinit(); - - // start up fuse - // use my argc, argv (make sure you pass a mount point!) - cout << "mounting" << endl; - client[i]->mount(mkfs); - - cout << "starting fuse on rank " << myrank << " pid " << getpid() << endl; - ceph_fuse_main(client[i], nargc, nargv); - cout << "fuse finished on rank " << myrank << " pid " << getpid() << endl; - } - for (int i=0; iunmount(); - cout << "unmounted" << endl; - client[i]->shutdown(); - } - - - // wait for it to finish - tcpmessenger_wait(); + // args for fuse + args = nargs; + vec_to_argv(args, argc, argv); - } else { - cerr << "IDLE rank " << myrank << endl; - } - tcpmessenger_shutdown(); // shutdown MPI + // start up tcpmessenger + tcpmessenger_init(); + tcpmessenger_start(); + tcpmessenger_start_rankserver(nsa); - // cleanup - for (int i=0; iinit(); + + // start up fuse + // use my argc, argv (make sure you pass a mount point!) + cout << "mounting" << endl; + client->mount(mkfs); + + cout << "starting fuse on pid " << getpid() << endl; + ceph_fuse_main(client, argc, argv); + cout << "fuse finished on pid " << getpid() << endl; + client->unmount(); + cout << "unmounted" << endl; + client->shutdown(); + + delete client; + + // wait for it to finish + tcpmessenger_wait(); + tcpmessenger_shutdown(); // shutdown MPI + return 0; } diff --git a/ceph/tcpsyn.cc b/ceph/tcpsyn.cc index 92d71295fac26..db10013e56758 100644 --- a/ceph/tcpsyn.cc +++ b/ceph/tcpsyn.cc @@ -28,103 +28,50 @@ public: }; -int main(int oargc, char **oargv) { +#include "msg/mpistarter.cc" - //cerr << "tcpsyn starting " << myrank << "/" << world << endl; - int argc; - char **argv; - parse_config_options(oargc, oargv, - argc, argv); - int start = 0; +int main(int argc, char **argv) +{ + vector args; + argv_to_vec(argc, argv, args); + parse_config_options(args); + parse_syn_options(args); - // build new argc+argv for fuse - typedef char* pchar; - int nargc = 0; - char **nargv = new pchar[argc]; - nargv[nargc++] = argv[0]; - - list syn_modes; - list syn_iargs; - list syn_sargs; - int mkfs = 0; - for (int i=1; i nargs; + for (unsigned i=0; i mpiwho = mpi_bootstrap_tcp(argc, argv); + int myrank = mpiwho.first; + int world = mpiwho.second; if (myrank == 0) cerr << "nummds " << NUMMDS << " numosd " << NUMOSD << " numclient " << NUMCLIENT << endl; - assert(NUMMDS + NUMOSD + 1 <= world); + assert(NUMMDS + NUMOSD + (NUMCLIENT?1:0) <= world); MDCluster *mdc = new MDCluster(NUMMDS, NUMOSD); @@ -133,107 +80,99 @@ int main(int oargc, char **oargv) { gethostname(hostname,100); int pid = getpid(); + int started = 0; + // create mds MDS *mds[NUMMDS]; for (int i=0; iinit(); + started++; } // create osd OSD *osd[NUMOSD]; for (int i=0; iinit(); + started++; } // create client + int client_nodes = world - NUMMDS - NUMOSD; + int clients_per_node = 1; + if (NUMCLIENT) clients_per_node = (NUMCLIENT-1) / client_nodes + 1; set clientlist; Client *client[NUMCLIENT]; SyntheticClient *syn[NUMCLIENT]; for (int i=0; iinit(); + client[i] = new Client(new TCPMessenger(MSG_ADDR_CLIENT(i)) ); + + // logger? + if (client_logger == 0) { + char s[80]; + sprintf(s,"clnode.%d", myrank); + client_logger = new Logger(s, &client_logtype); } + + client[i]->init(); + started++; + } + + int nclients = 0; + for (set::iterator it = clientlist.begin(); + it != clientlist.end(); + it++) { + int i = *it; + // use my argc, argv (make sure you pass a mount point!) + //cout << "mounting" << endl; + client[i]->mount(mkfs); - for (int i=0; iinit(); - } + //cout << "starting synthetic client on rank " << myrank << endl; + syn[i] = new SyntheticClient(client[i]); - // create client - int nclients = 0; - for (int i=0; iinit(); - - // use my argc, argv (make sure you pass a mount point!) - //cout << "mounting" << endl; - client[i]->mount(mkfs); - - //cout << "starting synthetic client on rank " << myrank << endl; - syn[i] = new SyntheticClient(client[i]); - - syn[i]->modes = syn_modes; - syn[i]->sargs = syn_sargs; - syn[i]->iargs = syn_iargs; - syn[i]->start_thread(); - - nclients++; - } - if (nclients) { - cout << "waiting for " << nclients << " clients to finish" << endl; - } - for (int i=0; ijoin_thread(); - delete syn[i]; - - client[i]->unmount(); - //cout << "client" << i << " unmounted" << endl; - client[i]->shutdown(); - } + syn[i]->start_thread(); - - // wait for it to finish - tcpmessenger_wait(); + nclients++; + } + if (nclients) { + cerr << "waiting for " << nclients << " clients to finish" << endl; + } + + for (set::iterator it = clientlist.begin(); + it != clientlist.end(); + it++) { + int i = *it; - //assert(0); - } else { - cerr << "IDLE rank " << myrank << endl; + // cout << "waiting for synthetic client" << i << " to finish" << endl; + syn[i]->join_thread(); + delete syn[i]; + + client[i]->unmount(); + //cout << "client" << i << " unmounted" << endl; + client[i]->shutdown(); + } + + + if (!started) { + dout(1) << "IDLE" << endl; + tcpmessenger_stop_rankserver(); } + // wait for everything to finish + tcpmessenger_wait(); + tcpmessenger_shutdown(); + + /* // cleanup for (int i=0; i