From e454a48d2dcc2b6fd3fce7ad870a3103dbf500e3 Mon Sep 17 00:00:00 2001 From: sageweil Date: Mon, 22 Oct 2007 16:57:58 +0000 Subject: [PATCH] fixed entity_addr to include rank in process, simplifying SimpleMessenger, eliminating client/mds startup naming ambiguity git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1976 29311d96-e01e-0410-9327-a35deaab8ce9 --- trunk/ceph/TODO | 5 - trunk/ceph/config.cc | 15 +- trunk/ceph/config.h | 2 - trunk/ceph/include/ceph_fs.h | 20 ++- trunk/ceph/mon/MDSMonitor.cc | 11 +- trunk/ceph/msg/FakeMessenger.cc | 4 +- trunk/ceph/msg/SimpleMessenger.cc | 243 +++++++----------------------- trunk/ceph/msg/SimpleMessenger.h | 50 ++---- trunk/ceph/msg/msg_types.h | 3 +- trunk/ceph/newsyn.cc | 44 +++--- 10 files changed, 120 insertions(+), 277 deletions(-) diff --git a/trunk/ceph/TODO b/trunk/ceph/TODO index ed581ab0b350b..f6e6642014cf9 100644 --- a/trunk/ceph/TODO +++ b/trunk/ceph/TODO @@ -48,11 +48,6 @@ sage doc - metablob version semantics -mdsmon -- per-mds, shared standby queues - - - mds bugs - open file rejournaling vs capped log... - open files vs shutdown in general! need to export any caps on replicated metadata diff --git a/trunk/ceph/config.cc b/trunk/ceph/config.cc index f037fe728dfe4..a96071ed63fd4 100644 --- a/trunk/ceph/config.cc +++ b/trunk/ceph/config.cc @@ -158,8 +158,6 @@ md_config_t g_conf = { // --- messenger --- ms_tcp_nodelay: true, - ms_single_dispatch: false, - ms_requeue_on_sender_fail: false, ms_stripe_osds: false, ms_skip_rank0: false, @@ -167,15 +165,6 @@ md_config_t g_conf = { ms_die_on_failure: false, - /*tcp_skip_rank0: false, - tcp_overlay_clients: false, // over osds! - tcp_log: false, - tcp_serial_marshall: true, - tcp_serial_out: false, - tcp_multi_out: true, - tcp_multi_dispatch: false, // not fully implemented yet - */ - // --- mon --- mon_tick_interval: 5, mon_osd_down_out_interval: 5, // seconds @@ -186,7 +175,7 @@ md_config_t g_conf = { mon_accept_timeout: 10.0, // on leader, if paxos update isn't accepted mon_stop_on_last_unmount: false, mon_stop_with_last_mds: false, - mon_allow_mds_bully: true, // allow a booting mds to (forcibly) claim an mds # + mon_allow_mds_bully: false, // allow a booting mds to (forcibly) claim an mds # .. FIXME paxos_propose_interval: 1.0, // gather updates for this long before proposing a map update @@ -515,8 +504,6 @@ void parse_config_options(std::vector& args) else if (strcmp(args[i], "--numosd") == 0) g_conf.num_osd = atoi(args[++i]); - else if (strcmp(args[i], "--ms_single_dispatch") == 0) - g_conf.ms_single_dispatch = atoi(args[++i]); else if (strcmp(args[i], "--ms_stripe_osds") == 0) g_conf.ms_stripe_osds = true; else if (strcmp(args[i], "--ms_skip_rank0") == 0) diff --git a/trunk/ceph/config.h b/trunk/ceph/config.h index b5cdf6cbd586d..169887d2b23fe 100644 --- a/trunk/ceph/config.h +++ b/trunk/ceph/config.h @@ -113,8 +113,6 @@ struct md_config_t { */ bool ms_tcp_nodelay; - bool ms_single_dispatch; - bool ms_requeue_on_sender_fail; bool ms_stripe_osds; bool ms_skip_rank0; diff --git a/trunk/ceph/include/ceph_fs.h b/trunk/ceph/include/ceph_fs.h index ede0663a79158..fbe36ab801cfa 100644 --- a/trunk/ceph/include/ceph_fs.h +++ b/trunk/ceph/include/ceph_fs.h @@ -23,6 +23,10 @@ struct ceph_object { typedef struct ceph_object ceph_object_t; +struct ceph_timeval { + __u32 tv_sec; + __u32 tv_usec; +}; /** object layout @@ -136,13 +140,23 @@ struct ceph_entity_name { /* * entity_addr * ipv4 only for now + * 16 bytes. */ struct ceph_entity_addr { - __u64 nonce; - __u32 port; - __u8 ipq[4]; + __u32 erank; /* entity's rank in process */ + __u32 nonce; /* unique id for process (e.g. pid) */ + __u32 port; /* ip port */ + __u8 ipq[4]; /* ipv4 addr quad */ }; +#define ceph_entity_addr_is_local(a,b) \ + ((a).nonce == (b).nonce && \ + (a).port == (b).port && \ + (a).ipq[0] == (b).ipq[0] && \ + (a).ipq[1] == (b).ipq[1] && \ + (a).ipq[2] == (b).ipq[2] && \ + (a).ipq[3] == (b).ipq[3]) + struct ceph_entity_inst { struct ceph_entity_name name; diff --git a/trunk/ceph/mon/MDSMonitor.cc b/trunk/ceph/mon/MDSMonitor.cc index 42edd63f13cc6..3b415232b8f14 100644 --- a/trunk/ceph/mon/MDSMonitor.cc +++ b/trunk/ceph/mon/MDSMonitor.cc @@ -112,7 +112,8 @@ bool MDSMonitor::update_from_paxos() for (map::iterator p = mdsmap.mds_inst.begin(); p != mdsmap.mds_inst.end(); ++p) - if (last_beacon.count(p->second.addr) == 0) + if (last_beacon.count(p->second.addr) == 0 && + mdsmap.get_state(p->first) != MDSMap::STATE_DNE) last_beacon[p->second.addr] = g_clock.now(); for (map::iterator p = mdsmap.standby.begin(); p != mdsmap.standby.end(); @@ -460,12 +461,6 @@ void MDSMonitor::take_over(entity_addr_t addr, int mds) pending_mdsmap.standby.erase(addr); pending_mdsmap.standby_for[mds].erase(addr); pending_mdsmap.standby_any.erase(addr); - - // send new map to old inst/name - entity_inst_t oldinst; - oldinst.name = entity_name_t::MDS(-2); - oldinst.addr = addr; - waiting_for_map.push_back(oldinst); } @@ -579,6 +574,7 @@ void MDSMonitor::tick() switch (pending_mdsmap.get_state(mds)) { case MDSMap::STATE_CREATING: newstate = MDSMap::STATE_DNE; // didn't finish creating + last_beacon.erase(addr); break; case MDSMap::STATE_STARTING: @@ -648,6 +644,7 @@ void MDSMonitor::do_stop() break; case MDSMap::STATE_CREATING: pending_mdsmap.mds_state[p->first] = MDSMap::STATE_DNE; + last_beacon.erase(pending_mdsmap.mds_inst[p->first].addr); break; case MDSMap::STATE_STARTING: pending_mdsmap.mds_state[p->first] = MDSMap::STATE_STOPPED; diff --git a/trunk/ceph/msg/FakeMessenger.cc b/trunk/ceph/msg/FakeMessenger.cc index ee80df3dc0626..21a7f9d0e12c9 100644 --- a/trunk/ceph/msg/FakeMessenger.cc +++ b/trunk/ceph/msg/FakeMessenger.cc @@ -273,8 +273,8 @@ FakeMessenger::FakeMessenger(entity_name_t me) : Messenger(me) { // assign rank _myinst.name = me; - _myinst.addr.v.port = nranks++; - //if (!me.is_mon()) + _myinst.addr.v.port = 0; + _myinst.addr.v.erank = nranks++; _myinst.addr.v.nonce = getpid(); // add to directory diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc index 7e29f033d83b5..bc87b3968f86f 100644 --- a/trunk/ceph/msg/SimpleMessenger.cc +++ b/trunk/ceph/msg/SimpleMessenger.cc @@ -31,8 +31,8 @@ #include #include -#define dout(l) if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.my_addr << " " -#define derr(l) if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.my_addr << " " +#define dout(l) if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " " +#define derr(l) if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " " @@ -142,7 +142,7 @@ int Rank::Accepter::start() // figure out my_addr if (g_my_addr != entity_addr_t()) { // user specified it, easy peasy. - rank.my_addr = g_my_addr; + rank.rank_addr = g_my_addr; } else { // my IP is... HELP! struct hostent *myhostname = gethostbyname(hostname); @@ -152,17 +152,18 @@ int Rank::Accepter::start() memcpy((char*)&listen_addr.sin_addr.s_addr, myhostname->h_addr_list[0], myhostname->h_length); - rank.my_addr.set_addr(listen_addr); - rank.my_addr.v.port = 0; // see below + rank.rank_addr.set_addr(listen_addr); + rank.rank_addr.v.port = 0; // see below } - if (rank.my_addr.v.port == 0) { + if (rank.rank_addr.v.port == 0) { entity_addr_t tmp; tmp.set_addr(listen_addr); - rank.my_addr.v.port = tmp.v.port; - rank.my_addr.v.nonce = getpid(); // FIXME: pid might not be best choice here. + rank.rank_addr.v.port = tmp.v.port; + rank.rank_addr.v.nonce = getpid(); // FIXME: pid might not be best choice here. } + rank.rank_addr.v.erank = 0; - dout(1) << "accepter.start my_addr is " << rank.my_addr << dendl; + dout(1) << "accepter.start rank_addr is " << rank.rank_addr << dendl; // set up signal handler //old_sigint_handler = signal(SIGINT, simplemessenger_sigint); @@ -203,7 +204,7 @@ void *Rank::Accepter::entry() dout(10) << "accepted incoming on sd " << sd << dendl; rank.lock.Lock(); - if (!rank.local.empty()) { + if (rank.num_local > 0) { Pipe *p = new Pipe(sd); rank.pipes.insert(p); } @@ -236,7 +237,7 @@ int Rank::Pipe::accept() // my creater gave me sd via accept() // announce myself. - int rc = tcp_write(sd, (char*)&rank.my_addr, sizeof(rank.my_addr)); + int rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr)); if (rc < 0) { ::close(sd); done = true; @@ -246,7 +247,7 @@ int Rank::Pipe::accept() // identify peer rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr)); if (rc < 0) { - dout(10) << "pipe(? " << this << ").accept couldn't read peer inst" << dendl; + dout(10) << "pipe(? " << this << ").accept couldn't read peer addr" << dendl; ::close(sd); done = true; return -1; @@ -321,18 +322,18 @@ int Rank::Pipe::connect() entity_addr_t paddr; rc = tcp_read(sd, (char*)&paddr, sizeof(paddr)); if (!rc) { // bool - dout(10) << "pipe(" << peer_addr << ' ' << this << ").connect couldn't read peer addr" << dendl; + dout(0) << "pipe(" << peer_addr << ' ' << this << ").connect couldn't read peer addr" << dendl; return -1; } - if (peer_addr != paddr) { - dout(10) << "pipe(" << peer_addr << ' ' << this << ").connect peer identifies itself as " << paddr << ", wrong guy!" << dendl; + if (!ceph_entity_addr_is_local(peer_addr.v, paddr.v)) { + dout(0) << "pipe(" << peer_addr << ' ' << this << ").connect peer identifies itself as " << paddr << ", wrong guy!" << dendl; ::close(sd); sd = 0; return -1; } // identify myself - rc = tcp_write(sd, (char*)&rank.my_addr, sizeof(rank.my_addr)); + rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr)); if (rc < 0) return -1; @@ -439,31 +440,12 @@ void Rank::Pipe::reader() rank.lock.Lock(); { - if (g_conf.ms_single_dispatch) { - // submit to single dispatch queue - rank._submit_single_dispatch(m); + unsigned erank = m->get_dest_inst().addr.v.erank; + if (erank < rank.max_local && rank.local[erank]) { + // find entity + entity = rank.local[erank]; } else { - if (rank.local.count(m->get_dest())) { - // find entity - entity = rank.local[m->get_dest()]; - } else { - entity = rank.find_unnamed(m->get_dest()); - if (entity) { - dout(3) << "pipe(" << peer_addr << ' ' << this << ").reader blessing " << m->get_dest() << dendl; - //entity->reset_myname(m->get_dest()); - rank.local.erase(entity->get_myname()); - rank.local[m->get_dest()] = entity; - entity->_set_myname(m->get_dest()); - - } else { - if (rank.stopped.count(m->get_dest())) { - // ignore it - } else { - derr(0) << "pipe(" << peer_addr << ' ' << this << ").reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl; - //assert(0); // FIXME do this differently - } - } - } + derr(0) << "pipe(" << peer_addr << ' ' << this << ").reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl; } } rank.lock.Unlock(); @@ -540,9 +522,6 @@ void Rank::Pipe::writer() dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer sending " << m << " " << *m << dendl; - // stamp. - m->set_source_addr(rank.my_addr); - // marshall if (m->empty_payload()) m->encode_payload(); @@ -848,11 +827,12 @@ void Rank::Pipe::fail(list& out) // sort while (!q.empty()) { + unsigned srcrank = q.front()->get_source_inst().addr.v.erank; if (q.front()->get_type() == MSG_CLOSE) { delete q.front(); } - else if (rank.local.count(q.front()->get_source())) { - EntityMessenger *mgr = rank.local[q.front()->get_source()]; + else if (srcrank < rank.max_local && rank.local[srcrank]) { + EntityMessenger *mgr = rank.local[srcrank]; Dispatcher *dis = mgr->get_dispatcher(); if (mgr->is_stopped()) { // ignore. @@ -896,69 +876,6 @@ void Rank::Pipe::fail(list& out) * Rank */ -Rank::Rank() : - single_dispatcher(this), - started(false) { -} -Rank::~Rank() -{ -} - -/* -void Rank::set_listen_addr(tcpaddr_t& a) -{ - dout(10) << "set_listen_addr " << a << dendl; - memcpy((char*)&listen_addr.sin_addr.s_addr, (char*)&a.sin_addr.s_addr, 4); - listen_addr.sin_port = a.sin_port; -} -*/ - -void Rank::_submit_single_dispatch(Message *m) -{ - assert(lock.is_locked()); - - if (local.count(m->get_dest()) && - local[m->get_dest()]->is_ready()) { - rank.single_dispatch_queue.push_back(m); - rank.single_dispatch_cond.Signal(); - } else { - waiting_for_ready[m->get_dest()].push_back(m); - } -} - - -void Rank::single_dispatcher_entry() -{ - lock.Lock(); - while (!single_dispatch_stop || !single_dispatch_queue.empty()) { - if (!single_dispatch_queue.empty()) { - list ls; - ls.swap(single_dispatch_queue); - - lock.Unlock(); - { - while (!ls.empty()) { - Message *m = ls.front(); - ls.pop_front(); - - dout(1) << m->get_dest() - << " <-- " << m->get_source_inst() - << " ---- " << *m - << " -- " << m - << dendl; - - assert(local.count(m->get_dest())); - local[m->get_dest()]->dispatch(m); - } - } - lock.Lock(); - continue; - } - single_dispatch_cond.Wait(lock); - } - lock.Unlock(); -} - /* * note: assumes lock is held @@ -996,15 +913,9 @@ int Rank::start_rank() if (accepter.start() < 0) return -1; - // start single thread dispatcher? - if (g_conf.ms_single_dispatch) { - single_dispatch_stop = false; - single_dispatcher.create(); - } - lock.Lock(); - dout(1) << "start_rank at " << my_addr << dendl; + dout(1) << "start_rank at " << rank_addr << dendl; started = true; lock.Unlock(); return 0; @@ -1018,7 +929,7 @@ int Rank::start_rank() Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr) { assert(rank.lock.is_locked()); - assert(addr != rank.my_addr); + assert(addr != rank.rank_addr); dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; @@ -1038,20 +949,6 @@ Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr) -Rank::EntityMessenger *Rank::find_unnamed(entity_name_t a) -{ - // find an unnamed (and _ready_) local entity of the right type - for (map::iterator p = local.begin(); - p != local.end(); - ++p) { - if (p->first.type() == a.type() && p->first.is_new() && - p->second->is_ready()) - return p->second; - } - return 0; -} - - /* register_entity @@ -1062,11 +959,22 @@ Rank::EntityMessenger *Rank::register_entity(entity_name_t name) lock.Lock(); // create messenger - EntityMessenger *msgr = new EntityMessenger(name); + int erank = max_local; + EntityMessenger *msgr = new EntityMessenger(name, erank); // add to directory - assert(local.count(name) == 0); - local[name] = msgr; + max_local++; + local.resize(max_local); + stopped.resize(max_local); + + local[erank] = msgr; + stopped[erank] = false; + msgr->my_addr = rank_addr; + msgr->my_addr.v.erank = erank; + + dout(0) << "register_entity " << name << " at " << msgr->my_addr << dendl; + + num_local++; lock.Unlock(); return msgr; @@ -1079,11 +987,10 @@ void Rank::unregister_entity(EntityMessenger *msgr) dout(10) << "unregister_entity " << msgr->get_myname() << dendl; // remove from local directory. - entity_name_t name = msgr->get_myname(); - assert(local.count(name)); - local.erase(name); - - stopped.insert(name); + local[msgr->my_rank] = 0; + stopped[msgr->my_rank] = true; + num_local--; + wait_cond.Signal(); lock.Unlock(); @@ -1101,15 +1008,11 @@ void Rank::submit_message(Message *m, const entity_addr_t& dest_addr) lock.Lock(); { // local? - if (dest_addr == my_addr) { - if (local.count(dest)) { + if (ceph_entity_addr_is_local(dest_addr.v, rank_addr.v)) { + if (dest_addr.v.erank < max_local && local[dest_addr.v.erank]) { // local dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl; - if (g_conf.ms_single_dispatch) { - _submit_single_dispatch(m); - } else { - entity = local[dest]; - } + entity = local[dest_addr.v.erank]; } else { derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map?" << dendl; //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. @@ -1155,7 +1058,7 @@ void Rank::wait() // reap dead pipes reaper(); - if (local.empty()) { + if (num_local == 0) { dout(10) << "wait: everything stopped" << dendl; break; // everything stopped. } else { @@ -1171,16 +1074,6 @@ void Rank::wait() accepter.stop(); dout(20) << "wait: stopped accepter thread" << dendl; - // stop dispatch thread - if (g_conf.ms_single_dispatch) { - dout(10) << "wait: stopping dispatch thread" << dendl; - lock.Lock(); - single_dispatch_stop = true; - single_dispatch_cond.Signal(); - lock.Unlock(); - single_dispatcher.join(); - } - // close+reap all pipes lock.Lock(); { @@ -1276,19 +1169,8 @@ void Rank::EntityMessenger::ready() dout(10) << "ready " << get_myaddr() << dendl; assert(!dispatch_thread.is_started()); - if (g_conf.ms_single_dispatch) { - rank.lock.Lock(); - if (rank.waiting_for_ready.count(get_myname())) { - rank.single_dispatch_queue.splice(rank.single_dispatch_queue.end(), - rank.waiting_for_ready[get_myname()]); - rank.waiting_for_ready.erase(get_myname()); - rank.single_dispatch_cond.Signal(); - } - rank.lock.Unlock(); - } else { - // start my dispatch thread - dispatch_thread.create(); - } + // start my dispatch thread + dispatch_thread.create(); } @@ -1333,7 +1215,7 @@ int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest, { // set envelope m->set_source(get_myname(), fromport); - m->set_source_addr(rank.my_addr); + m->set_source_addr(my_addr); m->set_dest_inst(dest); m->set_dest_port(port); @@ -1363,7 +1245,7 @@ int Rank::EntityMessenger::send_first_message(Dispatcher *d, // set envelope m->set_source(get_myname(), fromport); - m->set_source_addr(rank.my_addr); + m->set_source_addr(my_addr); m->set_dest_inst(dest); m->set_dest_port(port); rank.lock.Unlock(); @@ -1380,25 +1262,12 @@ int Rank::EntityMessenger::send_first_message(Dispatcher *d, } -const entity_addr_t &Rank::EntityMessenger::get_myaddr() -{ - return rank.my_addr; -} - void Rank::EntityMessenger::reset_myname(entity_name_t newname) { - rank.lock.Lock(); - { - entity_name_t oldname = get_myname(); - dout(10) << "reset_myname " << oldname << " to " << newname << dendl; - - rank.local.erase(oldname); - rank.local[newname] = this; - - _set_myname(newname); - } - rank.lock.Unlock(); + entity_name_t oldname = get_myname(); + dout(10) << "reset_myname " << oldname << " to " << newname << dendl; + _set_myname(newname); } diff --git a/trunk/ceph/msg/SimpleMessenger.h b/trunk/ceph/msg/SimpleMessenger.h index 6bd417adc8e10..e5fa8005df28d 100644 --- a/trunk/ceph/msg/SimpleMessenger.h +++ b/trunk/ceph/msg/SimpleMessenger.h @@ -158,7 +158,6 @@ private: }; - // messenger interface class EntityMessenger : public Messenger { Mutex lock; @@ -167,6 +166,8 @@ private: list prio_dispatch_queue; bool stop; int qlen, pqlen; + int my_rank; + entity_addr_t my_addr; class DispatchThread : public Thread { EntityMessenger *m; @@ -179,6 +180,8 @@ private: } dispatch_thread; void dispatch_entry(); + friend class Rank; + public: void queue_message(Message *m) { // set recv stamp @@ -197,10 +200,11 @@ private: } public: - EntityMessenger(entity_name_t myaddr) : + EntityMessenger(entity_name_t myaddr, int r) : Messenger(myaddr), stop(false), qlen(0), pqlen(0), + my_rank(r), dispatch_thread(this) { } ~EntityMessenger() { // join dispatch thread @@ -215,7 +219,7 @@ private: dispatch_thread.join(); } - const entity_addr_t &get_myaddr(); + const entity_addr_t &get_myaddr() { return my_addr; } int get_dispatch_queue_len() { return qlen + pqlen; } @@ -235,26 +239,6 @@ private: }; - class SingleDispatcher : public Thread { - Rank *rank; - public: - SingleDispatcher(Rank *r) : rank(r) {} - void *entry() { - rank->single_dispatcher_entry(); - return 0; - } - } single_dispatcher; - - Cond single_dispatch_cond; - bool single_dispatch_stop; - list single_dispatch_queue; - - map > waiting_for_ready; - - void single_dispatcher_entry(); - void _submit_single_dispatch(Message *m); - - // Rank stuff public: Mutex lock; @@ -262,12 +246,12 @@ private: bool started; // where i listen - entity_addr_t my_addr; + entity_addr_t rank_addr; // local - map local; - set stopped; - //hash_set entity_unstarted; + unsigned max_local, num_local; + vector local; + vector stopped; // remote hash_map rank_pipe; @@ -277,18 +261,16 @@ private: Pipe *connect_rank(const entity_addr_t& addr); - void mark_down(entity_addr_t addr); - //void mark_up(entity_name_t addr, entity_addr_t& i); + const entity_addr_t &get_rank_addr() { return rank_addr; } - entity_addr_t get_my_addr() { return my_addr; } + void mark_down(entity_addr_t addr); void reaper(); - EntityMessenger *find_unnamed(entity_name_t a); - public: - Rank(); - ~Rank(); + Rank() : started(false), + max_local(0), num_local(0) { } + ~Rank() { } //void set_listen_addr(tcpaddr_t& a); diff --git a/trunk/ceph/msg/msg_types.h b/trunk/ceph/msg/msg_types.h index 52b1e69c8886c..7346c8947093a 100644 --- a/trunk/ceph/msg/msg_types.h +++ b/trunk/ceph/msg/msg_types.h @@ -127,7 +127,8 @@ inline ostream& operator<<(ostream& out, const entity_addr_t &addr) << '.' << (int)addr.v.ipq[2] << '.' << (int)addr.v.ipq[3] << ':' << addr.v.port - << '.' << addr.v.nonce; + << '#' << addr.v.nonce + << '@' << addr.v.erank; } inline bool operator==(const entity_addr_t& a, const entity_addr_t& b) { return memcmp(&a, &b, sizeof(a)) == 0; } diff --git a/trunk/ceph/newsyn.cc b/trunk/ceph/newsyn.cc index e580e49a9b7e9..4b37591cdca69 100644 --- a/trunk/ceph/newsyn.cc +++ b/trunk/ceph/newsyn.cc @@ -81,7 +81,7 @@ pair mpi_bootstrap_new(int& argc, char**& argv, MonMap *monmap) rank.start_rank(); // bind and listen if (mpi_rank < g_conf.num_mon) { - moninst[mpi_rank].addr = rank.my_addr; + moninst[mpi_rank].addr = rank.rank_addr; moninst[mpi_rank].name = entity_name_t(entity_name_t::TYPE_MON, mpi_rank); //cerr << mpi_rank << " at " << rank.get_listen_addr() << std::endl; @@ -225,7 +225,7 @@ int main(int argc, char **argv) // start up messenger via MPI MonMap *monmap = new MonMap(g_conf.num_mon); pair mpiwho = mpi_bootstrap_new(argc, argv, monmap); - int myrank = mpiwho.first; + int mpirank = mpiwho.first; int world = mpiwho.second; int need = 0; @@ -241,7 +241,7 @@ int main(int argc, char **argv) } assert(need <= world); - if (myrank == 0) + if (mpirank == 0) cerr << "nummds " << start_mds << " numosd " << start_osd << " numclient " << start_client << " .. need " << need << ", have " << world << std::endl; @@ -251,7 +251,7 @@ int main(int argc, char **argv) int started = 0; - //if (myrank == 0) g_conf.debug = 20; + //if (mpirank == 0) g_conf.debug = 20; // courtesy symlinks char ffrom[100]; @@ -260,11 +260,11 @@ int main(int argc, char **argv) // create mon - if (myrank < g_conf.num_mon) { - Monitor *mon = new Monitor(myrank, rank.register_entity(entity_name_t(entity_name_t::TYPE_MON, myrank)), monmap); + if (mpirank < g_conf.num_mon) { + Monitor *mon = new Monitor(mpirank, rank.register_entity(entity_name_t(entity_name_t::TYPE_MON, mpirank)), monmap); mon->init(); if (g_conf.dout_dir) { - sprintf(ffrom, "%s/mon%d", g_conf.dout_dir, myrank); + sprintf(ffrom, "%s/mon%d", g_conf.dout_dir, mpirank); ::symlink(fto, ffrom); } } @@ -280,9 +280,9 @@ int main(int argc, char **argv) map mds; map mdsosd; for (int i=0; iget_myaddr() << " " << hostname << "." << pid << std::endl; if (g_conf.dout_dir) { sprintf(ffrom, "%s/mds%d", g_conf.dout_dir, i); ::symlink(fto, ffrom); @@ -309,16 +309,16 @@ int main(int argc, char **argv) int osds_per_node = (start_osd-1)/max_osd_nodes + 1; for (int i=0; iget_myaddr() << " " << hostname << "." << pid << std::endl; if (g_conf.dout_dir) { sprintf(ffrom, "%s/osd%d", g_conf.dout_dir, i); ::symlink(fto, ffrom); @@ -344,10 +344,10 @@ int main(int argc, char **argv) map syn;//[start_client]; int nclients = 0; for (int i=0; istart_thread(); } if (nclients) { - cerr << nclients << " clients at " << rank.my_addr << " " << hostname << "." << pid << std::endl; + cerr << nclients << " clients at " << rank.rank_addr << " " << hostname << "." << pid << std::endl; } for (set::iterator it = clientlist.begin(); @@ -384,9 +384,9 @@ int main(int argc, char **argv) } - if (myrank && !started) { + if (mpirank && !started) { //dout(1) << "IDLE" << dendl; - cerr << "idle at " << rank.my_addr << " rank " << myrank << " " << hostname << "." << pid << std::endl; + cerr << "idle at " << rank.rank_addr << " mpirank " << mpirank << " " << hostname << "." << pid << std::endl; } // wait for everything to finish @@ -396,7 +396,7 @@ int main(int argc, char **argv) // cd on exit, so that gmon.out (if any) goes into a separate directory for each node. char s[20]; - sprintf(s, "gmon/%d", myrank); + sprintf(s, "gmon/%d", mpirank); mkdir(s, 0755); chdir(s); @@ -417,15 +417,15 @@ int main(int argc, char **argv) */ /* for (int i=0; i