From: Sage Weil Date: Fri, 1 May 2009 13:55:54 +0000 (-0700) Subject: msgr: clean up refs to static 'rank' X-Git-Tag: v0.7.3~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=2519f3c6234e735b96c08dab7f70fab420296137;p=ceph-ci.git msgr: clean up refs to static 'rank' --- diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index b6eaadb85e6..b967ca0d830 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -48,7 +48,7 @@ static ostream& _prefix() { // help find socket resource leaks -int sockopen = 0; +//static int sockopen = 0; #define closed_socket() //dout(20) << "closed_socket " << --sockopen << dendl; #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl; @@ -65,36 +65,6 @@ sighandler_t old_sigint_handler = 0; * Accepter */ -void simplemessenger_sigint(int r) -{ - rank.sigint(); - if (old_sigint_handler) - old_sigint_handler(r); -} - -void Rank::sigint() -{ - lock.Lock(); - derr(0) << "got control-c, exiting" << dendl; - - // force close listener socket - if (accepter.listen_sd >= 0) { - ::close(accepter.listen_sd); - accepter.listen_sd = -1; - closed_socket(); - } - - // force close all pipe sockets, too - for (hash_map::iterator p = rank_pipe.begin(); - p != rank_pipe.end(); - ++p) - p->second->force_close(); - - lock.Unlock(); -} - - - void noop_signal_handler(int s) { //dout(0) << "blah_handler got " << s << dendl; @@ -160,24 +130,24 @@ int Rank::Accepter::bind(int64_t force_nonce) return -errno; } - rank.rank_addr = g_my_addr; - if (rank.rank_addr != entity_addr_t()) - rank.need_addr = false; + rank->rank_addr = g_my_addr; + if (rank->rank_addr != entity_addr_t()) + rank->need_addr = false; else - rank.need_addr = true; - if (rank.rank_addr.get_port() == 0) { + rank->need_addr = true; + if (rank->rank_addr.get_port() == 0) { entity_addr_t tmp; tmp.ipaddr = listen_addr; - rank.rank_addr.set_port(tmp.get_port()); + rank->rank_addr.set_port(tmp.get_port()); if (force_nonce >= 0) - rank.rank_addr.nonce = force_nonce; + rank->rank_addr.nonce = force_nonce; else - rank.rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here. + rank->rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here. } - rank.rank_addr.erank = 0; + rank->rank_addr.erank = 0; - dout(1) << "accepter.bind rank_addr is " << rank.rank_addr - << " need_addr=" << rank.need_addr + dout(1) << "accepter.bind rank_addr is " << rank->rank_addr + << " need_addr=" << rank->need_addr << dendl; return 0; } @@ -185,8 +155,6 @@ int Rank::Accepter::bind(int64_t force_nonce) int Rank::Accepter::start() { dout(1) << "accepter.start" << dendl; - // set up signal handler - //old_sigint_handler = signal(SIGINT, simplemessenger_sigint); // set a harmless handle for SIGUSR1 (we'll use it to stop the accepter) struct sigaction sa; @@ -235,14 +203,14 @@ void *Rank::Accepter::entry() dout(0) << "accepter could't set TCP_NODELAY: " << strerror(errno) << dendl; } - rank.lock.Lock(); - if (rank.num_local > 0) { - Pipe *p = new Pipe(Pipe::STATE_ACCEPTING); + rank->lock.Lock(); + if (rank->num_local > 0) { + Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING); p->sd = sd; p->start_reader(); - rank.pipes.insert(p); + rank->pipes.insert(p); } - rank.lock.Unlock(); + rank->lock.Unlock(); } else { dout(0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << strerror(errno) << dendl; if (++errors > 4) @@ -366,7 +334,7 @@ static void remove_pid_file() int Rank::start(bool nodaemon) { // register at least one entity, first! - assert(rank.my_type >= 0); + assert(my_type >= 0); lock.Lock(); if (started) { @@ -428,13 +396,13 @@ int Rank::start(bool nodaemon) */ Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p) { - assert(rank.lock.is_locked()); - assert(addr != rank.rank_addr); + assert(lock.is_locked()); + assert(addr != rank_addr); dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; // create pipe - Pipe *pipe = new Pipe(Pipe::STATE_CONNECTING); + Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING); pipe->policy = p; pipe->peer_addr = addr; pipe->start_writer(); @@ -453,14 +421,14 @@ Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p) /* register_entity */ -Rank::EntityMessenger *Rank::register_entity(entity_name_t name) +Rank::Endpoint *Rank::register_entity(entity_name_t name) { dout(10) << "register_entity " << name << dendl; lock.Lock(); // create messenger int erank = max_local; - EntityMessenger *msgr = new EntityMessenger(name, erank); + Endpoint *msgr = new Endpoint(this, name, erank); // now i know my type. if (my_type >= 0) @@ -492,7 +460,7 @@ Rank::EntityMessenger *Rank::register_entity(entity_name_t name) } -void Rank::unregister_entity(EntityMessenger *msgr) +void Rank::unregister_entity(Endpoint *msgr) { lock.Lock(); dout(10) << "unregister_entity " << msgr->get_myname() << dendl; @@ -653,10 +621,10 @@ void Rank::wait() /********************************** - * EntityMessenger + * Endpoint */ -void Rank::EntityMessenger::dispatch_entry() +void Rank::Endpoint::dispatch_entry() { lock.Lock(); while (!stop) { @@ -727,11 +695,11 @@ void Rank::EntityMessenger::dispatch_entry() dout(15) << "dispatch: ending loop " << dendl; // deregister - rank.unregister_entity(this); + rank->unregister_entity(this); put(); } -void Rank::EntityMessenger::ready() +void Rank::Endpoint::ready() { dout(10) << "ready " << get_myaddr() << dendl; assert(!dispatch_thread.is_started()); @@ -740,7 +708,7 @@ void Rank::EntityMessenger::ready() } -int Rank::EntityMessenger::shutdown() +int Rank::Endpoint::shutdown() { dout(10) << "shutdown " << get_myaddr() << dendl; @@ -758,24 +726,24 @@ int Rank::EntityMessenger::shutdown() return 0; } -void Rank::EntityMessenger::suicide() +void Rank::Endpoint::suicide() { dout(10) << "suicide " << get_myaddr() << dendl; shutdown(); // hmm, or exit(0)? } -void Rank::EntityMessenger::prepare_dest(const entity_inst_t& inst) +void Rank::Endpoint::prepare_dest(const entity_inst_t& inst) { - rank.lock.Lock(); + rank->lock.Lock(); { - if (rank.rank_pipe.count(inst.addr) == 0) - rank.connect_rank(inst.addr, rank.policy_map[inst.name.type()]); + if (rank->rank_pipe.count(inst.addr) == 0) + rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]); } - rank.lock.Unlock(); + rank->lock.Unlock(); } -int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest) +int Rank::Endpoint::send_message(Message *m, entity_inst_t dest) { // set envelope m->set_source_inst(_myinst); @@ -795,12 +763,12 @@ int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank.submit_message(m, dest.addr); + rank->submit_message(m, dest.addr); return 0; } -int Rank::EntityMessenger::forward_message(Message *m, entity_inst_t dest) +int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest) { // set envelope m->set_source_inst(_myinst); @@ -819,14 +787,14 @@ int Rank::EntityMessenger::forward_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank.submit_message(m, dest.addr); + rank->submit_message(m, dest.addr); return 0; } -int Rank::EntityMessenger::lazy_send_message(Message *m, entity_inst_t dest) +int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) { // set envelope m->set_source_inst(_myinst); @@ -846,14 +814,14 @@ int Rank::EntityMessenger::lazy_send_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank.submit_message(m, dest.addr, true); + rank->submit_message(m, dest.addr, true); return 0; } -void Rank::EntityMessenger::reset_myname(entity_name_t newname) +void Rank::Endpoint::reset_myname(entity_name_t newname) { entity_name_t oldname = get_myname(); dout(10) << "reset_myname " << oldname << " to " << newname << dendl; @@ -861,9 +829,9 @@ void Rank::EntityMessenger::reset_myname(entity_name_t newname) } -void Rank::EntityMessenger::mark_down(entity_addr_t a) +void Rank::Endpoint::mark_down(entity_addr_t a) { - rank.mark_down(a); + rank->mark_down(a); } void Rank::mark_down(entity_addr_t addr) @@ -894,7 +862,7 @@ void Rank::mark_down(entity_addr_t addr) #define dout_prefix _pipe_prefix() ostream& Rank::Pipe::_pipe_prefix() { return *_dout << dbeginl << pthread_self() - << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this + << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this << " sd=" << sd << " pgs=" << peer_global_seq << " cs=" << connect_seq @@ -917,7 +885,7 @@ int Rank::Pipe::accept() } // and my addr - rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr)); + rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr)); if (rc < 0) { dout(10) << "accept couldn't write my addr" << dendl; state = STATE_CLOSED; @@ -977,17 +945,17 @@ int Rank::Pipe::accept() << " global_seq " << connect.global_seq << dendl; - rank.lock.Lock(); + rank->lock.Lock(); // note peer's type, flags - policy = rank.policy_map[connect.host_type]; /* apply policy */ + policy = rank->policy_map[connect.host_type]; /* apply policy */ lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY; memset(&reply, 0, sizeof(reply)); // existing? - if (rank.rank_pipe.count(peer_addr)) { - existing = rank.rank_pipe[peer_addr]; + if (rank->rank_pipe.count(peer_addr)) { + existing = rank->rank_pipe[peer_addr]; existing->lock.Lock(); if (connect.global_seq < existing->peer_global_seq) { @@ -996,7 +964,7 @@ int Rank::Pipe::accept() reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; reply.global_seq = existing->peer_global_seq; // so we can send it below.. existing->lock.Unlock(); - rank.lock.Unlock(); + rank->lock.Unlock(); goto reply; } else { dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq @@ -1038,14 +1006,14 @@ int Rank::Pipe::accept() reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq; // so we can send it below.. existing->lock.Unlock(); - rank.lock.Unlock(); + rank->lock.Unlock(); goto reply; } } if (connect.connect_seq == existing->connect_seq) { // connection race? - if (peer_addr < rank.rank_addr) { + if (peer_addr < rank->rank_addr) { // incoming wins dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", replacing my attempt" << dendl; @@ -1057,11 +1025,11 @@ int Rank::Pipe::accept() // our existing outgoing wins dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addr > rank.rank_addr); + assert(peer_addr > rank->rank_addr); assert(existing->state == STATE_CONNECTING); // this will win reply.tag = CEPH_MSGR_TAG_WAIT; existing->lock.Unlock(); - rank.lock.Unlock(); + rank->lock.Unlock(); goto reply; } } @@ -1073,7 +1041,7 @@ int Rank::Pipe::accept() << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; reply.tag = CEPH_MSGR_TAG_RESETSESSION; - rank.lock.Unlock(); + rank->lock.Unlock(); existing->lock.Unlock(); goto reply; } @@ -1086,7 +1054,7 @@ int Rank::Pipe::accept() else if (connect.connect_seq > 0) { // we reset, and they are opening a new session dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; - rank.lock.Unlock(); + rank->lock.Unlock(); reply.tag = CEPH_MSGR_TAG_RESETSESSION; goto reply; } else { @@ -1129,7 +1097,7 @@ int Rank::Pipe::accept() // send READY reply reply.tag = CEPH_MSGR_TAG_READY; - reply.global_seq = rank.get_global_seq(); + reply.global_seq = rank->get_global_seq(); reply.connect_seq = connect_seq; reply.flags = 0; if (policy.lossy_tx) @@ -1137,7 +1105,7 @@ int Rank::Pipe::accept() // ok! register_pipe(); - rank.lock.Unlock(); + rank->lock.Unlock(); rc = tcp_write(sd, (char*)&reply, sizeof(reply)); if (rc < 0) @@ -1154,7 +1122,7 @@ int Rank::Pipe::accept() fail: - rank.lock.Unlock(); + rank->lock.Unlock(); fail_unlocked: lock.Lock(); state = STATE_CLOSED; @@ -1174,7 +1142,7 @@ int Rank::Pipe::connect() closed_socket(); } __u32 cseq = connect_seq; - __u32 gseq = rank.get_global_seq(); + __u32 gseq = rank->get_global_seq(); // stop reader thrad join_reader(); @@ -1273,8 +1241,8 @@ int Rank::Pipe::connect() // identify myself memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = (char*)&rank.rank_addr; - msgvec[0].iov_len = sizeof(rank.rank_addr); + msgvec[0].iov_base = (char*)&rank->rank_addr; + msgvec[0].iov_len = sizeof(rank->rank_addr); msg.msg_iov = msgvec; msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; @@ -1282,11 +1250,11 @@ int Rank::Pipe::connect() dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl; goto fail; } - dout(10) << "connect sent my addr " << rank.rank_addr << dendl; + dout(10) << "connect sent my addr " << rank->rank_addr << dendl; while (1) { ceph_msg_connect connect; - connect.host_type = rank.my_type; + connect.host_type = rank->my_type; connect.global_seq = gseq; connect.connect_seq = cseq; connect.flags = 0; @@ -1331,7 +1299,7 @@ int Rank::Pipe::connect() continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { - gseq = rank.get_global_seq(reply.global_seq); + gseq = rank->get_global_seq(reply.global_seq); dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq << " chose new " << gseq << dendl; lock.Unlock(); @@ -1389,18 +1357,18 @@ int Rank::Pipe::connect() void Rank::Pipe::register_pipe() { dout(10) << "register_pipe" << dendl; - assert(rank.lock.is_locked()); - assert(rank.rank_pipe.count(peer_addr) == 0); - rank.rank_pipe[peer_addr] = this; + assert(rank->lock.is_locked()); + assert(rank->rank_pipe.count(peer_addr) == 0); + rank->rank_pipe[peer_addr] = this; } void Rank::Pipe::unregister_pipe() { - assert(rank.lock.is_locked()); - if (rank.rank_pipe.count(peer_addr) && - rank.rank_pipe[peer_addr] == this) { + assert(rank->lock.is_locked()); + if (rank->rank_pipe.count(peer_addr) && + rank->rank_pipe[peer_addr] == this) { dout(10) << "unregister_pipe" << dendl; - rank.rank_pipe.erase(peer_addr); + rank->rank_pipe.erase(peer_addr); } else { dout(10) << "unregister_pipe - not registered" << dendl; } @@ -1519,15 +1487,15 @@ void Rank::Pipe::fail() stop(); report_failures(); - for (unsigned i=0; iget_dispatcher()) - rank.local[i]->queue_reset(peer_addr, last_dest_name); + for (unsigned i=0; ilocal.size(); i++) + if (rank->local[i] && rank->local[i]->get_dispatcher()) + rank->local[i]->queue_reset(peer_addr, last_dest_name); // unregister lock.Unlock(); - rank.lock.Lock(); + rank->lock.Lock(); unregister_pipe(); - rank.lock.Unlock(); + rank->lock.Unlock(); lock.Lock(); } @@ -1537,9 +1505,9 @@ void Rank::Pipe::was_session_reset() dout(10) << "was_session_reset" << dendl; report_failures(); - for (unsigned i=0; iget_dispatcher()) - rank.local[i]->queue_remote_reset(peer_addr, last_dest_name); + for (unsigned i=0; ilocal.size(); i++) + if (rank->local[i] && rank->local[i]->get_dispatcher()) + rank->local[i]->queue_remote_reset(peer_addr, last_dest_name); out_seq = 0; in_seq = 0; @@ -1557,13 +1525,13 @@ void Rank::Pipe::report_failures() if (policy.drop_msg_callback) { unsigned srcrank = m->get_source_inst().addr.erank; - if (srcrank >= rank.max_local || rank.local[srcrank] == 0) { + if (srcrank >= rank->max_local || rank->local[srcrank] == 0) { dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl; - } else if (rank.local[srcrank]->is_stopped()) { + } else if (rank->local[srcrank]->is_stopped()) { dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl; } else { dout(10) << "fail on " << *m << dendl; - rank.local[srcrank]->queue_failure(m, m->get_dest_inst()); + rank->local[srcrank]->queue_failure(m, m->get_dest_inst()); } } m->put(); @@ -1685,14 +1653,14 @@ void Rank::Pipe::reader() << " for " << m->get_dest() << dendl; // deliver - EntityMessenger *entity = 0; + Endpoint *entity = 0; - rank.lock.Lock(); + rank->lock.Lock(); { unsigned erank = m->get_dest_inst().addr.erank; - if (erank < rank.max_local && rank.local[erank]) { + if (erank < rank->max_local && rank->local[erank]) { // find entity - entity = rank.local[erank]; + entity = rank->local[erank]; entity->get(); // first message? @@ -1702,18 +1670,18 @@ void Rank::Pipe::reader() entity->need_addr = false; } - if (rank.need_addr) { - rank.rank_addr = m->get_dest_inst().addr; - rank.rank_addr.erank = 0; - dout(2) << "reader rank_addr is " << rank.rank_addr << dendl; - rank.need_addr = false; + if (rank->need_addr) { + rank->rank_addr = m->get_dest_inst().addr; + rank->rank_addr.erank = 0; + dout(2) << "reader rank_addr is " << rank->rank_addr << dendl; + rank->need_addr = false; } } else { derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl; } } - rank.lock.Unlock(); + rank->lock.Unlock(); if (entity) { entity->queue_message(m); // queue @@ -1756,12 +1724,12 @@ void Rank::Pipe::reader() sd = -1; closed_socket(); } - rank.lock.Lock(); + rank->lock.Lock(); { - rank.pipe_reap_queue.push_back(this); - rank.wait_cond.Signal(); + rank->pipe_reap_queue.push_back(this); + rank->wait_cond.Signal(); } - rank.lock.Unlock(); + rank->lock.Unlock(); } dout(10) << "reader done" << dendl; @@ -1875,12 +1843,12 @@ void Rank::Pipe::writer() sd = -1; closed_socket(); } - rank.lock.Lock(); + rank->lock.Lock(); { - rank.pipe_reap_queue.push_back(this); - rank.wait_cond.Signal(); + rank->pipe_reap_queue.push_back(this); + rank->wait_cond.Signal(); } - rank.lock.Unlock(); + rank->lock.Unlock(); } dout(10) << "writer done" << dendl; diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 1e4a4ef0068..98f808c7ceb 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -82,16 +82,17 @@ public: void sigint(); private: - class EntityMessenger; + class Endpoint; class Pipe; // incoming class Accepter : public Thread { public: + Rank *rank; bool done; int listen_sd; - Accepter() : done(false), listen_sd(-1) {} + Accepter(Rank *r) : rank(r), done(false), listen_sd(-1) {} void *entry(); void stop(); @@ -104,6 +105,7 @@ private: // pipe class Pipe { public: + Rank *rank; ostream& _pipe_prefix(); enum { @@ -177,7 +179,8 @@ private: friend class Writer; public: - Pipe(int st) : + Pipe(Rank *r, int st) : + rank(r), sd(-1), lock("Rank::Pipe::lock"), state(st), @@ -260,7 +263,8 @@ private: // messenger interface - class EntityMessenger : public Messenger { + class Endpoint : public Messenger { + Rank *rank; Mutex lock; Cond cond; map > dispatch_queue; @@ -272,9 +276,9 @@ private: private: class DispatchThread : public Thread { - EntityMessenger *m; + Endpoint *m; public: - DispatchThread(EntityMessenger *_m) : m(_m) {} + DispatchThread(Endpoint *_m) : m(_m) {} void *entry() { m->dispatch_entry(); return 0; @@ -327,15 +331,16 @@ private: } public: - EntityMessenger(entity_name_t name, int r) : + Endpoint(Rank *r, entity_name_t name, int rn) : Messenger(name), - lock("Rank::EntityMessenger::lock"), + rank(r), + lock("Rank::Endpoint::lock"), stop(false), qlen(0), - my_rank(r), + my_rank(rn), need_addr(false), dispatch_thread(this) { } - ~EntityMessenger() { } + ~Endpoint() { } void destroy() { // join dispatch thread @@ -380,7 +385,7 @@ private: // local unsigned max_local, num_local; - vector local; + vector local; vector stopped; // remote @@ -404,7 +409,8 @@ private: void reaper(); public: - Rank() : lock("Rank::lock"), started(false), need_addr(true), + Rank() : accepter(this), + lock("Rank::lock"), started(false), need_addr(true), max_local(0), num_local(0), my_type(-1), global_seq_lock("Rank::global_seq_lock"), global_seq(0) { } @@ -423,15 +429,15 @@ public: return ++global_seq; } - EntityMessenger *register_entity(entity_name_t addr); - void rename_entity(EntityMessenger *ms, entity_name_t newaddr); - void unregister_entity(EntityMessenger *ms); + Endpoint *register_entity(entity_name_t addr); + void rename_entity(Endpoint *ms, entity_name_t newaddr); + void unregister_entity(Endpoint *ms); void submit_message(Message *m, const entity_addr_t& addr, bool lazy=false); void prepare_dest(const entity_inst_t& inst); // create a new messenger - EntityMessenger *new_entity(entity_name_t addr); + Endpoint *new_entity(entity_name_t addr); void set_policy(int type, Policy p) { policy_map[type] = p;