From 831435975fb94764a16af6243e37a91d64bab03e Mon Sep 17 00:00:00 2001 From: sageweil Date: Sun, 28 Jan 2007 03:32:03 +0000 Subject: [PATCH] merged trunk changes r1038:1046 into branches/sage/cephmds2 (standalone are clustered cmon startup; mkmonmap) git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1047 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/Makefile | 3 + branches/sage/cephmds2/TODO | 6 + branches/sage/cephmds2/cfuse.cc | 17 +-- branches/sage/cephmds2/cmds.cc | 17 +-- branches/sage/cephmds2/cmon.cc | 72 ++++++++---- branches/sage/cephmds2/cosd.cc | 17 +-- branches/sage/cephmds2/csyn.cc | 17 +-- branches/sage/cephmds2/mkmonmap.cc | 109 ++++++++++++++++++ branches/sage/cephmds2/mon/MonMap.h | 40 +++++++ branches/sage/cephmds2/msg/SimpleMessenger.cc | 62 ++++------ branches/sage/cephmds2/msg/SimpleMessenger.h | 8 +- branches/sage/cephmds2/newsyn.cc | 7 +- 12 files changed, 249 insertions(+), 126 deletions(-) create mode 100644 branches/sage/cephmds2/mkmonmap.cc diff --git a/branches/sage/cephmds2/Makefile b/branches/sage/cephmds2/Makefile index fd2be4363c778..60ef16607b12e 100644 --- a/branches/sage/cephmds2/Makefile +++ b/branches/sage/cephmds2/Makefile @@ -93,6 +93,9 @@ obfs: depend obfstest # real bits +mkmonmap: mkmonmap.cc common.o + ${CC} ${CFLAGS} ${LIBS} $^ -o $@ + cmon: cmon.cc mon.o ebofs.o msg/SimpleMessenger.o common.o ${CC} ${CFLAGS} ${LIBS} $^ -o $@ diff --git a/branches/sage/cephmds2/TODO b/branches/sage/cephmds2/TODO index 47a6a127a6992..3bb6c72e4f6f8 100644 --- a/branches/sage/cephmds2/TODO +++ b/branches/sage/cephmds2/TODO @@ -1,3 +1,9 @@ +- make CDIR_AUTH_UNKNOWN values _PARENT when possible +- starting -> (replay + rejoin) states +- bcast import map on rejoin +- ... +- merge in cmon stuff from branch + blech: - EMetablob should return 'expired' if they have higher versions (and are thus described by a newer journal entry) diff --git a/branches/sage/cephmds2/cfuse.cc b/branches/sage/cephmds2/cfuse.cc index 95567084348a9..33fe40bc67322 100644 --- a/branches/sage/cephmds2/cfuse.cc +++ b/branches/sage/cephmds2/cfuse.cc @@ -42,24 +42,15 @@ int main(int argc, char **argv, char *envp[]) { vec_to_argv(args, argc, argv); // load monmap - bufferlist bl; - int fd = ::open(".ceph_monmap", O_RDONLY); - assert(fd >= 0); - struct stat st; - ::fstat(fd, &st); - bufferptr bp(st.st_size); - bl.append(bp); - ::read(fd, (void*)bl.c_str(), bl.length()); - ::close(fd); - - MonMap *monmap = new MonMap; - monmap->decode(bl); + MonMap monmap; + int r = monmap.read(".ceph_monmap"); + assert(r >= 0); // start up network rank.start_rank(); // start client - Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), monmap); + Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap); client->init(); // start up fuse diff --git a/branches/sage/cephmds2/cmds.cc b/branches/sage/cephmds2/cmds.cc index fee59512207e9..886cba3758d36 100644 --- a/branches/sage/cephmds2/cmds.cc +++ b/branches/sage/cephmds2/cmds.cc @@ -73,18 +73,9 @@ int main(int argc, char **argv) // load monmap - bufferlist bl; - int fd = ::open(".ceph_monmap", O_RDONLY); - assert(fd >= 0); - struct stat st; - ::fstat(fd, &st); - bufferptr bp(st.st_size); - bl.append(bp); - ::read(fd, (void*)bl.c_str(), bl.length()); - ::close(fd); - - MonMap *monmap = new MonMap; - monmap->decode(bl); + MonMap monmap; + int r = monmap.read(".ceph_monmap"); + assert(r >= 0); // start up network rank.start_rank(); @@ -93,7 +84,7 @@ int main(int argc, char **argv) Messenger *m = rank.register_entity(MSG_ADDR_MDS_NEW); assert(m); - MDS *mds = new MDS(m->get_myaddr().num(), m, monmap); + MDS *mds = new MDS(m->get_myaddr().num(), m, &monmap); mds->init(standby); // wait diff --git a/branches/sage/cephmds2/cmon.cc b/branches/sage/cephmds2/cmon.cc index 7d915d9c3ef58..690a3746b3f13 100644 --- a/branches/sage/cephmds2/cmon.cc +++ b/branches/sage/cephmds2/cmon.cc @@ -60,34 +60,58 @@ int main(int argc, char **argv) if (g_conf.debug_after) g_timer.add_event_after(g_conf.debug_after, new C_Debug); - // let's assume a standalone monitor - // FIXME: we want to start a cluster, eventually. - cout << "starting standalone mon0" << endl; - - // start messenger - rank.start_rank(); - - // create monmap - MonMap *monmap = new MonMap(1); - monmap->mon_inst[0] = rank.my_inst; - - cout << "bound to " << rank.get_listen_addr() << endl; + // args + int whoami = -1; + char *monmap_fn = ".ceph_monmap"; + for (unsigned i=0; i= 0); + } else { + // i am specific monitor. + + // read monmap + cout << "reading monmap from .ceph_monmap" << endl; + int r = monmap.read(monmap_fn); + assert(r >= 0); + + // bind to a specific port + cout << "starting mon" << whoami << " at " << monmap.get_inst(whoami) << endl; + tcpaddr_t addr = monmap.get_inst(whoami).addr; + rank.set_listen_addr(addr); + rank.start_rank(); + } + // start monitor - Messenger *m = rank.register_entity(MSG_ADDR_MON(0)); - Monitor *mon = new Monitor(0, m, monmap); + Messenger *m = rank.register_entity(MSG_ADDR_MON(whoami)); + Monitor *mon = new Monitor(whoami, m, &monmap); mon->init(); - // write monmap - cout << "writing monmap to .ceph_monmap" << endl; - bufferlist bl; - monmap->encode(bl); - int fd = ::open(".ceph_monmap", O_RDWR|O_CREAT); - assert(fd >= 0); - ::fchmod(fd, 0644); - ::write(fd, (void*)bl.c_str(), bl.length()); - ::close(fd); - // wait cout << "waiting for shutdown ..." << endl; rank.wait(); diff --git a/branches/sage/cephmds2/cosd.cc b/branches/sage/cephmds2/cosd.cc index c390faaca3e49..93d14348996df 100644 --- a/branches/sage/cephmds2/cosd.cc +++ b/branches/sage/cephmds2/cosd.cc @@ -100,18 +100,9 @@ int main(int argc, char **argv) } // load monmap - bufferlist bl; - int fd = ::open(".ceph_monmap", O_RDONLY); - assert(fd >= 0); - struct stat st; - ::fstat(fd, &st); - bufferptr bp(st.st_size); - bl.append(bp); - ::read(fd, (void*)bl.c_str(), bl.length()); - ::close(fd); - - MonMap *monmap = new MonMap; - monmap->decode(bl); + MonMap monmap; + int r = monmap.read(".ceph_monmap"); + assert(r >= 0); // start up network rank.start_rank(); @@ -119,7 +110,7 @@ int main(int argc, char **argv) // start osd Messenger *m = rank.register_entity(MSG_ADDR_OSD(whoami)); assert(m); - OSD *osd = new OSD(whoami, m, monmap, dev); + OSD *osd = new OSD(whoami, m, &monmap, dev); osd->init(); // wait diff --git a/branches/sage/cephmds2/csyn.cc b/branches/sage/cephmds2/csyn.cc index 75da5487b092d..9b7a58f5de5e5 100644 --- a/branches/sage/cephmds2/csyn.cc +++ b/branches/sage/cephmds2/csyn.cc @@ -44,24 +44,15 @@ int main(int argc, char **argv, char *envp[]) { vec_to_argv(args, argc, argv); // load monmap - bufferlist bl; - int fd = ::open(".ceph_monmap", O_RDONLY); - assert(fd >= 0); - struct stat st; - ::fstat(fd, &st); - bufferptr bp(st.st_size); - bl.append(bp); - ::read(fd, (void*)bl.c_str(), bl.length()); - ::close(fd); - - MonMap *monmap = new MonMap; - monmap->decode(bl); + MonMap monmap; + int r = monmap.read(".ceph_monmap"); + assert(r >= 0); // start up network rank.start_rank(); // start client - Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), monmap); + Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap); client->init(); // start syntheticclient diff --git a/branches/sage/cephmds2/mkmonmap.cc b/branches/sage/cephmds2/mkmonmap.cc new file mode 100644 index 0000000000000..6d049f4bd7186 --- /dev/null +++ b/branches/sage/cephmds2/mkmonmap.cc @@ -0,0 +1,109 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include +#include + +#include +#include +#include +using namespace std; + +#include "config.h" + +#include "mon/MonMap.h" + + +bool parse_ip_port(const char *s, tcpaddr_t& tcpaddr) +{ + unsigned char addr[4]; + int port = 0; + + int count = 0; // digit count + + while (1) { + // parse the #. + int val = 0; + int numdigits = 0; + + while (*s >= '0' && *s <= '9') { + int digit = *s - '0'; + //cout << "digit " << digit << endl; + val *= 10; + val += digit; + numdigits++; + s++; + } + //cout << "val " << val << endl; + + if (numdigits == 0) return false; // no digits + if (count < 3 && *s != '.') return false; // should have 3 periods + if (count == 3 && *s != ':') return false; // then a colon + s++; + + if (count <= 3) + addr[count] = val; + else + port = val; + + count++; + if (count == 5) break; + } + + // copy into inst + memcpy((char*)&tcpaddr.sin_addr.s_addr, (char*)addr, 4); + tcpaddr.sin_port = port; + + return true; +} + + +int main(int argc, char **argv) +{ + vector args; + argv_to_vec(argc, argv, args); + + MonMap monmap; + + char *outfn = ".ceph_monmap"; + + for (unsigned i=0; i= 0); + + return 0; +} diff --git a/branches/sage/cephmds2/mon/MonMap.h b/branches/sage/cephmds2/mon/MonMap.h index e72946d76cf06..b56ffb4a8549a 100644 --- a/branches/sage/cephmds2/mon/MonMap.h +++ b/branches/sage/cephmds2/mon/MonMap.h @@ -14,6 +14,10 @@ #ifndef __MONMAP_H #define __MONMAP_H +#include +#include +#include + #include "msg/Message.h" #include "include/types.h" @@ -27,6 +31,11 @@ class MonMap { MonMap(int s=0) : epoch(0), num_mon(s), mon_inst(s), last_mon(-1) {} + void add_mon(entity_inst_t inst) { + mon_inst.push_back(inst); + num_mon++; + } + // pick a mon. // choice should be stable, unless we explicitly ask for a new one. int pick_mon(bool newmon=false) { @@ -58,6 +67,37 @@ class MonMap { _decode(mon_inst, blist, off); } + int write(char *fn) { + // encode + bufferlist bl; + encode(bl); + + // write + int fd = ::open(fn, O_RDWR|O_CREAT); + if (fd < 0) return fd; + ::fchmod(fd, 0644); + ::write(fd, (void*)bl.c_str(), bl.length()); + ::close(fd); + return 0; + } + + int read(char *fn) { + // read + bufferlist bl; + int fd = ::open(fn, O_RDONLY); + if (fd < 0) return fd; + struct stat st; + ::fstat(fd, &st); + bufferptr bp(st.st_size); + bl.append(bp); + ::read(fd, (void*)bl.c_str(), bl.length()); + ::close(fd); + + // decode + decode(bl); + return 0; + } + }; #endif diff --git a/branches/sage/cephmds2/msg/SimpleMessenger.cc b/branches/sage/cephmds2/msg/SimpleMessenger.cc index 9a6ae8f558fe2..f6377e828c77d 100644 --- a/branches/sage/cephmds2/msg/SimpleMessenger.cc +++ b/branches/sage/cephmds2/msg/SimpleMessenger.cc @@ -61,18 +61,15 @@ int Rank::Accepter::start() assert(listen_sd > 0); /* bind to port */ - memset((char*)&listen_addr, 0, sizeof(listen_addr)); - listen_addr.sin_family = AF_INET; - listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); - listen_addr.sin_port = 0; - - int rc = bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)); + int rc = bind(listen_sd, (struct sockaddr *) &rank.listen_addr, sizeof(rank.listen_addr)); + if (rc < 0) + derr(0) << "accepter.start unable to bind to " << rank.listen_addr << endl; assert(rc >= 0); - socklen_t llen = sizeof(listen_addr); - getsockname(listen_sd, (sockaddr*)&listen_addr, &llen); + socklen_t llen = sizeof(rank.listen_addr); + getsockname(listen_sd, (sockaddr*)&rank.listen_addr, &llen); - int myport = listen_addr.sin_port; + int myport = rank.listen_addr.sin_port; // listen! rc = ::listen(listen_sd, 1000); @@ -97,9 +94,9 @@ int Rank::Accepter::start() myhostname->h_length); my_addr.sin_port = myport; - listen_addr = my_addr; + rank.listen_addr = my_addr; - dout(10) << "accepter.start listen addr is " << listen_addr << endl; + dout(10) << "accepter.start listen addr is " << rank.listen_addr << endl; // start thread create(); @@ -671,11 +668,23 @@ void Rank::Pipe::fail(list& out) Rank::Rank() : single_dispatcher(this) { + // default to any listen_addr + memset((char*)&listen_addr, 0, sizeof(listen_addr)); + listen_addr.sin_family = AF_INET; + listen_addr.sin_addr.s_addr = htonl(INADDR_ANY); + listen_addr.sin_port = 0; } Rank::~Rank() { } +void Rank::set_listen_addr(tcpaddr_t& a) +{ + dout(10) << "set_listen_addr " << a << endl; + memcpy((char*)&listen_addr.sin_addr.s_addr, (char*)&a.sin_addr.s_addr, 4); + listen_addr.sin_port = a.sin_port; +} + void Rank::_submit_single_dispatch(Message *m) { @@ -762,7 +771,7 @@ int Rank::start_rank() lock.Lock(); // my_inst - my_inst.set_addr( accepter.listen_addr ); + my_inst.set_addr( listen_addr ); dout(1) << "start_rank at " << my_inst << endl; @@ -972,35 +981,6 @@ void Rank::wait() -int Rank::find_ns_addr(tcpaddr_t &nsa) -{ - // file? - int fd = ::open(".ceph_ns",O_RDONLY); - if (fd > 0) { - ::read(fd, (void*)&nsa, sizeof(nsa)); - ::close(fd); - cout << "ceph ns is " << nsa << endl; - return 0; - } - - // env var? - char *nsaddr = getenv("CEPH_NAMESERVER");////envz_entry(*envp, e_len, "CEPH_NAMESERVER"); - if (nsaddr) { - while (nsaddr[0] != '=') nsaddr++; - nsaddr++; - - if (tcp_hostlookup(nsaddr, nsa) < 0) { - cout << "can't resolve " << nsaddr << endl; - return -1; - } - - cout << "ceph ns is " << nsa << endl; - return 0; - } - - cerr << "i can't find ceph ns addr in .ceph_ns or CEPH_NAMESERVER" << endl; - return -1; -} diff --git a/branches/sage/cephmds2/msg/SimpleMessenger.h b/branches/sage/cephmds2/msg/SimpleMessenger.h index 32e9db14582ef..070fb236b08fd 100644 --- a/branches/sage/cephmds2/msg/SimpleMessenger.h +++ b/branches/sage/cephmds2/msg/SimpleMessenger.h @@ -48,7 +48,6 @@ class Rank { public: bool done; - tcpaddr_t listen_addr; int listen_sd; Accepter() : done(false) {} @@ -234,6 +233,9 @@ class Rank { Mutex lock; Cond wait_cond; // for wait() + // where i listen + tcpaddr_t listen_addr; + // my instance entity_inst_t my_inst; @@ -257,7 +259,7 @@ class Rank { void mark_down(msg_addr_t addr, entity_inst_t& i); void mark_up(msg_addr_t addr, entity_inst_t& i); - tcpaddr_t get_listen_addr() { return accepter.listen_addr; } + tcpaddr_t get_listen_addr() { return listen_addr; } void reaper(); @@ -267,7 +269,7 @@ public: Rank(); ~Rank(); - int find_ns_addr(tcpaddr_t &tcpaddr); + void set_listen_addr(tcpaddr_t& a); int start_rank(); void wait(); diff --git a/branches/sage/cephmds2/newsyn.cc b/branches/sage/cephmds2/newsyn.cc index efce694dbf9a2..f4f6309a1aaa0 100644 --- a/branches/sage/cephmds2/newsyn.cc +++ b/branches/sage/cephmds2/newsyn.cc @@ -89,12 +89,7 @@ pair mpi_bootstrap_new(int& argc, char**& argv, MonMap *monmap) bufferlist bl; if (mpi_rank == 0) { monmap->encode(bl); - - int fd = ::open(".ceph_monmap", O_WRONLY|O_CREAT); - ::write(fd, (void*)bl.c_str(), bl.length()); - ::fchmod(fd, 0644); - ::close(fd); - + monmap->write(".ceph_monmap"); } else { int l = g_conf.num_mon * 1000; // nice'n big. bufferptr bp(l); -- 2.39.5