# real bits
+mkmonmap: mkmonmap.cc common.o
+ ${CC} ${CFLAGS} ${LIBS} $^ -o $@
+
cmon: cmon.cc mon.o ebofs.o msg/SimpleMessenger.o common.o
${CC} ${CFLAGS} ${LIBS} $^ -o $@
vec_to_argv(args, argc, argv);
// load monmap
- bufferlist bl;
- int fd = ::open(".ceph_monmap", O_RDONLY);
- assert(fd >= 0);
- struct stat st;
- ::fstat(fd, &st);
- bufferptr bp(st.st_size);
- bl.append(bp);
- ::read(fd, (void*)bl.c_str(), bl.length());
- ::close(fd);
-
- MonMap *monmap = new MonMap;
- monmap->decode(bl);
+ MonMap monmap;
+ int r = monmap.read(".ceph_monmap");
+ assert(r >= 0);
// start up network
rank.start_rank();
// start client
- Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), monmap);
+ Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
client->init();
// start up fuse
// load monmap
- bufferlist bl;
- int fd = ::open(".ceph_monmap", O_RDONLY);
- assert(fd >= 0);
- struct stat st;
- ::fstat(fd, &st);
- bufferptr bp(st.st_size);
- bl.append(bp);
- ::read(fd, (void*)bl.c_str(), bl.length());
- ::close(fd);
-
- MonMap *monmap = new MonMap;
- monmap->decode(bl);
+ MonMap monmap;
+ int r = monmap.read(".ceph_monmap");
+ assert(r >= 0);
// start up network
rank.start_rank();
Messenger *m = rank.register_entity(MSG_ADDR_MDS_NEW);
assert(m);
- MDS *mds = new MDS(m->get_myaddr().num(), m, monmap);
+ MDS *mds = new MDS(m->get_myaddr().num(), m, &monmap);
mds->init();
// wait
if (g_conf.debug_after)
g_timer.add_event_after(g_conf.debug_after, new C_Debug);
- // let's assume a standalone monitor
- // FIXME: we want to start a cluster, eventually.
- cout << "starting standalone mon0" << endl;
-
- // start messenger
- rank.start_rank();
-
- // create monmap
- MonMap *monmap = new MonMap(1);
- monmap->mon_inst[0] = rank.my_inst;
-
- cout << "bound to " << rank.get_listen_addr() << endl;
+ // args
+ int whoami = -1;
+ char *monmap_fn = ".ceph_monmap";
+ for (unsigned i=0; i<args.size(); i++) {
+ if (strcmp(args[i], "--mon") == 0)
+ whoami = atoi(args[++i]);
+ else if (strcmp(args[i], "--monmap") == 0)
+ monmap_fn = args[++i];
+ else {
+ cerr << "unrecognized arg " << args[i] << endl;
+ return -1;
+ }
+ }
+ MonMap monmap;
+
+ if (whoami < 0) {
+ // let's assume a standalone monitor
+ cout << "starting standalone mon0" << endl;
+ whoami = 0;
+
+ // start messenger
+ rank.start_rank();
+ cout << "bound to " << rank.get_listen_addr() << endl;
+
+ // add single mon0
+ monmap.add_mon(rank.my_inst);
+
+ // write monmap
+ cout << "writing monmap to " << monmap_fn << endl;;
+ int r = monmap.write(monmap_fn);
+ assert(r >= 0);
+ } else {
+ // i am specific monitor.
+
+ // read monmap
+ cout << "reading monmap from .ceph_monmap" << endl;
+ int r = monmap.read(monmap_fn);
+ assert(r >= 0);
+
+ // bind to a specific port
+ cout << "starting mon" << whoami << " at " << monmap.get_inst(whoami) << endl;
+ tcpaddr_t addr = monmap.get_inst(whoami).addr;
+ rank.set_listen_addr(addr);
+ rank.start_rank();
+ }
+
// start monitor
- Messenger *m = rank.register_entity(MSG_ADDR_MON(0));
- Monitor *mon = new Monitor(0, m, monmap);
+ Messenger *m = rank.register_entity(MSG_ADDR_MON(whoami));
+ Monitor *mon = new Monitor(whoami, m, &monmap);
mon->init();
- // write monmap
- cout << "writing monmap to .ceph_monmap" << endl;
- bufferlist bl;
- monmap->encode(bl);
- int fd = ::open(".ceph_monmap", O_RDWR|O_CREAT);
- assert(fd >= 0);
- ::fchmod(fd, 0644);
- ::write(fd, (void*)bl.c_str(), bl.length());
- ::close(fd);
-
// wait
cout << "waiting for shutdown ..." << endl;
rank.wait();
}
// load monmap
- bufferlist bl;
- int fd = ::open(".ceph_monmap", O_RDONLY);
- assert(fd >= 0);
- struct stat st;
- ::fstat(fd, &st);
- bufferptr bp(st.st_size);
- bl.append(bp);
- ::read(fd, (void*)bl.c_str(), bl.length());
- ::close(fd);
-
- MonMap *monmap = new MonMap;
- monmap->decode(bl);
+ MonMap monmap;
+ int r = monmap.read(".ceph_monmap");
+ assert(r >= 0);
// start up network
rank.start_rank();
// start osd
Messenger *m = rank.register_entity(MSG_ADDR_OSD(whoami));
assert(m);
- OSD *osd = new OSD(whoami, m, monmap, dev);
+ OSD *osd = new OSD(whoami, m, &monmap, dev);
osd->init();
// wait
vec_to_argv(args, argc, argv);
// load monmap
- bufferlist bl;
- int fd = ::open(".ceph_monmap", O_RDONLY);
- assert(fd >= 0);
- struct stat st;
- ::fstat(fd, &st);
- bufferptr bp(st.st_size);
- bl.append(bp);
- ::read(fd, (void*)bl.c_str(), bl.length());
- ::close(fd);
-
- MonMap *monmap = new MonMap;
- monmap->decode(bl);
+ MonMap monmap;
+ int r = monmap.read(".ceph_monmap");
+ assert(r >= 0);
// start up network
rank.start_rank();
// start client
- Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), monmap);
+ Client *client = new Client(rank.register_entity(MSG_ADDR_CLIENT_NEW), &monmap);
client->init();
// start syntheticclient
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+#include <string.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+
+#include <iostream>
+#include <string>
+using namespace std;
+
+#include "config.h"
+
+#include "mon/MonMap.h"
+
+
+/*
+ * parse_ip_port - parse a dotted-quad "a.b.c.d:port" string.
+ *
+ *  s        NUL-terminated input string
+ *  tcpaddr  filled in on success; left untouched on failure
+ *
+ * Returns true if s is exactly four decimal octets (0-255) separated by
+ * periods, then a colon, then a decimal port (0-65535), with nothing
+ * following the port.  Returns false otherwise.
+ */
+bool parse_ip_port(const char *s, tcpaddr_t& tcpaddr)
+{
+  unsigned char addr[4];
+  int port = 0;
+
+  // fields 0..3 are the address octets, field 4 is the port
+  for (int field = 0; field < 5; field++) {
+    // parse the next decimal number
+    int val = 0;
+    int numdigits = 0;
+    while (*s >= '0' && *s <= '9') {
+      val = val * 10 + (*s - '0');
+      if (val > 65535) return false;   // too large for any field; also keeps val from overflowing
+      numdigits++;
+      s++;
+    }
+    if (numdigits == 0) return false;  // no digits
+
+    if (field < 4) {
+      if (val > 255) return false;     // not a valid octet
+      // the first three octets end in a period, the fourth in a colon
+      if (*s != (field < 3 ? '.' : ':')) return false;
+      s++;
+      addr[field] = val;
+    } else {
+      if (*s != '\0') return false;    // trailing junk after the port
+      port = val;
+    }
+  }
+
+  // copy into the address; zero it first so that the family and padding
+  // are well defined instead of whatever the caller's storage held.
+  memset((char*)&tcpaddr, 0, sizeof(tcpaddr));
+  tcpaddr.sin_family = AF_INET;
+  memcpy((char*)&tcpaddr.sin_addr.s_addr, (char*)addr, 4);
+  // sin_port is handed to bind() as-is, so it must be in network byte order
+  tcpaddr.sin_port = htons(port);
+
+  return true;
+}
+
+
+/*
+ * mkmonmap - build a monitor map from ip:port arguments and write it out.
+ *
+ * Every argument other than "--out <file>" is parsed as an ip:port pair
+ * and added to the map as the next monitor.  The map is written to
+ * .ceph_monmap unless --out names another file.
+ *
+ * Returns 0 on success, -1 on a bad argument, an empty monitor list, or
+ * a failure to write the map.
+ */
+int main(int argc, char **argv)
+{
+  vector<char*> args;
+  argv_to_vec(argc, argv, args);
+
+  MonMap monmap;
+
+  char *outfn = ".ceph_monmap";
+
+  for (unsigned i=0; i<args.size(); i++) {
+    if (strcmp(args[i], "--out") == 0) {
+      // --out needs a value; don't index past the end of args
+      if (i+1 >= args.size()) {
+        cerr << "mkmonmap: --out requires a filename" << endl;
+        return -1;
+      }
+      outfn = args[++i];
+    } else {
+      // parse ip:port
+      tcpaddr_t addr;
+      if (!parse_ip_port(args[i], addr)) {
+        cerr << "mkmonmap: invalid ip:port '" << args[i] << "'" << endl;
+        return -1;
+      }
+      entity_inst_t inst;
+      inst.set_addr(addr);
+      cout << "mkmonmap: mon" << monmap.num_mon << " " << inst << endl;
+      monmap.add_mon(inst);
+    }
+  }
+
+  if (monmap.num_mon == 0) {
+    cerr << "usage: mkmonmap ip:port [...]" << endl;
+    return -1;
+  }
+
+  // write it out
+  cout << "mkmonmap: writing monmap to " << outfn << " (" << monmap.num_mon << " monitors)" << endl;
+  int r = monmap.write(outfn);
+  if (r < 0) {
+    // report the failure explicitly; an assert would vanish under NDEBUG
+    cerr << "mkmonmap: error writing monmap to " << outfn << endl;
+    return -1;
+  }
+
+  return 0;
+}
#ifndef __MONMAP_H
#define __MONMAP_H
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <fcntl.h>
+
#include "msg/Message.h"
#include "include/types.h"
MonMap(int s=0) : epoch(0), num_mon(s), mon_inst(s), last_mon(-1) {}
+ // Append one monitor instance to the end of mon_inst and count it in
+ // num_mon.
+ void add_mon(entity_inst_t inst) {
+   mon_inst.push_back(inst);
+   num_mon += 1;
+ }
+
// pick a mon.
// choice should be stable, unless we explicitly ask for a new one.
int pick_mon(bool newmon=false) {
_decode(mon_inst, blist, off);
}
+ // Encode this map and store it in the file named fn.  The file is
+ // created if missing, truncated if it exists, and its mode is set to
+ // 0644.  Returns 0 on success, or a negative value if the file cannot
+ // be opened or the encoded map cannot be written in full.
+ int write(const char *fn) {
+   // encode
+   bufferlist bl;
+   encode(bl);
+
+   // open.  O_CREAT requires an explicit mode argument, and O_TRUNC keeps
+   // bytes of a longer, older map from surviving past the new contents.
+   int fd = ::open(fn, O_WRONLY|O_CREAT|O_TRUNC, 0644);
+   if (fd < 0) return fd;
+   ::fchmod(fd, 0644);   // force the mode even if umask or an existing file says otherwise
+
+   // write, retrying on short writes
+   const char *p = bl.c_str();
+   ssize_t left = bl.length();
+   while (left > 0) {
+     ssize_t n = ::write(fd, (void*)p, left);
+     if (n <= 0) {
+       ::close(fd);
+       return -1;
+     }
+     p += n;
+     left -= n;
+   }
+
+   if (::close(fd) < 0) return -1;
+   return 0;
+ }
+
+ // Load this map from the file named fn and decode it.  Returns 0 on
+ // success, or a negative value if the file cannot be opened, its size
+ // cannot be determined, it is empty, or it cannot be read in full.
+ // decode() is only called once the whole file is in memory.
+ int read(const char *fn) {
+   // open
+   int fd = ::open(fn, O_RDONLY);
+   if (fd < 0) return fd;
+
+   // size the buffer from the file; an empty file has nothing to decode
+   struct stat st;
+   if (::fstat(fd, &st) < 0 || st.st_size <= 0) {
+     ::close(fd);
+     return -1;
+   }
+   bufferlist bl;
+   bufferptr bp(st.st_size);
+   bl.append(bp);
+
+   // read, retrying on short reads
+   char *p = (char*)bl.c_str();
+   ssize_t left = bl.length();
+   while (left > 0) {
+     ssize_t n = ::read(fd, (void*)p, left);
+     if (n <= 0) {        // error, or EOF before st_size bytes arrived
+       ::close(fd);
+       return -1;
+     }
+     p += n;
+     left -= n;
+   }
+   ::close(fd);
+
+   // decode
+   decode(bl);
+   return 0;
+ }
+
};
#endif
assert(listen_sd > 0);
/* bind to port */
- memset((char*)&listen_addr, 0, sizeof(listen_addr));
- listen_addr.sin_family = AF_INET;
- listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
- listen_addr.sin_port = 0;
-
- int rc = bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr));
+ int rc = bind(listen_sd, (struct sockaddr *) &rank.listen_addr, sizeof(rank.listen_addr));
+ if (rc < 0)
+ derr(0) << "accepter.start unable to bind to " << rank.listen_addr << endl;
assert(rc >= 0);
- socklen_t llen = sizeof(listen_addr);
- getsockname(listen_sd, (sockaddr*)&listen_addr, &llen);
+ socklen_t llen = sizeof(rank.listen_addr);
+ getsockname(listen_sd, (sockaddr*)&rank.listen_addr, &llen);
- int myport = listen_addr.sin_port;
+ int myport = rank.listen_addr.sin_port;
// listen!
rc = ::listen(listen_sd, 1000);
myhostname->h_length);
my_addr.sin_port = myport;
- listen_addr = my_addr;
+ rank.listen_addr = my_addr;
- dout(10) << "accepter.start listen addr is " << listen_addr << endl;
+ dout(10) << "accepter.start listen addr is " << rank.listen_addr << endl;
// start thread
create();
Rank::Rank() :
single_dispatcher(this) {
+ // default listen address: AF_INET, INADDR_ANY, port 0.
+ // set_listen_addr() can replace the address and port afterwards.
+ memset((char*)&listen_addr, 0, sizeof(listen_addr));
+ listen_addr.sin_family = AF_INET;
+ listen_addr.sin_addr.s_addr = htonl(INADDR_ANY);
+ listen_addr.sin_port = 0;
}
Rank::~Rank()
{
}
+// Replace the address and port held in listen_addr with those of a.
+// Only sin_addr.s_addr and sin_port are taken from a; every other field
+// of listen_addr keeps its current value.
+void Rank::set_listen_addr(tcpaddr_t& a)
+{
+  dout(10) << "set_listen_addr " << a << endl;
+  listen_addr.sin_addr.s_addr = a.sin_addr.s_addr;
+  listen_addr.sin_port = a.sin_port;
+}
+
void Rank::_submit_single_dispatch(Message *m)
{
lock.Lock();
// my_inst
- my_inst.set_addr( accepter.listen_addr );
+ my_inst.set_addr( listen_addr );
dout(1) << "start_rank at " << my_inst << endl;
-int Rank::find_ns_addr(tcpaddr_t &nsa)
-{
- // file?
- int fd = ::open(".ceph_ns",O_RDONLY);
- if (fd > 0) {
- ::read(fd, (void*)&nsa, sizeof(nsa));
- ::close(fd);
- cout << "ceph ns is " << nsa << endl;
- return 0;
- }
-
- // env var?
- char *nsaddr = getenv("CEPH_NAMESERVER");////envz_entry(*envp, e_len, "CEPH_NAMESERVER");
- if (nsaddr) {
- while (nsaddr[0] != '=') nsaddr++;
- nsaddr++;
-
- if (tcp_hostlookup(nsaddr, nsa) < 0) {
- cout << "can't resolve " << nsaddr << endl;
- return -1;
- }
-
- cout << "ceph ns is " << nsa << endl;
- return 0;
- }
-
- cerr << "i can't find ceph ns addr in .ceph_ns or CEPH_NAMESERVER" << endl;
- return -1;
-}
public:
bool done;
- tcpaddr_t listen_addr;
int listen_sd;
Accepter() : done(false) {}
Mutex lock;
Cond wait_cond; // for wait()
+ // where i listen
+ tcpaddr_t listen_addr;
+
// my instance
entity_inst_t my_inst;
void mark_down(msg_addr_t addr, entity_inst_t& i);
void mark_up(msg_addr_t addr, entity_inst_t& i);
- tcpaddr_t get_listen_addr() { return accepter.listen_addr; }
+ tcpaddr_t get_listen_addr() { return listen_addr; }
void reaper();
Rank();
~Rank();
- int find_ns_addr(tcpaddr_t &tcpaddr);
+ void set_listen_addr(tcpaddr_t& a);
int start_rank();
void wait();
bufferlist bl;
if (mpi_rank == 0) {
monmap->encode(bl);
-
- int fd = ::open(".ceph_monmap", O_WRONLY|O_CREAT);
- ::write(fd, (void*)bl.c_str(), bl.length());
- ::fchmod(fd, 0644);
- ::close(fd);
-
+ monmap->write(".ceph_monmap");
} else {
int l = g_conf.num_mon * 1000; // nice'n big.
bufferptr bp(l);