From be072bef3ee98070c97dffe83568f9faaaa2a3bc Mon Sep 17 00:00:00 2001
From: Sage Weil
Date: Fri, 1 May 2009 07:11:46 -0700
Subject: [PATCH] msgr: kill static instance 'rank' of SimpleMessenger

---
 src/ceph.cc                |    3 +-
 src/cfuse.cc               |    7 +-
 src/cmds.cc                |    9 +-
 src/cmon.cc                |   12 +-
 src/cosd.cc                |    9 +-
 src/csyn.cc                |    7 +-
 src/dumpjournal.cc         |    1 +
 src/mon/MonClient.cc       |    4 +-
 src/msg/SimpleMessenger.cc | 3202 ++++++++++++++++++------------------
 src/msg/SimpleMessenger.h  |   34 +-
 src/testmsgr.cc            |    1 +
 11 files changed, 1648 insertions(+), 1641 deletions(-)

diff --git a/src/ceph.cc b/src/ceph.cc
index eff440dfe0af0..5884b0e9ba550 100644
--- a/src/ceph.cc
+++ b/src/ceph.cc
@@ -587,12 +587,13 @@ int main(int argc, const char **argv, const char *envp[])
     return -1;
 
   // start up network
+  SimpleMessenger rank;
   rank.bind();
   messenger = rank.register_entity(entity_name_t::ADMIN());
   messenger->set_dispatcher(&dispatcher);
 
   rank.start();
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0));
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
   if (watch) {
     lock.Lock();
diff --git a/src/cfuse.cc b/src/cfuse.cc
index b502e6099d034..f44aed720d8d5 100644
--- a/src/cfuse.cc
+++ b/src/cfuse.cc
@@ -69,6 +69,7 @@ int main(int argc, const char **argv, const char *envp[]) {
     return -1;
 
   // start up network
+  SimpleMessenger rank;
   rank.bind();
 
   cout << "bound to " << rank.get_rank_addr() << ", mounting ceph" << std::endl;
@@ -76,9 +77,9 @@ int main(int argc, const char **argv, const char *envp[]) {
 
   rank.start();
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
 
   // start client
   client->init();
diff --git a/src/cmds.cc b/src/cmds.cc
index a1603dd317142..2e76b687ea7d3 100644
--- a/src/cmds.cc
+++ b/src/cmds.cc
@@ -68,6 +68,7 @@ int main(int argc, const char **argv)
   if (mc.get_monmap(&monmap) < 0)
     return -1;
 
+  SimpleMessenger rank;
   rank.bind();
   cout << "starting mds." << g_conf.id
        << " at " << rank.get_rank_addr()
@@ -79,10 +80,10 @@ int main(int argc, const char **argv)
   if (!m)
     return 1;
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fail_after(1.0));
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossless());  // mds does its own timeout/markdown
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless());  // mds does its own timeout/markdown
 
   rank.start();
diff --git a/src/cmon.cc b/src/cmon.cc
index d84e217e8a201..d75e861095824 100644
--- a/src/cmon.cc
+++ b/src/cmon.cc
@@ -122,6 +122,8 @@ int main(int argc, const char **argv)
   }
 
   // bind
+  SimpleMessenger rank;
+
   cout << "starting mon" << whoami
        << " at " << monmap.get_inst(whoami).addr
        << " mon_data " << g_conf.mon_data
@@ -141,12 +143,12 @@ int main(int argc, const char **argv)
 
   rank.start();  // may daemonize
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless());
 
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_ADMIN, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_ADMIN, SimpleMessenger::Policy::lossy_fast_fail());
 
   mon->init();
diff --git a/src/cosd.cc b/src/cosd.cc
index c49908d0741fa..0ff53dee2272b 100644
--- a/src/cosd.cc
+++ b/src/cosd.cc
@@ -122,6 +122,7 @@ int main(int argc, const char **argv)
   }
 
   // start up network
+  SimpleMessenger rank;
   rank.bind();
 
   cout << "starting osd" << whoami
@@ -142,14 +143,14 @@ int main(int argc, const char **argv)
   if (!hbm)
     return 1;
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
 
   // make a _reasonable_ effort to send acks/replies to requests, but
   // don't get carried away, as the sender may go away and we won't
   // ever hear about it.
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, Rank::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail());
 
   rank.start();
diff --git a/src/csyn.cc b/src/csyn.cc
index 58110fa6a696a..62cd08a0bd3d5 100644
--- a/src/csyn.cc
+++ b/src/csyn.cc
@@ -56,12 +56,13 @@ int main(int argc, const char **argv, char *envp[])
     return -1;
 
   // start up network
+  SimpleMessenger rank;
   rank.bind();
   cout << "starting csyn at " << rank.get_rank_addr() << std::endl;
 
-  rank.set_policy(entity_name_t::TYPE_MON, Rank::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_MDS, Rank::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, Rank::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
 
   list<Client*> clients;
   list<SyntheticClient*> synclients;
diff --git a/src/dumpjournal.cc b/src/dumpjournal.cc
index 158ff642ffcff..00b39721027c5 100644
--- a/src/dumpjournal.cc
+++ b/src/dumpjournal.cc
@@ -88,6 +88,7 @@ int main(int argc, const char **argv, const char *envp[])
     return -1;
 
   // start up network
+  SimpleMessenger rank;
   rank.bind();
   g_conf.daemonize = false;  // not us!
   rank.start();
diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc
index 73888a98ceadc..65f7964083935 100644
--- a/src/mon/MonClient.cc
+++ b/src/mon/MonClient.cc
@@ -34,7 +34,9 @@ int MonClient::probe_mon(MonMap *pmonmap)
     cerr << "couldn't parse ip:port(s) from '" << g_conf.mon_host << "'" << std::endl;
     return -1;
   }
-
+
+  SimpleMessenger rank;
+
   rank.bind();
   dout(1) << " connecting to monitor(s) at " << monaddrs << " ..."
<< dendl; diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index b967ca0d83027..22822405ed098 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -38,9 +38,9 @@ #define DOUT_SUBSYS ms #undef dout_prefix -#define dout_prefix _prefix() -static ostream& _prefix() { - return *_dout << dbeginl << pthread_self() << " -- " << rank.rank_addr << " "; +#define dout_prefix _prefix(rank) +static ostream& _prefix(SimpleMessenger *rank) { + return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " "; } @@ -53,8 +53,6 @@ static ostream& _prefix() { #define opened_socket() //dout(20) << "opened_socket " << ++sockopen << dendl; -Rank rank; - #ifdef DARWIN sig_t old_sigint_handler = 0; #else @@ -70,7 +68,7 @@ void noop_signal_handler(int s) //dout(0) << "blah_handler got " << s << dendl; } -int Rank::Accepter::bind(int64_t force_nonce) +int SimpleMessenger::Accepter::bind(int64_t force_nonce) { // bind to a socket dout(10) << "accepter.bind" << dendl; @@ -152,7 +150,7 @@ int Rank::Accepter::bind(int64_t force_nonce) return 0; } -int Rank::Accepter::start() +int SimpleMessenger::Accepter::start() { dout(1) << "accepter.start" << dendl; @@ -171,7 +169,7 @@ int Rank::Accepter::start() return 0; } -void *Rank::Accepter::entry() +void *SimpleMessenger::Accepter::entry() { dout(10) << "accepter starting" << dendl; @@ -229,7 +227,7 @@ void *Rank::Accepter::entry() return 0; } -void Rank::Accepter::stop() +void SimpleMessenger::Accepter::stop() { done = true; dout(10) << "stop sending SIGUSR1" << dendl; @@ -241,1887 +239,1889 @@ void Rank::Accepter::stop() -/******************************************** - * Rank - */ -/* - * note: assumes lock is held +/********************************** + * Endpoint */ -void Rank::reaper() -{ - dout(10) << "reaper" << dendl; - assert(lock.is_locked()); - - while (!pipe_reap_queue.empty()) { - Pipe *p = pipe_reap_queue.front(); - dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; - p->unregister_pipe(); - pipe_reap_queue.pop_front(); - assert(pipes.count(p)); - pipes.erase(p); - p->join(); - p->discard_queue(); - dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; - assert(p->sd < 0); - delete p; - dout(10) << "reaper deleted pipe " << p << dendl; - } -} - -int Rank::bind(int64_t force_nonce) +void SimpleMessenger::Endpoint::dispatch_entry() { lock.Lock(); - if (started) { - dout(10) << "rank.bind already started" << dendl; - lock.Unlock(); - return -1; + while (!stop) { + if (!dispatch_queue.empty()) { + list ls; + + // take highest priority message off the queue + map >::reverse_iterator p = dispatch_queue.rbegin(); + ls.push_back(p->second.front()); + p->second.pop_front(); + if (p->second.empty()) + dispatch_queue.erase(p->first); + qlen--; + + lock.Unlock(); + { + // deliver + while (!ls.empty()) { + if (stop) { + dout(1) << "dispatch: stop=true, discarding " << ls.size() + << " messages in dispatch queue" << dendl; + break; + } + Message *m = ls.front(); + ls.pop_front(); + if ((long)m == BAD_REMOTE_RESET) { + lock.Lock(); + entity_addr_t a = remote_reset_q.front().first; + entity_name_t n = remote_reset_q.front().second; + remote_reset_q.pop_front(); + lock.Unlock(); + get_dispatcher()->ms_handle_remote_reset(a, n); + } else if ((long)m == BAD_RESET) { + lock.Lock(); + entity_addr_t a = reset_q.front().first; + entity_name_t n = reset_q.front().second; + reset_q.pop_front(); + lock.Unlock(); + get_dispatcher()->ms_handle_reset(a, n); + } else if ((long)m == 
BAD_FAILED) { + lock.Lock(); + m = failed_q.front().first; + entity_inst_t i = failed_q.front().second; + failed_q.pop_front(); + lock.Unlock(); + get_dispatcher()->ms_handle_failure(m, i); + m->put(); + } else { + dout(1) << m->get_dest() + << " <== " << m->get_source_inst() + << " " << m->get_seq() + << " ==== " << *m + << " ==== " << m->get_payload().length() << "+" << m->get_data().length() + << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")" + << " " << m + << dendl; + dispatch(m); + dout(20) << "done calling dispatch on " << m << dendl; + } + } + } + lock.Lock(); + continue; + } + cond.Wait(lock); } - dout(10) << "rank.bind" << dendl; lock.Unlock(); + dout(15) << "dispatch: ending loop " << dendl; - // bind to a socket - return accepter.bind(force_nonce); + // deregister + rank->unregister_entity(this); + put(); } +void SimpleMessenger::Endpoint::ready() +{ + dout(10) << "ready " << get_myaddr() << dendl; + assert(!dispatch_thread.is_started()); + get(); + dispatch_thread.create(); +} -class C_Die : public Context { -public: - void finish(int) { - cerr << "die" << std::endl; - exit(1); - } -}; -static void write_pid_file(int pid) +int SimpleMessenger::Endpoint::shutdown() { - if (!g_conf.pid_file) - return; - - int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644); - if (fd >= 0) { - char buf[20]; - int len = sprintf(buf, "%d\n", pid); - ::write(fd, buf, len); - ::close(fd); + dout(10) << "shutdown " << get_myaddr() << dendl; + + // stop my dispatch thread + if (dispatch_thread.am_self()) { + dout(10) << "shutdown i am dispatch, setting stop flag" << dendl; + stop = true; + } else { + dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl; + lock.Lock(); + stop = true; + cond.Signal(); + lock.Unlock(); } + return 0; } -static void remove_pid_file() +void SimpleMessenger::Endpoint::suicide() { - if (!g_conf.pid_file) - return; - - // only remove it if it has OUR pid in it! - int fd = ::open(g_conf.pid_file, O_RDONLY); - if (fd >= 0) { - char buf[20]; - ::read(fd, buf, 20); - ::close(fd); - int a = atoi(buf); + dout(10) << "suicide " << get_myaddr() << dendl; + shutdown(); + // hmm, or exit(0)? +} - if (a == getpid()) - ::unlink(g_conf.pid_file); - else - dout(0) << "strange, pid file " << g_conf.pid_file - << " has " << a << ", not expected " << getpid() - << dendl; +void SimpleMessenger::Endpoint::prepare_dest(const entity_inst_t& inst) +{ + rank->lock.Lock(); + { + if (rank->rank_pipe.count(inst.addr) == 0) + rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]); } + rank->lock.Unlock(); } -int Rank::start(bool nodaemon) +int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest) { - // register at least one entity, first! - assert(my_type >= 0); - - lock.Lock(); - if (started) { - dout(10) << "rank.start already started" << dendl; - lock.Unlock(); - return 0; - } + // set envelope + m->set_source_inst(_myinst); + m->set_orig_source_inst(_myinst); + m->set_dest_inst(dest); + if (!g_conf.ms_nocrc) + m->calc_data_crc(); + else + m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC; + if (!m->get_priority()) m->set_priority(get_default_send_priority()); + + dout(1) << m->get_source() + << " --> " << dest.name << " " << dest.addr + << " -- " << *m + << " -- ?+" << m->get_data().length() + << " (? 
" << m->get_footer().data_crc << ")" + << " " << m + << dendl; - dout(1) << "rank.start at " << rank_addr << dendl; - started = true; - lock.Unlock(); + rank->submit_message(m, dest.addr); - // daemonize? - if (g_conf.daemonize && !nodaemon) { - if (Thread::get_num_threads() > 0) { - derr(0) << "rank.start BUG: there are " << Thread::get_num_threads() - << " already started that will now die! call rank.start() sooner." - << dendl; - } - dout(1) << "rank.start daemonizing" << dendl; + return 0; +} - if (1) { - daemon(1, 0); - write_pid_file(getpid()); - } else { - pid_t pid = fork(); - if (pid) { - // i am parent - write_pid_file(pid); - ::close(0); - ::close(1); - ::close(2); - _exit(0); - } - } +int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest) +{ + // set envelope + m->set_source_inst(_myinst); + m->set_dest_inst(dest); + if (!g_conf.ms_nocrc) + m->calc_data_crc(); + else + m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC; + if (!m->get_priority()) m->set_priority(get_default_send_priority()); - if (g_conf.chdir && g_conf.chdir[0]) { - ::mkdir(g_conf.chdir, 0700); - ::chdir(g_conf.chdir); - } - - _dout_rename_output_file(); - } else if (g_daemon) { - write_pid_file(getpid()); - } + dout(1) << m->get_source() + << " **> " << dest.name << " " << dest.addr + << " -- " << *m + << " -- ?+" << m->get_data().length() + << " (? " << m->get_footer().data_crc << ")" + << " " << m + << dendl; - // some debug hackery? - if (g_conf.kill_after) - g_timer.add_event_after(g_conf.kill_after, new C_Die); + rank->submit_message(m, dest.addr); - // go! - accepter.start(); return 0; } -/* connect_rank - * NOTE: assumes rank.lock held. - */ -Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr, const Policy& p) + +int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) { - assert(lock.is_locked()); - assert(addr != rank_addr); - - dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; - - // create pipe - Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING); - pipe->policy = p; - pipe->peer_addr = addr; - pipe->start_writer(); - pipe->register_pipe(); - pipes.insert(pipe); - - return pipe; -} - + // set envelope + m->set_source_inst(_myinst); + m->set_orig_source_inst(_myinst); + m->set_dest_inst(dest); + if (!g_conf.ms_nocrc) + m->calc_data_crc(); + else + m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC; + if (!m->get_priority()) m->set_priority(get_default_send_priority()); + + dout(1) << "lazy " << m->get_source() + << " --> " << dest.name << " " << dest.addr + << " -- " << *m + << " -- ?+" << m->get_data().length() + << " (? " << m->get_footer().data_crc << ")" + << " " << m + << dendl; + rank->submit_message(m, dest.addr, true); + return 0; +} +void SimpleMessenger::Endpoint::reset_myname(entity_name_t newname) +{ + entity_name_t oldname = get_myname(); + dout(10) << "reset_myname " << oldname << " to " << newname << dendl; + _set_myname(newname); +} -/* register_entity - */ -Rank::Endpoint *Rank::register_entity(entity_name_t name) +void SimpleMessenger::Endpoint::mark_down(entity_addr_t a) { - dout(10) << "register_entity " << name << dendl; - lock.Lock(); - - // create messenger - int erank = max_local; - Endpoint *msgr = new Endpoint(this, name, erank); + rank->mark_down(a); +} - // now i know my type. 
- if (my_type >= 0) - assert(my_type == name.type()); - else - my_type = name.type(); - // add to directory - max_local++; - local.resize(max_local); - stopped.resize(max_local); - msgr->get(); - local[erank] = msgr; - stopped[erank] = false; - msgr->_myinst.addr = rank_addr; - if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr) - msgr->need_addr = true; - msgr->_myinst.addr.erank = erank; - dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr - << " need_addr=" << need_addr - << dendl; - num_local++; - - lock.Unlock(); - return msgr; -} +/************************************** + * Pipe + */ +#undef dout_prefix +#define dout_prefix _pipe_prefix() +ostream& SimpleMessenger::Pipe::_pipe_prefix() { + return *_dout << dbeginl << pthread_self() + << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this + << " sd=" << sd + << " pgs=" << peer_global_seq + << " cs=" << connect_seq + << ")."; +} -void Rank::unregister_entity(Endpoint *msgr) +int SimpleMessenger::Pipe::accept() { - lock.Lock(); - dout(10) << "unregister_entity " << msgr->get_myname() << dendl; - - // remove from local directory. - assert(msgr->my_rank >= 0); - assert(local[msgr->my_rank] == msgr); - local[msgr->my_rank] = 0; - stopped[msgr->my_rank] = true; - num_local--; - msgr->my_rank = -1; - - assert(msgr->nref.test() > 1); - msgr->put(); - - wait_cond.Signal(); + dout(10) << "accept" << dendl; - lock.Unlock(); -} + // my creater gave me sd via accept() + assert(state == STATE_ACCEPTING); + + // announce myself. + int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER)); + if (rc < 0) { + dout(10) << "accept couldn't write banner" << dendl; + state = STATE_CLOSED; + return -1; + } + // and my addr + rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr)); + if (rc < 0) { + dout(10) << "accept couldn't write my addr" << dendl; + state = STATE_CLOSED; + return -1; + } + dout(10) << "accept sd=" << sd << dendl; + + // identify peer + char banner[strlen(CEPH_BANNER)+1]; + rc = tcp_read(sd, banner, strlen(CEPH_BANNER)); + if (rc < 0) { + dout(10) << "accept couldn't read banner" << dendl; + state = STATE_CLOSED; + return -1; + } + if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + banner[strlen(CEPH_BANNER)] = 0; + dout(10) << "accept peer sent bad banner '" << banner << "'" << dendl; + state = STATE_CLOSED; + return -1; + } + rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr)); + if (rc < 0) { + dout(10) << "accept couldn't read peer_addr" << dendl; + state = STATE_CLOSED; + return -1; + } + dout(10) << "accept peer addr is " << peer_addr << dendl; + if (peer_addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { + // peer apparently doesn't know what ip they have; figure it out for them. 
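/*
 * For orientation only, a sketch of the accept-side handshake implemented in
 * the Pipe::accept() hunks above and below:
 *
 *   tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER));                  // 1. send banner
 *   tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr));  // 2. send my addr
 *   tcp_read(sd, banner, strlen(CEPH_BANNER));                        // 3. read + verify peer banner
 *   tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr));               // 4. read peer's claimed addr
 *   // if the peer claimed INADDR_ANY, substitute the address getpeername()
 *   // reports for the socket, keeping the port the peer advertised
 *
 * Only after this exchange does the ceph_msg_connect / ceph_msg_connect_reply
 * negotiation begin.
 */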
+ entity_addr_t old_addr = peer_addr; + socklen_t len = sizeof(peer_addr.ipaddr); + int r = ::getpeername(sd, (sockaddr*)&peer_addr.ipaddr, &len); + if (r < 0) { + dout(0) << "accept failed to getpeername " << errno << " " << strerror(errno) << dendl; + state = STATE_CLOSED; + return -1; + } + peer_addr.ipaddr.sin_port = old_addr.ipaddr.sin_port; + dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl; + } + + ceph_msg_connect connect; + ceph_msg_connect_reply reply; + Pipe *existing = 0; + + // this should roughly mirror pseudocode at + // http://ceph.newdream.net/wiki/Messaging_protocol -void Rank::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy) -{ - const entity_name_t dest = m->get_dest(); + while (1) { + rc = tcp_read(sd, (char*)&connect, sizeof(connect)); + if (rc < 0) { + dout(10) << "accept couldn't read connect" << dendl; + goto fail_unlocked; + } + dout(20) << "accept got peer connect_seq " << connect.connect_seq + << " global_seq " << connect.global_seq + << dendl; + + rank->lock.Lock(); - assert(m->nref.test() == 0); + // note peer's type, flags + policy = rank->policy_map[connect.host_type]; /* apply policy */ + lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY; - m->get_header().mon_protocol = CEPH_MON_PROTOCOL; - m->get_header().monc_protocol = CEPH_MONC_PROTOCOL; - m->get_header().mds_protocol = CEPH_MDS_PROTOCOL; - m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL; - m->get_header().osd_protocol = CEPH_OSD_PROTOCOL; - m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL; + memset(&reply, 0, sizeof(reply)); - // lookup - entity_addr_t dest_proc_addr = dest_addr; - dest_proc_addr.erank = 0; + // existing? + if (rank->rank_pipe.count(peer_addr)) { + existing = rank->rank_pipe[peer_addr]; + existing->lock.Lock(); - lock.Lock(); - { - // local? - if (rank_addr.is_local_to(dest_addr)) { - if (dest_addr.erank < max_local && local[dest_addr.erank]) { - // local - dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl; - local[dest_addr.erank]->queue_message(m); + if (connect.global_seq < existing->peer_global_seq) { + dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq + << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; + reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; + reply.global_seq = existing->peer_global_seq; // so we can send it below.. + existing->lock.Unlock(); + rank->lock.Unlock(); + goto reply; } else { - derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl; - //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. - delete m; + dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq + << " <= " << connect.global_seq << ", looks ok" << dendl; } - } - else { - // remote. - Pipe *pipe = 0; - if (rank_pipe.count( dest_proc_addr )) { - // connected? - pipe = rank_pipe[ dest_proc_addr ]; - pipe->lock.Lock(); - if (pipe->state == Pipe::STATE_CLOSED) { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." 
<< dendl; - pipe->unregister_pipe(); - pipe->lock.Unlock(); - pipe = 0; + + if (existing->policy.lossy_tx) { + dout(-10) << "accept replacing existing (lossy) channel" << dendl; + existing->was_session_reset(); + goto replace; + } + if (lossy_rx) { + if (existing->state == STATE_STANDBY) { + dout(-10) << "accept incoming lossy connection, kicking outgoing lossless " + << existing << dendl; + existing->state = STATE_CONNECTING; + existing->cond.Signal(); } else { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl; - - // if this pipe was created by an incoming connection, but we haven't received - // a message yet, then it won't have the policy set. - if (pipe->get_out_seq() == 0) - pipe->policy = policy_map[m->get_dest().type()]; - - pipe->_send(m); - pipe->lock.Unlock(); + dout(-10) << "accept incoming lossy connection, our lossless " << existing + << " has state " << existing->state << ", doing nothing" << dendl; } + existing->lock.Unlock(); + goto fail; } - if (!pipe) { - if (lazy) { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl; - delete m; - } else { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl; - // not connected. - pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]); - pipe->send(m); + + dout(-10) << "accept connect_seq " << connect.connect_seq + << " vs existing " << existing->connect_seq + << " state " << existing->state << dendl; + + if (connect.connect_seq < existing->connect_seq) { + if (connect.connect_seq == 0) { + dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl; + existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s + goto replace; + } else { + // old attempt, or we sent READY but they didn't get it. + dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq + << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; + reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; + reply.connect_seq = existing->connect_seq; // so we can send it below.. + existing->lock.Unlock(); + rank->lock.Unlock(); + goto reply; + } + } + + if (connect.connect_seq == existing->connect_seq) { + // connection race? 
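/*
 * When both sides connect to each other at the same time the connect_seqs
 * match, and the tie is broken by comparing addresses; roughly:
 *
 *   if (peer_addr < rank->rank_addr) {
 *     // incoming connection wins: replace our own connect attempt
 *   } else {
 *     // our outgoing attempt wins: tell the peer to WAIT for it
 *     reply.tag = CEPH_MSGR_TAG_WAIT;
 *   }
 *
 * This keeps exactly one of the two racing TCP connections alive.
 */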
+ if (peer_addr < rank->rank_addr) { + // incoming wins + dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq + << " == " << connect.connect_seq << ", replacing my attempt" << dendl; + assert(existing->state == STATE_CONNECTING || + existing->state == STATE_STANDBY || + existing->state == STATE_WAIT); + goto replace; + } else { + // our existing outgoing wins + dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq + << " == " << connect.connect_seq << ", sending WAIT" << dendl; + assert(peer_addr > rank->rank_addr); + assert(existing->state == STATE_CONNECTING); // this will win + reply.tag = CEPH_MSGR_TAG_WAIT; + existing->lock.Unlock(); + rank->lock.Unlock(); + goto reply; } } + + assert(connect.connect_seq > existing->connect_seq); + assert(connect.global_seq >= existing->peer_global_seq); + if (existing->connect_seq == 0) { + dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq + << ", " << existing << ".cseq = " << existing->connect_seq + << "), sending RESETSESSION" << dendl; + reply.tag = CEPH_MSGR_TAG_RESETSESSION; + rank->lock.Unlock(); + existing->lock.Unlock(); + goto reply; + } + + // reconnect + dout(10) << "accept peer sent cseq " << connect.connect_seq + << " > " << existing->connect_seq << dendl; + goto replace; + } // existing + else if (connect.connect_seq > 0) { + // we reset, and they are opening a new session + dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; + rank->lock.Unlock(); + reply.tag = CEPH_MSGR_TAG_RESETSESSION; + goto reply; + } else { + // new session + dout(10) << "accept new session" << dendl; + goto open; } - } + assert(0); - lock.Unlock(); -} + reply: + rc = tcp_write(sd, (char*)&reply, sizeof(reply)); + if (rc < 0) + goto fail_unlocked; + } + + replace: + dout(10) << "accept replacing " << existing << dendl; + existing->state = STATE_CLOSED; + existing->cond.Signal(); + existing->reader_thread.kill(SIGUSR1); + existing->unregister_pipe(); + + // steal queue and out_seq + existing->requeue_sent(); + out_seq = existing->out_seq; + in_seq = existing->in_seq; + dout(10) << "accept out_seq " << out_seq << " in_seq " << in_seq << dendl; + for (map >::iterator p = existing->q.begin(); + p != existing->q.end(); + p++) + q[p->first].splice(q[p->first].begin(), p->second); + + existing->lock.Unlock(); + open: + // open + connect_seq = connect.connect_seq + 1; + peer_global_seq = connect.global_seq; + dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; + // send READY reply + reply.tag = CEPH_MSGR_TAG_READY; + reply.global_seq = rank->get_global_seq(); + reply.connect_seq = connect_seq; + reply.flags = 0; + if (policy.lossy_tx) + reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; + // ok! + register_pipe(); + rank->lock.Unlock(); + rc = tcp_write(sd, (char*)&reply, sizeof(reply)); + if (rc < 0) + goto fail; -void Rank::wait() -{ lock.Lock(); - while (1) { - // reap dead pipes - reaper(); - - if (num_local == 0) { - dout(10) << "wait: everything stopped" << dendl; - break; // everything stopped. - } else { - dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl; - } - - wait_cond.Wait(lock); + if (state != STATE_CLOSED) { + dout(10) << "accept starting writer, " << "state=" << state << dendl; + start_writer(); } + dout(20) << "accept done" << dendl; lock.Unlock(); - - // done! clean up. 
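/*
 * Once accept() reaches the open: label above, it acknowledges the session
 * with a READY reply; schematically:
 *
 *   connect_seq = connect.connect_seq + 1;
 *   peer_global_seq = connect.global_seq;
 *   reply.tag = CEPH_MSGR_TAG_READY;
 *   reply.global_seq = rank->get_global_seq();
 *   reply.connect_seq = connect_seq;
 *   reply.flags = policy.lossy_tx ? CEPH_MSG_CONNECT_LOSSY : 0;
 *   register_pipe();                  // now visible in rank->rank_pipe
 *   tcp_write(sd, (char*)&reply, sizeof(reply));
 */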
- dout(20) << "wait: stopping accepter thread" << dendl; - accepter.stop(); - dout(20) << "wait: stopped accepter thread" << dendl; + return 0; // success. - // close+reap all pipes - lock.Lock(); - { - dout(10) << "wait: closing pipes" << dendl; - list toclose; - for (hash_map::iterator i = rank_pipe.begin(); - i != rank_pipe.end(); - i++) - toclose.push_back(i->second); - for (list::iterator i = toclose.begin(); - i != toclose.end(); - i++) { - (*i)->unregister_pipe(); - (*i)->lock.Lock(); - (*i)->stop(); - (*i)->lock.Unlock(); - } - reaper(); - dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl; - while (!pipes.empty()) { - wait_cond.Wait(lock); - reaper(); - } - } + fail: + rank->lock.Unlock(); + fail_unlocked: + lock.Lock(); + state = STATE_CLOSED; + fault(); lock.Unlock(); - - dout(10) << "wait: done." << dendl; - dout(1) << "shutdown complete." << dendl; - remove_pid_file(); - started = false; - my_type = -1; + return -1; } +int SimpleMessenger::Pipe::connect() +{ + dout(10) << "connect " << connect_seq << dendl; + assert(lock.is_locked()); + if (sd >= 0) { + ::close(sd); + sd = -1; + closed_socket(); + } + __u32 cseq = connect_seq; + __u32 gseq = rank->get_global_seq(); + // stop reader thrad + join_reader(); + lock.Unlock(); + + char tag = -1; + int rc; + struct sockaddr_in myAddr; + struct msghdr msg; + struct iovec msgvec[2]; + int msglen; + char banner[strlen(CEPH_BANNER)]; + entity_addr_t paddr; + // create socket? + sd = ::socket(AF_INET, SOCK_STREAM, 0); + if (sd < 0) { + dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl; + assert(0); + goto fail; + } + opened_socket(); + + // bind any port + myAddr.sin_family = AF_INET; + myAddr.sin_addr.s_addr = htonl(INADDR_ANY); + myAddr.sin_port = htons( 0 ); + dout(10) << "binding to " << myAddr << dendl; + rc = ::bind(sd, (struct sockaddr *)&myAddr, sizeof(myAddr)); + if (rc < 0) { + dout(2) << "bind error " << myAddr + << ", " << errno << ": " << strerror(errno) << dendl; + goto fail; + } -/********************************** - * Endpoint - */ - -void Rank::Endpoint::dispatch_entry() -{ - lock.Lock(); - while (!stop) { - if (!dispatch_queue.empty()) { - list ls; + // connect! + dout(10) << "connecting to " << peer_addr.ipaddr << dendl; + rc = ::connect(sd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr)); + if (rc < 0) { + dout(2) << "connect error " << peer_addr.ipaddr + << ", " << errno << ": " << strerror(errno) << dendl; + goto fail; + } - // take highest priority message off the queue - map >::reverse_iterator p = dispatch_queue.rbegin(); - ls.push_back(p->second.front()); - p->second.pop_front(); - if (p->second.empty()) - dispatch_queue.erase(p->first); - qlen--; + // disable Nagle algorithm? 
+ if (g_conf.ms_tcp_nodelay) { + int flag = 1; + int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + if (r < 0) + dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl; + } - lock.Unlock(); - { - // deliver - while (!ls.empty()) { - if (stop) { - dout(1) << "dispatch: stop=true, discarding " << ls.size() - << " messages in dispatch queue" << dendl; - break; - } - Message *m = ls.front(); - ls.pop_front(); - if ((long)m == BAD_REMOTE_RESET) { - lock.Lock(); - entity_addr_t a = remote_reset_q.front().first; - entity_name_t n = remote_reset_q.front().second; - remote_reset_q.pop_front(); - lock.Unlock(); - get_dispatcher()->ms_handle_remote_reset(a, n); - } else if ((long)m == BAD_RESET) { - lock.Lock(); - entity_addr_t a = reset_q.front().first; - entity_name_t n = reset_q.front().second; - reset_q.pop_front(); - lock.Unlock(); - get_dispatcher()->ms_handle_reset(a, n); - } else if ((long)m == BAD_FAILED) { - lock.Lock(); - m = failed_q.front().first; - entity_inst_t i = failed_q.front().second; - failed_q.pop_front(); - lock.Unlock(); - get_dispatcher()->ms_handle_failure(m, i); - m->put(); - } else { - dout(1) << m->get_dest() - << " <== " << m->get_source_inst() - << " " << m->get_seq() - << " ==== " << *m - << " ==== " << m->get_payload().length() << "+" << m->get_data().length() - << " (" << m->get_footer().front_crc << " " << m->get_footer().data_crc << ")" - << " " << m - << dendl; - dispatch(m); - dout(20) << "done calling dispatch on " << m << dendl; - } - } - } - lock.Lock(); + // verify banner + // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. + rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER)); + if (rc < 0) { + dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl; + goto fail; + } + if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { + dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl; + goto fail; + } + + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = banner; + msgvec[0].iov_len = strlen(CEPH_BANNER); + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + if (do_sendmsg(sd, &msg, msglen)) { + dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl; + goto fail; + } + + // identify peer + rc = tcp_read(sd, (char*)&paddr, sizeof(paddr)); + if (rc < 0) { + dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl; + goto fail; + } + dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl; + if (!peer_addr.is_local_to(paddr)) { + if (paddr.ipaddr.sin_addr.s_addr == 0 && + peer_addr.ipaddr.sin_port == paddr.ipaddr.sin_port && + peer_addr.nonce == paddr.nonce) { + dout(0) << "connect claims to be " + << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl; + } else { + dout(0) << "connect claims to be " + << paddr << " not " << peer_addr << " - wrong node!" 
<< dendl; + goto fail; + } + } + + // identify myself + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = (char*)&rank->rank_addr; + msgvec[0].iov_len = sizeof(rank->rank_addr); + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + if (do_sendmsg(sd, &msg, msglen)) { + dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl; + goto fail; + } + dout(10) << "connect sent my addr " << rank->rank_addr << dendl; + + while (1) { + ceph_msg_connect connect; + connect.host_type = rank->my_type; + connect.global_seq = gseq; + connect.connect_seq = cseq; + connect.flags = 0; + if (policy.lossy_tx) + connect.flags |= CEPH_MSG_CONNECT_LOSSY; + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = (char*)&connect; + msgvec[0].iov_len = sizeof(connect); + msg.msg_iov = msgvec; + msg.msg_iovlen = 1; + msglen = msgvec[0].iov_len; + + dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl; + if (do_sendmsg(sd, &msg, msglen)) { + dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl; + goto fail; + } + + dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl; + ceph_msg_connect_reply reply; + if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) { + dout(2) << "connect read reply " << strerror(errno) << dendl; + goto fail; + } + dout(20) << "connect got reply tag " << (int)reply.tag + << " connect_seq " << reply.connect_seq + << " global_seq " << reply.global_seq + << " flags " << (int)reply.flags + << dendl; + + lock.Lock(); + if (state != STATE_CONNECTING) { + dout(0) << "connect got RESETSESSION but no longer connecting" << dendl; + goto stop_locked; + } + + if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { + dout(0) << "connect got RESETSESSION" << dendl; + was_session_reset(); + cseq = 0; + lock.Unlock(); continue; } - cond.Wait(lock); + if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { + gseq = rank->get_global_seq(reply.global_seq); + dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq + << " chose new " << gseq << dendl; + lock.Unlock(); + continue; + } + if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { + assert(reply.connect_seq > connect_seq); + dout(10) << "connect got RETRY_SESSION " << connect_seq + << " -> " << reply.connect_seq << dendl; + cseq = connect_seq = reply.connect_seq; + lock.Unlock(); + continue; + } + + if (reply.tag == CEPH_MSGR_TAG_WAIT) { + dout(3) << "connect got WAIT (connection race)" << dendl; + state = STATE_WAIT; + goto stop_locked; + } + + if (reply.tag == CEPH_MSGR_TAG_READY) { + // hooray! 
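/*
 * The negotiation loop above reacts to each reply tag from the acceptor;
 * condensed:
 *
 *   CEPH_MSGR_TAG_RESETSESSION  -> was_session_reset(); cseq = 0; retry
 *   CEPH_MSGR_TAG_RETRY_GLOBAL  -> gseq = rank->get_global_seq(reply.global_seq); retry
 *   CEPH_MSGR_TAG_RETRY_SESSION -> cseq = connect_seq = reply.connect_seq; retry
 *   CEPH_MSGR_TAG_WAIT          -> state = STATE_WAIT (our outgoing attempt lost the race)
 *   CEPH_MSGR_TAG_READY         -> session open; adopt peer_global_seq and start the reader (below)
 */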
+ peer_global_seq = reply.global_seq; + lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY; + state = STATE_OPEN; + connect_seq = cseq + 1; + assert(connect_seq == reply.connect_seq); + first_fault = last_attempt = utime_t(); + dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl; + + if (!reader_running) { + dout(20) << "connect starting reader" << dendl; + start_reader(); + } + return 0; + } + + // protocol error + dout(0) << "connect got bad tag " << (int)tag << dendl; + goto fail_locked; } - lock.Unlock(); - dout(15) << "dispatch: ending loop " << dendl; - // deregister - rank->unregister_entity(this); - put(); + fail: + lock.Lock(); + fail_locked: + if (state == STATE_CONNECTING) + fault(); + else + dout(3) << "connect fault, but state != connecting, stopping" << dendl; + + stop_locked: + return -1; } -void Rank::Endpoint::ready() +void SimpleMessenger::Pipe::register_pipe() { - dout(10) << "ready " << get_myaddr() << dendl; - assert(!dispatch_thread.is_started()); - get(); - dispatch_thread.create(); + dout(10) << "register_pipe" << dendl; + assert(rank->lock.is_locked()); + assert(rank->rank_pipe.count(peer_addr) == 0); + rank->rank_pipe[peer_addr] = this; } - -int Rank::Endpoint::shutdown() +void SimpleMessenger::Pipe::unregister_pipe() { - dout(10) << "shutdown " << get_myaddr() << dendl; - - // stop my dispatch thread - if (dispatch_thread.am_self()) { - dout(10) << "shutdown i am dispatch, setting stop flag" << dendl; - stop = true; + assert(rank->lock.is_locked()); + if (rank->rank_pipe.count(peer_addr) && + rank->rank_pipe[peer_addr] == this) { + dout(10) << "unregister_pipe" << dendl; + rank->rank_pipe.erase(peer_addr); } else { - dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl; - lock.Lock(); - stop = true; - cond.Signal(); - lock.Unlock(); + dout(10) << "unregister_pipe - not registered" << dendl; } - return 0; } -void Rank::Endpoint::suicide() -{ - dout(10) << "suicide " << get_myaddr() << dendl; - shutdown(); - // hmm, or exit(0)? -} -void Rank::Endpoint::prepare_dest(const entity_inst_t& inst) +void SimpleMessenger::Pipe::requeue_sent() { - rank->lock.Lock(); - { - if (rank->rank_pipe.count(inst.addr) == 0) - rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]); + if (sent.empty()) + return; + + list& rq = q[CEPH_MSG_PRIO_HIGHEST]; + while (!sent.empty()) { + Message *m = sent.back(); + sent.pop_back(); + dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq + << " (" << m->get_seq() << ")" << dendl; + rq.push_front(m); + out_seq--; } - rank->lock.Unlock(); } -int Rank::Endpoint::send_message(Message *m, entity_inst_t dest) +void SimpleMessenger::Pipe::discard_queue() { - // set envelope - m->set_source_inst(_myinst); - m->set_orig_source_inst(_myinst); - m->set_dest_inst(dest); - if (!g_conf.ms_nocrc) - m->calc_data_crc(); - else - m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC; - if (!m->get_priority()) m->set_priority(get_default_send_priority()); - - dout(1) << m->get_source() - << " --> " << dest.name << " " << dest.addr - << " -- " << *m - << " -- ?+" << m->get_data().length() - << " (? 
" << m->get_footer().data_crc << ")" - << " " << m - << dendl; - - rank->submit_message(m, dest.addr); - - return 0; + dout(10) << "discard_queue" << dendl; + for (list::iterator p = sent.begin(); p != sent.end(); p++) + (*p)->put(); + sent.clear(); + for (map >::iterator p = q.begin(); p != q.end(); p++) + for (list::iterator r = p->second.begin(); r != p->second.end(); r++) + (*r)->put(); + q.clear(); } -int Rank::Endpoint::forward_message(Message *m, entity_inst_t dest) + +void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) { - // set envelope - m->set_source_inst(_myinst); - m->set_dest_inst(dest); - if (!g_conf.ms_nocrc) - m->calc_data_crc(); - else - m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC; - if (!m->get_priority()) m->set_priority(get_default_send_priority()); - - dout(1) << m->get_source() - << " **> " << dest.name << " " << dest.addr - << " -- " << *m - << " -- ?+" << m->get_data().length() - << " (? " << m->get_footer().data_crc << ")" - << " " << m - << dendl; + assert(lock.is_locked()); + cond.Signal(); - rank->submit_message(m, dest.addr); + if (onread && state == STATE_CONNECTING) { + dout(10) << "fault already connecting, reader shutting down" << dendl; + return; + } - return 0; -} + if (!onconnect) dout(2) << "fault " << errno << ": " << strerror(errno) << dendl; + if (state == STATE_CLOSED || + state == STATE_CLOSING) { + dout(10) << "fault already closed|closing" << dendl; + return; + } + if (sd >= 0) { + ::close(sd); + sd = -1; + closed_socket(); + } -int Rank::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) -{ - // set envelope - m->set_source_inst(_myinst); - m->set_orig_source_inst(_myinst); - m->set_dest_inst(dest); - if (!g_conf.ms_nocrc) - m->calc_data_crc(); - else - m->get_footer().flags = (unsigned)m->get_footer().flags | CEPH_MSG_FOOTER_NOCRC; - if (!m->get_priority()) m->set_priority(get_default_send_priority()); - - dout(1) << "lazy " << m->get_source() - << " --> " << dest.name << " " << dest.addr - << " -- " << *m - << " -- ?+" << m->get_data().length() - << " (? " << m->get_footer().data_crc << ")" - << " " << m - << dendl; + // lossy channel? + if (policy.lossy_tx) { + dout(10) << "fault on lossy channel, failing" << dendl; + fail(); + return; + } - rank->submit_message(m, dest.addr, true); + // requeue sent items + requeue_sent(); - return 0; + if (q.empty()) { + if (state == STATE_CLOSING || onconnect) { + dout(10) << "fault on connect, or already closing, and q empty: setting closed." 
<< dendl; + state = STATE_CLOSED; + } else { + dout(0) << "fault nothing to send, going to standby" << dendl; + state = STATE_STANDBY; + } + return; + } + + utime_t now = g_clock.now(); + if (state != STATE_CONNECTING) { + if (!onconnect) dout(0) << "fault initiating reconnect" << dendl; + connect_seq++; + state = STATE_CONNECTING; + first_fault = now; + } else if (first_fault.sec() == 0) { + if (!onconnect) dout(0) << "fault first fault" << dendl; + first_fault = now; + } else { + utime_t failinterval = now - first_fault; + utime_t retryinterval = now - last_attempt; + if (!onconnect) dout(10) << "fault failure was " << failinterval + << " ago, last attempt was at " << last_attempt + << ", " << retryinterval << " ago" << dendl; + if (policy.fail_interval > 0 && failinterval > policy.fail_interval) { + // give up + dout(0) << "fault giving up" << dendl; + fail(); + } else if (retryinterval < policy.retry_interval) { + // wait + now += (policy.retry_interval - retryinterval); + dout(10) << "fault waiting until " << now << dendl; + cond.WaitUntil(lock, now); + dout(10) << "fault done waiting or woke up" << dendl; + } + } + last_attempt = now; } +void SimpleMessenger::Pipe::fail() +{ + derr(10) << "fail" << dendl; + assert(lock.is_locked()); + stop(); + report_failures(); -void Rank::Endpoint::reset_myname(entity_name_t newname) -{ - entity_name_t oldname = get_myname(); - dout(10) << "reset_myname " << oldname << " to " << newname << dendl; - _set_myname(newname); -} + for (unsigned i=0; ilocal.size(); i++) + if (rank->local[i] && rank->local[i]->get_dispatcher()) + rank->local[i]->queue_reset(peer_addr, last_dest_name); + // unregister + lock.Unlock(); + rank->lock.Lock(); + unregister_pipe(); + rank->lock.Unlock(); + lock.Lock(); +} -void Rank::Endpoint::mark_down(entity_addr_t a) +void SimpleMessenger::Pipe::was_session_reset() { - rank->mark_down(a); + assert(lock.is_locked()); + + dout(10) << "was_session_reset" << dendl; + report_failures(); + for (unsigned i=0; ilocal.size(); i++) + if (rank->local[i] && rank->local[i]->get_dispatcher()) + rank->local[i]->queue_remote_reset(peer_addr, last_dest_name); + + out_seq = 0; + in_seq = 0; + connect_seq = 0; } -void Rank::mark_down(entity_addr_t addr) +void SimpleMessenger::Pipe::report_failures() { - lock.Lock(); - if (rank_pipe.count(addr)) { - Pipe *p = rank_pipe[addr]; - dout(1) << "mark_down " << addr << " -- " << p << dendl; - p->unregister_pipe(); - p->lock.Lock(); - p->stop(); - p->lock.Unlock(); - } else { - dout(1) << "mark_down " << addr << " -- pipe dne" << dendl; + // report failures + q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent); + while (1) { + Message *m = _get_next_outgoing(); + if (!m) + break; + + if (policy.drop_msg_callback) { + unsigned srcrank = m->get_source_inst().addr.erank; + if (srcrank >= rank->max_local || rank->local[srcrank] == 0) { + dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl; + } else if (rank->local[srcrank]->is_stopped()) { + dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl; + } else { + dout(10) << "fail on " << *m << dendl; + rank->local[srcrank]->queue_failure(m, m->get_dest_inst()); + } + } + m->put(); } - lock.Unlock(); } +void SimpleMessenger::Pipe::stop() +{ + dout(10) << "stop" << dendl; + state = STATE_CLOSED; + cond.Signal(); + if (reader_running) + reader_thread.kill(SIGUSR1); + if (writer_running) + writer_thread.kill(SIGUSR1); +} +/* read msgs from socket. + * also, server. 
+ */ +void SimpleMessenger::Pipe::reader() +{ + if (state == STATE_ACCEPTING) + accept(); + lock.Lock(); -/************************************** - * Pipe - */ + // loop. + while (state != STATE_CLOSED && + state != STATE_CONNECTING) { + assert(lock.is_locked()); -#undef dout_prefix -#define dout_prefix _pipe_prefix() -ostream& Rank::Pipe::_pipe_prefix() { - return *_dout << dbeginl << pthread_self() - << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this - << " sd=" << sd - << " pgs=" << peer_global_seq - << " cs=" << connect_seq - << ")."; -} + // sleep if (re)connecting + if (state == STATE_STANDBY) { + dout(20) << "reader sleeping during reconnect|standby" << dendl; + cond.Wait(lock); + continue; + } -int Rank::Pipe::accept() -{ - dout(10) << "accept" << dendl; + lock.Unlock(); - // my creater gave me sd via accept() - assert(state == STATE_ACCEPTING); - - // announce myself. - int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER)); - if (rc < 0) { - dout(10) << "accept couldn't write banner" << dendl; - state = STATE_CLOSED; - return -1; - } + char tag = -1; + dout(20) << "reader reading tag..." << dendl; + int rc = tcp_read(sd, (char*)&tag, 1); + if (rc < 0) { + lock.Lock(); + dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl; + fault(false, true); + continue; + } - // and my addr - rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr)); - if (rc < 0) { - dout(10) << "accept couldn't write my addr" << dendl; - state = STATE_CLOSED; - return -1; - } - dout(10) << "accept sd=" << sd << dendl; - - // identify peer - char banner[strlen(CEPH_BANNER)+1]; - rc = tcp_read(sd, banner, strlen(CEPH_BANNER)); - if (rc < 0) { - dout(10) << "accept couldn't read banner" << dendl; - state = STATE_CLOSED; - return -1; - } - if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { - banner[strlen(CEPH_BANNER)] = 0; - dout(10) << "accept peer sent bad banner '" << banner << "'" << dendl; - state = STATE_CLOSED; - return -1; - } - rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr)); - if (rc < 0) { - dout(10) << "accept couldn't read peer_addr" << dendl; - state = STATE_CLOSED; - return -1; - } - dout(10) << "accept peer addr is " << peer_addr << dendl; - if (peer_addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { - // peer apparently doesn't know what ip they have; figure it out for them. - entity_addr_t old_addr = peer_addr; - socklen_t len = sizeof(peer_addr.ipaddr); - int r = ::getpeername(sd, (sockaddr*)&peer_addr.ipaddr, &len); - if (r < 0) { - dout(0) << "accept failed to getpeername " << errno << " " << strerror(errno) << dendl; - state = STATE_CLOSED; - return -1; - } - peer_addr.ipaddr.sin_port = old_addr.ipaddr.sin_port; - dout(2) << "accept peer says " << old_addr << ", socket says " << peer_addr << dendl; - } - - ceph_msg_connect connect; - ceph_msg_connect_reply reply; - Pipe *existing = 0; - - // this should roughly mirror pseudocode at - // http://ceph.newdream.net/wiki/Messaging_protocol - - while (1) { - rc = tcp_read(sd, (char*)&connect, sizeof(connect)); - if (rc < 0) { - dout(10) << "accept couldn't read connect" << dendl; - goto fail_unlocked; - } - dout(20) << "accept got peer connect_seq " << connect.connect_seq - << " global_seq " << connect.global_seq - << dendl; - - rank->lock.Lock(); - - // note peer's type, flags - policy = rank->policy_map[connect.host_type]; /* apply policy */ - lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY; - - memset(&reply, 0, sizeof(reply)); - - // existing? 
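/*
 * For orientation, the reader() loop introduced above follows a simple
 * tag-dispatch shape (sketch only; the details are in the surrounding hunks):
 *
 *   while (state != STATE_CLOSED && state != STATE_CONNECTING) {
 *     if (state == STATE_STANDBY) { cond.Wait(lock); continue; }
 *     char tag;
 *     tcp_read(sd, &tag, 1);                    // error -> fault(false, true)
 *     if (tag == CEPH_MSGR_TAG_ACK)        ...  // trim acked entries from 'sent'
 *     else if (tag == CEPH_MSGR_TAG_MSG)   ...  // read_message(), check seq, deliver
 *     else if (tag == CEPH_MSGR_TAG_CLOSE) ...  // move toward STATE_CLOSED
 *     else fault(false, true);                  // protocol error
 *   }
 */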
- if (rank->rank_pipe.count(peer_addr)) { - existing = rank->rank_pipe[peer_addr]; - existing->lock.Lock(); - - if (connect.global_seq < existing->peer_global_seq) { - dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq - << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; - reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; - reply.global_seq = existing->peer_global_seq; // so we can send it below.. - existing->lock.Unlock(); - rank->lock.Unlock(); - goto reply; - } else { - dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq - << " <= " << connect.global_seq << ", looks ok" << dendl; - } - - if (existing->policy.lossy_tx) { - dout(-10) << "accept replacing existing (lossy) channel" << dendl; - existing->was_session_reset(); - goto replace; - } - if (lossy_rx) { - if (existing->state == STATE_STANDBY) { - dout(-10) << "accept incoming lossy connection, kicking outgoing lossless " - << existing << dendl; - existing->state = STATE_CONNECTING; - existing->cond.Signal(); - } else { - dout(-10) << "accept incoming lossy connection, our lossless " << existing - << " has state " << existing->state << ", doing nothing" << dendl; + // open ... + if (tag == CEPH_MSGR_TAG_ACK) { + dout(20) << "reader got ACK" << dendl; + __u32 seq; + int rc = tcp_read( sd, (char*)&seq, sizeof(seq)); + lock.Lock(); + if (rc < 0) { + dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl; + fault(false, true); + } else if (state != STATE_CLOSED) { + dout(15) << "reader got ack seq " << seq << dendl; + // trim sent list + while (!sent.empty() && + sent.front()->get_seq() <= seq) { + Message *m = sent.front(); + sent.pop_front(); + dout(10) << "reader got ack seq " + << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; + m->put(); } - existing->lock.Unlock(); - goto fail; } + continue; + } - dout(-10) << "accept connect_seq " << connect.connect_seq - << " vs existing " << existing->connect_seq - << " state " << existing->state << dendl; - - if (connect.connect_seq < existing->connect_seq) { - if (connect.connect_seq == 0) { - dout(-10) << "accept peer reset, then tried to connect to us, replacing" << dendl; - existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s - goto replace; - } else { - // old attempt, or we sent READY but they didn't get it. - dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq - << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; - reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; - reply.connect_seq = existing->connect_seq; // so we can send it below.. - existing->lock.Unlock(); - rank->lock.Unlock(); - goto reply; - } + else if (tag == CEPH_MSGR_TAG_MSG) { + dout(20) << "reader got MSG" << dendl; + Message *m = read_message(); + lock.Lock(); + + if (!m) { + derr(2) << "reader read null message, " << strerror(errno) << dendl; + fault(false, true); + continue; } - if (connect.connect_seq == existing->connect_seq) { - // connection race? 
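/*
 * Two details of the ACK/MSG handling above, as a sketch:
 *
 *   // ACK: everything up to 'seq' is acknowledged; drop our retransmit copies
 *   while (!sent.empty() && sent.front()->get_seq() <= seq) {
 *     Message *m = sent.front();
 *     sent.pop_front();
 *     m->put();
 *   }
 *
 *   // MSG: enforce per-pipe ordering; duplicates can appear after a reconnect
 *   if (m->get_seq() <= in_seq) {
 *     delete m;                      // already delivered, discard
 *   } else {
 *     in_seq++;                      // must now equal m->get_seq() unless lossy_rx
 *     cond.Signal();                 // wake the writer so it can send an ack
 *   }
 */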
- if (peer_addr < rank->rank_addr) { - // incoming wins - dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq - << " == " << connect.connect_seq << ", replacing my attempt" << dendl; - assert(existing->state == STATE_CONNECTING || - existing->state == STATE_STANDBY || - existing->state == STATE_WAIT); - goto replace; - } else { - // our existing outgoing wins - dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq - << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addr > rank->rank_addr); - assert(existing->state == STATE_CONNECTING); // this will win - reply.tag = CEPH_MSGR_TAG_WAIT; - existing->lock.Unlock(); - rank->lock.Unlock(); - goto reply; - } - } + if (state == STATE_CLOSED || + state == STATE_CONNECTING) + continue; - assert(connect.connect_seq > existing->connect_seq); - assert(connect.global_seq >= existing->peer_global_seq); - if (existing->connect_seq == 0) { - dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq - << ", " << existing << ".cseq = " << existing->connect_seq - << "), sending RESETSESSION" << dendl; - reply.tag = CEPH_MSGR_TAG_RESETSESSION; - rank->lock.Unlock(); - existing->lock.Unlock(); - goto reply; + // check received seq# + if (m->get_seq() <= in_seq) { + dout(-10) << "reader got old message " + << m->get_seq() << " <= " << in_seq << " " << m << " " << *m + << " for " << m->get_dest() + << ", discarding" << dendl; + delete m; + continue; } + in_seq++; - // reconnect - dout(10) << "accept peer sent cseq " << connect.connect_seq - << " > " << existing->connect_seq << dendl; - goto replace; - } // existing - else if (connect.connect_seq > 0) { - // we reset, and they are opening a new session - dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; - rank->lock.Unlock(); - reply.tag = CEPH_MSGR_TAG_RESETSESSION; - goto reply; - } else { - // new session - dout(10) << "accept new session" << dendl; - goto open; - } - assert(0); - - reply: - rc = tcp_write(sd, (char*)&reply, sizeof(reply)); - if (rc < 0) - goto fail_unlocked; - } - - replace: - dout(10) << "accept replacing " << existing << dendl; - existing->state = STATE_CLOSED; - existing->cond.Signal(); - existing->reader_thread.kill(SIGUSR1); - existing->unregister_pipe(); - - // steal queue and out_seq - existing->requeue_sent(); - out_seq = existing->out_seq; - in_seq = existing->in_seq; - dout(10) << "accept out_seq " << out_seq << " in_seq " << in_seq << dendl; - for (map >::iterator p = existing->q.begin(); - p != existing->q.end(); - p++) - q[p->first].splice(q[p->first].begin(), p->second); - - existing->lock.Unlock(); - - open: - // open - connect_seq = connect.connect_seq + 1; - peer_global_seq = connect.global_seq; - dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; - - // send READY reply - reply.tag = CEPH_MSGR_TAG_READY; - reply.global_seq = rank->get_global_seq(); - reply.connect_seq = connect_seq; - reply.flags = 0; - if (policy.lossy_tx) - reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; - - // ok! - register_pipe(); - rank->lock.Unlock(); - - rc = tcp_write(sd, (char*)&reply, sizeof(reply)); - if (rc < 0) - goto fail; - - lock.Lock(); - if (state != STATE_CLOSED) { - dout(10) << "accept starting writer, " << "state=" << state << dendl; - start_writer(); - } - dout(20) << "accept done" << dendl; - lock.Unlock(); - return 0; // success. 
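/*
 * Delivery of a received message (the reader() hunk above) never leaves the
 * process: the destination erank indexes the messenger's local[] table of
 * Endpoints, and the first message doubles as address discovery when we were
 * bound to a wildcard address.  Roughly (bounds checks and locking omitted):
 *
 *   unsigned erank = m->get_dest_inst().addr.erank;
 *   Endpoint *entity = rank->local[erank];
 *   if (entity->need_addr) {
 *     entity->_set_myaddr(m->get_dest_inst().addr);   // learn our own address
 *     entity->need_addr = false;
 *   }
 *   entity->queue_message(m);        // dispatch_entry() picks it up by priority
 */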
- - - fail: - rank->lock.Unlock(); - fail_unlocked: - lock.Lock(); - state = STATE_CLOSED; - fault(); - lock.Unlock(); - return -1; -} - -int Rank::Pipe::connect() -{ - dout(10) << "connect " << connect_seq << dendl; - assert(lock.is_locked()); - - if (sd >= 0) { - ::close(sd); - sd = -1; - closed_socket(); - } - __u32 cseq = connect_seq; - __u32 gseq = rank->get_global_seq(); + if (!lossy_rx && in_seq != m->get_seq()) { + dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq + << " for " << *m << " from " << m->get_source() << dendl; + derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq + << " for " << *m << " from " << m->get_source() << dendl; + assert(in_seq == m->get_seq()); // for now! + fault(false, true); + delete m; + continue; + } - // stop reader thrad - join_reader(); + cond.Signal(); // wake up writer, to ack this + lock.Unlock(); + + dout(10) << "reader got message " + << m->get_seq() << " " << m << " " << *m + << " for " << m->get_dest() << dendl; + + // deliver + Endpoint *entity = 0; + + rank->lock.Lock(); + { + unsigned erank = m->get_dest_inst().addr.erank; + if (erank < rank->max_local && rank->local[erank]) { + // find entity + entity = rank->local[erank]; + entity->get(); - lock.Unlock(); - - char tag = -1; - int rc; - struct sockaddr_in myAddr; - struct msghdr msg; - struct iovec msgvec[2]; - int msglen; - char banner[strlen(CEPH_BANNER)]; - entity_addr_t paddr; + // first message? + if (entity->need_addr) { + entity->_set_myaddr(m->get_dest_inst().addr); + dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl; + entity->need_addr = false; + } - // create socket? - sd = ::socket(AF_INET, SOCK_STREAM, 0); - if (sd < 0) { - dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl; - assert(0); - goto fail; - } - opened_socket(); - - // bind any port - myAddr.sin_family = AF_INET; - myAddr.sin_addr.s_addr = htonl(INADDR_ANY); - myAddr.sin_port = htons( 0 ); - dout(10) << "binding to " << myAddr << dendl; - rc = ::bind(sd, (struct sockaddr *)&myAddr, sizeof(myAddr)); - if (rc < 0) { - dout(2) << "bind error " << myAddr - << ", " << errno << ": " << strerror(errno) << dendl; - goto fail; - } + if (rank->need_addr) { + rank->rank_addr = m->get_dest_inst().addr; + rank->rank_addr.erank = 0; + dout(2) << "reader rank_addr is " << rank->rank_addr << dendl; + rank->need_addr = false; + } - // connect! - dout(10) << "connecting to " << peer_addr.ipaddr << dendl; - rc = ::connect(sd, (sockaddr*)&peer_addr.ipaddr, sizeof(peer_addr.ipaddr)); - if (rc < 0) { - dout(2) << "connect error " << peer_addr.ipaddr - << ", " << errno << ": " << strerror(errno) << dendl; - goto fail; - } + } else { + derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl; + } + } + rank->lock.Unlock(); + + if (entity) { + entity->queue_message(m); // queue + entity->put(); + } - // disable Nagle algorithm? 
- if (g_conf.ms_tcp_nodelay) { - int flag = 1; - int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); - if (r < 0) - dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl; + lock.Lock(); + } + + else if (tag == CEPH_MSGR_TAG_CLOSE) { + dout(20) << "reader got CLOSE" << dendl; + lock.Lock(); + if (state == STATE_CLOSING) + state = STATE_CLOSED; + else + state = STATE_CLOSING; + cond.Signal(); + break; + } + else { + dout(0) << "reader bad tag " << (int)tag << dendl; + lock.Lock(); + fault(false, true); + } } - // verify banner - // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. - rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER)); - if (rc < 0) { - dout(2) << "connect couldn't read banner, " << strerror(errno) << dendl; - goto fail; - } - if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { - dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl; - goto fail; - } + + // reap? + bool reap = false; + reader_running = false; + if (!writer_running) + reap = true; - memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = banner; - msgvec[0].iov_len = strlen(CEPH_BANNER); - msg.msg_iov = msgvec; - msg.msg_iovlen = 1; - msglen = msgvec[0].iov_len; - if (do_sendmsg(sd, &msg, msglen)) { - dout(2) << "connect couldn't write my banner, " << strerror(errno) << dendl; - goto fail; - } + lock.Unlock(); - // identify peer - rc = tcp_read(sd, (char*)&paddr, sizeof(paddr)); - if (rc < 0) { - dout(2) << "connect couldn't read peer addr, " << strerror(errno) << dendl; - goto fail; - } - dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl; - if (!peer_addr.is_local_to(paddr)) { - if (paddr.ipaddr.sin_addr.s_addr == 0 && - peer_addr.ipaddr.sin_port == paddr.ipaddr.sin_port && - peer_addr.nonce == paddr.nonce) { - dout(0) << "connect claims to be " - << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl; - } else { - dout(0) << "connect claims to be " - << paddr << " not " << peer_addr << " - wrong node!" 
<< dendl; - goto fail; + if (reap) { + dout(10) << "reader queueing for reap" << dendl; + if (sd >= 0) { + ::close(sd); + sd = -1; + closed_socket(); } + rank->lock.Lock(); + { + rank->pipe_reap_queue.push_back(this); + rank->wait_cond.Signal(); + } + rank->lock.Unlock(); } - - // identify myself - memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = (char*)&rank->rank_addr; - msgvec[0].iov_len = sizeof(rank->rank_addr); - msg.msg_iov = msgvec; - msg.msg_iovlen = 1; - msglen = msgvec[0].iov_len; - if (do_sendmsg(sd, &msg, msglen)) { - dout(2) << "connect couldn't write my addr, " << strerror(errno) << dendl; - goto fail; - } - dout(10) << "connect sent my addr " << rank->rank_addr << dendl; - while (1) { - ceph_msg_connect connect; - connect.host_type = rank->my_type; - connect.global_seq = gseq; - connect.connect_seq = cseq; - connect.flags = 0; - if (policy.lossy_tx) - connect.flags |= CEPH_MSG_CONNECT_LOSSY; - memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = (char*)&connect; - msgvec[0].iov_len = sizeof(connect); - msg.msg_iov = msgvec; - msg.msg_iovlen = 1; - msglen = msgvec[0].iov_len; + dout(10) << "reader done" << dendl; +} - dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq << dendl; - if (do_sendmsg(sd, &msg, msglen)) { - dout(2) << "connect couldn't write gseq, cseq, " << strerror(errno) << dendl; - goto fail; - } +/* +class FakeSocketError : public Context { + int sd; +public: + FakeSocketError(int s) : sd(s) {} + void finish(int r) { + cout << "faking socket error on " << sd << std::endl; + ::close(sd); + } +}; +*/ - dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl; - ceph_msg_connect_reply reply; - if (tcp_read(sd, (char*)&reply, sizeof(reply)) < 0) { - dout(2) << "connect read reply " << strerror(errno) << dendl; - goto fail; - } - dout(20) << "connect got reply tag " << (int)reply.tag - << " connect_seq " << reply.connect_seq - << " global_seq " << reply.global_seq - << " flags " << (int)reply.flags - << dendl; +/* write msgs to socket. + * also, client. + */ +void SimpleMessenger::Pipe::writer() +{ + lock.Lock(); - lock.Lock(); - if (state != STATE_CONNECTING) { - dout(0) << "connect got RESETSESSION but no longer connecting" << dendl; - goto stop_locked; - } + while (state != STATE_CLOSED) {// && state != STATE_WAIT) { + // standby? + if (!q.empty() && state == STATE_STANDBY) + state = STATE_CONNECTING; - if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { - dout(0) << "connect got RESETSESSION" << dendl; - was_session_reset(); - cseq = 0; - lock.Unlock(); - continue; - } - if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { - gseq = rank->get_global_seq(reply.global_seq); - dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq - << " chose new " << gseq << dendl; - lock.Unlock(); + // connect? + if (state == STATE_CONNECTING) { + connect(); continue; } - if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { - assert(reply.connect_seq > connect_seq); - dout(10) << "connect got RETRY_SESSION " << connect_seq - << " -> " << reply.connect_seq << dendl; - cseq = connect_seq = reply.connect_seq; + + if (state == STATE_CLOSING) { + // write close tag + dout(20) << "writer writing CLOSE tag" << dendl; + char tag = CEPH_MSGR_TAG_CLOSE; + state = STATE_CLOSED; lock.Unlock(); + if (sd) ::write(sd, &tag, 1); + lock.Lock(); continue; } - if (reply.tag == CEPH_MSGR_TAG_WAIT) { - dout(3) << "connect got WAIT (connection race)" << dendl; - state = STATE_WAIT; - goto stop_locked; - } - - if (reply.tag == CEPH_MSGR_TAG_READY) { - // hooray! 
- peer_global_seq = reply.global_seq; - lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY; - state = STATE_OPEN; - connect_seq = cseq + 1; - assert(connect_seq == reply.connect_seq); - first_fault = last_attempt = utime_t(); - dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl; + if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY && + (!q.empty() || in_seq > in_seq_acked)) { - if (!reader_running) { - dout(20) << "connect starting reader" << dendl; - start_reader(); + // send ack? + if (in_seq > in_seq_acked) { + int send_seq = in_seq; + lock.Unlock(); + int rc = write_ack(send_seq); + lock.Lock(); + if (rc < 0) { + dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl; + fault(); + continue; + } + in_seq_acked = send_seq; } - return 0; - } - - // protocol error - dout(0) << "connect got bad tag " << (int)tag << dendl; - goto fail_locked; - } - fail: - lock.Lock(); - fail_locked: - if (state == STATE_CONNECTING) - fault(); - else - dout(3) << "connect fault, but state != connecting, stopping" << dendl; + // grab outgoing message + Message *m = _get_next_outgoing(); + if (m) { + m->set_seq(++out_seq); + sent.push_back(m); // move to sent list + m->get(); + lock.Unlock(); - stop_locked: - return -1; -} + dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; -void Rank::Pipe::register_pipe() -{ - dout(10) << "register_pipe" << dendl; - assert(rank->lock.is_locked()); - assert(rank->rank_pipe.count(peer_addr) == 0); - rank->rank_pipe[peer_addr] = this; -} + // encode and copy out of *m + if (m->empty_payload()) + m->encode_payload(); + m->calc_front_crc(); -void Rank::Pipe::unregister_pipe() -{ - assert(rank->lock.is_locked()); - if (rank->rank_pipe.count(peer_addr) && - rank->rank_pipe[peer_addr] == this) { - dout(10) << "unregister_pipe" << dendl; - rank->rank_pipe.erase(peer_addr); - } else { - dout(10) << "unregister_pipe - not registered" << dendl; - } -} + dout(20) << "writer sending " << m->get_seq() << " " << m << dendl; + int rc = write_message(m); + lock.Lock(); + if (rc < 0) { + derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", " + << errno << ": " << strerror(errno) << dendl; + fault(); + } + m->put(); + } + continue; + } + + // wait + dout(20) << "writer sleeping" << dendl; + cond.Wait(lock); + } + + dout(20) << "writer finishing" << dendl; -void Rank::Pipe::requeue_sent() -{ - if (sent.empty()) - return; + // reap? 
+ bool reap = false; + writer_running = false; + if (!reader_running) reap = true; - list& rq = q[CEPH_MSG_PRIO_HIGHEST]; - while (!sent.empty()) { - Message *m = sent.back(); - sent.pop_back(); - dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq - << " (" << m->get_seq() << ")" << dendl; - rq.push_front(m); - out_seq--; + lock.Unlock(); + + if (reap) { + dout(10) << "writer queueing for reap" << dendl; + if (sd >= 0) { + ::close(sd); + sd = -1; + closed_socket(); + } + rank->lock.Lock(); + { + rank->pipe_reap_queue.push_back(this); + rank->wait_cond.Signal(); + } + rank->lock.Unlock(); } -} -void Rank::Pipe::discard_queue() -{ - dout(10) << "discard_queue" << dendl; - for (list::iterator p = sent.begin(); p != sent.end(); p++) - (*p)->put(); - sent.clear(); - for (map >::iterator p = q.begin(); p != q.end(); p++) - for (list::iterator r = p->second.begin(); r != p->second.end(); r++) - (*r)->put(); - q.clear(); + dout(10) << "writer done" << dendl; } -void Rank::Pipe::fault(bool onconnect, bool onread) +Message *SimpleMessenger::Pipe::read_message() { - assert(lock.is_locked()); - cond.Signal(); - - if (onread && state == STATE_CONNECTING) { - dout(10) << "fault already connecting, reader shutting down" << dendl; - return; - } + // envelope + //dout(10) << "receiver.read_message from sd " << sd << dendl; + + ceph_msg_header header; + ceph_msg_footer footer; - if (!onconnect) dout(2) << "fault " << errno << ": " << strerror(errno) << dendl; + if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) + return 0; + + dout(20) << "reader got envelope type=" << header.type + << " src " << header.src << " dst " << header.dst + << " front=" << header.front_len + << " data=" << header.data_len + << " off " << header.data_off + << dendl; - if (state == STATE_CLOSED || - state == STATE_CLOSING) { - dout(10) << "fault already closed|closing" << dendl; - return; + // verify header crc + __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); + if (header_crc != header.crc) { + dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; + return 0; } - if (sd >= 0) { - ::close(sd); - sd = -1; - closed_socket(); + // ok, now it's safe to change the header.. + // munge source address? + if (header.src.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { + dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl; + header.orig_src.addr.ipaddr = header.src.addr.ipaddr = peer_addr.ipaddr; } - // lossy channel? - if (policy.lossy_tx) { - dout(10) << "fault on lossy channel, failing" << dendl; - fail(); - return; + // read front + bufferlist front; + bufferptr bp; + int front_len = header.front_len; + if (front_len) { + bp = buffer::create(front_len); + if (tcp_read( sd, bp.c_str(), front_len ) < 0) + return 0; + front.push_back(bp); + dout(20) << "reader got front " << front.length() << dendl; } - // requeue sent items - requeue_sent(); - - if (q.empty()) { - if (state == STATE_CLOSING || onconnect) { - dout(10) << "fault on connect, or already closing, and q empty: setting closed." 
<< dendl; - state = STATE_CLOSED; - } else { - dout(0) << "fault nothing to send, going to standby" << dendl; - state = STATE_STANDBY; - } - return; - } - - utime_t now = g_clock.now(); - if (state != STATE_CONNECTING) { - if (!onconnect) dout(0) << "fault initiating reconnect" << dendl; - connect_seq++; - state = STATE_CONNECTING; - first_fault = now; - } else if (first_fault.sec() == 0) { - if (!onconnect) dout(0) << "fault first fault" << dendl; - first_fault = now; - } else { - utime_t failinterval = now - first_fault; - utime_t retryinterval = now - last_attempt; - if (!onconnect) dout(10) << "fault failure was " << failinterval - << " ago, last attempt was at " << last_attempt - << ", " << retryinterval << " ago" << dendl; - if (policy.fail_interval > 0 && failinterval > policy.fail_interval) { - // give up - dout(0) << "fault giving up" << dendl; - fail(); - } else if (retryinterval < policy.retry_interval) { - // wait - now += (policy.retry_interval - retryinterval); - dout(10) << "fault waiting until " << now << dendl; - cond.WaitUntil(lock, now); - dout(10) << "fault done waiting or woke up" << dendl; - } - } - last_attempt = now; -} + // read data + bufferlist data; + unsigned data_len = le32_to_cpu(header.data_len); + unsigned data_off = le32_to_cpu(header.data_off); + if (data_len) { + int left = data_len; + if (data_off & ~PAGE_MASK) { + // head + int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK), + (unsigned)left); + bp = buffer::create(head); + if (tcp_read( sd, bp.c_str(), head ) < 0) + return 0; + data.push_back(bp); + left -= head; + dout(20) << "reader got data head " << head << dendl; + } -void Rank::Pipe::fail() -{ - derr(10) << "fail" << dendl; - assert(lock.is_locked()); + // middle + int middle = left & PAGE_MASK; + if (middle > 0) { + bp = buffer::create_page_aligned(middle); + if (tcp_read( sd, bp.c_str(), middle ) < 0) + return 0; + data.push_back(bp); + left -= middle; + dout(20) << "reader got data page-aligned middle " << middle << dendl; + } - stop(); - report_failures(); + if (left) { + bp = buffer::create(left); + if (tcp_read( sd, bp.c_str(), left ) < 0) + return 0; + data.push_back(bp); + dout(20) << "reader got data tail " << left << dendl; + } + } - for (unsigned i=0; ilocal.size(); i++) - if (rank->local[i] && rank->local[i]->get_dispatcher()) - rank->local[i]->queue_reset(peer_addr, last_dest_name); + // footer + if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) + return 0; + + int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED); + dout(10) << "aborted = " << aborted << dendl; + if (aborted) { + dout(0) << "reader got " << front.length() << " + " << data.length() + << " byte message from " << header.src << ".. 
ABORTED" << dendl; + // MEH FIXME + Message *m = new MGenericMessage(CEPH_MSG_PING); + header.type = CEPH_MSG_PING; + m->set_header(header); + return m; + } - // unregister - lock.Unlock(); - rank->lock.Lock(); - unregister_pipe(); - rank->lock.Unlock(); - lock.Lock(); + dout(20) << "reader got " << front.length() << " + " << data.length() + << " byte message from " << header.src << dendl; + return decode_message(header, footer, front, data); } -void Rank::Pipe::was_session_reset() -{ - assert(lock.is_locked()); - - dout(10) << "was_session_reset" << dendl; - report_failures(); - for (unsigned i=0; ilocal.size(); i++) - if (rank->local[i] && rank->local[i]->get_dispatcher()) - rank->local[i]->queue_remote_reset(peer_addr, last_dest_name); - - out_seq = 0; - in_seq = 0; - connect_seq = 0; -} -void Rank::Pipe::report_failures() +int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len) { - // report failures - q[CEPH_MSG_PRIO_HIGHEST].splice(q[CEPH_MSG_PRIO_HIGHEST].begin(), sent); - while (1) { - Message *m = _get_next_outgoing(); - if (!m) - break; + while (len > 0) { + if (0) { // sanity + int l = 0; + for (unsigned i=0; imsg_iovlen; i++) + l += msg->msg_iov[i].iov_len; + assert(l == len); + } - if (policy.drop_msg_callback) { - unsigned srcrank = m->get_source_inst().addr.erank; - if (srcrank >= rank->max_local || rank->local[srcrank] == 0) { - dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl; - } else if (rank->local[srcrank]->is_stopped()) { - dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl; + int r = ::sendmsg(sd, msg, 0); + if (r == 0) + dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl; + if (r < 0) { + dout(1) << "do_sendmsg error " << strerror(errno) << dendl; + return -1; + } + if (state == STATE_CLOSED) { + dout(10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl; + errno = EINTR; + return -1; // close enough + } + len -= r; + if (len == 0) break; + + // hrmph. trim r bytes off the front of our message. + dout(20) << "do_sendmail short write did " << r << ", still have " << len << dendl; + while (r > 0) { + if (msg->msg_iov[0].iov_len <= (size_t)r) { + // lose this whole item + //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl; + r -= msg->msg_iov[0].iov_len; + msg->msg_iov++; + msg->msg_iovlen--; } else { - dout(10) << "fail on " << *m << dendl; - rank->local[srcrank]->queue_failure(m, m->get_dest_inst()); + // partial! + //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl; + msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r); + msg->msg_iov[0].iov_len -= r; + break; } } - m->put(); } + return 0; } -void Rank::Pipe::stop() + +int SimpleMessenger::Pipe::write_ack(unsigned seq) { - dout(10) << "stop" << dendl; - state = STATE_CLOSED; - cond.Signal(); - if (reader_running) - reader_thread.kill(SIGUSR1); - if (writer_running) - writer_thread.kill(SIGUSR1); + dout(10) << "write_ack " << seq << dendl; + + char c = CEPH_MSGR_TAG_ACK; + __le32 s; + s = seq; + + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2]; + msgvec[0].iov_base = &c; + msgvec[0].iov_len = 1; + msgvec[1].iov_base = &s; + msgvec[1].iov_len = sizeof(s); + msg.msg_iov = msgvec; + msg.msg_iovlen = 2; + + if (do_sendmsg(sd, &msg, 5) < 0) + return -1; + return 0; } -/* read msgs from socket. - * also, server. 
- */ -void Rank::Pipe::reader() +int SimpleMessenger::Pipe::write_message(Message *m) { - if (state == STATE_ACCEPTING) - accept(); + ceph_msg_header& header = m->get_header(); + ceph_msg_footer& footer = m->get_footer(); - lock.Lock(); + // get envelope, buffers + header.front_len = m->get_payload().length(); + header.data_len = m->get_data().length(); + footer.flags = 0; + m->calc_header_crc(); - // loop. - while (state != STATE_CLOSED && - state != STATE_CONNECTING) { - assert(lock.is_locked()); + bufferlist blist = m->get_payload(); + blist.append(m->get_data()); + + dout(20) << "write_message " << m << " to " << header.dst << dendl; + + // set up msghdr and iovecs + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[3 + blist.buffers().size()]; // conservative upper bound + msg.msg_iov = msgvec; + int msglen = 0; + + // send tag + char tag = CEPH_MSGR_TAG_MSG; + msgvec[msg.msg_iovlen].iov_base = &tag; + msgvec[msg.msg_iovlen].iov_len = 1; + msglen++; + msg.msg_iovlen++; - // sleep if (re)connecting - if (state == STATE_STANDBY) { - dout(20) << "reader sleeping during reconnect|standby" << dendl; - cond.Wait(lock); - continue; - } + // send envelope + msgvec[msg.msg_iovlen].iov_base = (char*)&header; + msgvec[msg.msg_iovlen].iov_len = sizeof(header); + msglen += sizeof(header); + msg.msg_iovlen++; - lock.Unlock(); + // payload (front+data) + list::const_iterator pb = blist.buffers().begin(); + int b_off = 0; // carry-over buffer offset, if any + int bl_pos = 0; // blist pos + int left = blist.length(); - char tag = -1; - dout(20) << "reader reading tag..." << dendl; - int rc = tcp_read(sd, (char*)&tag, 1); - if (rc < 0) { - lock.Lock(); - dout(2) << "reader couldn't read tag, " << strerror(errno) << dendl; - fault(false, true); - continue; + while (left > 0) { + int donow = MIN(left, (int)pb->length()-b_off); + if (donow == 0) { + dout(0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() << " b_off " << b_off << dendl; } - - // open ... 
- if (tag == CEPH_MSGR_TAG_ACK) { - dout(20) << "reader got ACK" << dendl; - __u32 seq; - int rc = tcp_read( sd, (char*)&seq, sizeof(seq)); - lock.Lock(); - if (rc < 0) { - dout(2) << "reader couldn't read ack seq, " << strerror(errno) << dendl; - fault(false, true); - } else if (state != STATE_CLOSED) { - dout(15) << "reader got ack seq " << seq << dendl; - // trim sent list - while (!sent.empty() && - sent.front()->get_seq() <= seq) { - Message *m = sent.front(); - sent.pop_front(); - dout(10) << "reader got ack seq " - << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; - m->put(); - } - } - continue; + assert(donow > 0); + dout(30) << " bl_pos " << bl_pos << " b_off " << b_off + << " leftinchunk " << left + << " buffer len " << pb->length() + << " writing " << donow + << dendl; + + if (msg.msg_iovlen >= IOV_MAX-2) { + if (do_sendmsg(sd, &msg, msglen)) + return -1; + + // and restart the iov + msg.msg_iov = msgvec; + msg.msg_iovlen = 0; + msglen = 0; + } + + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); + msgvec[msg.msg_iovlen].iov_len = donow; + msglen += donow; + msg.msg_iovlen++; + + left -= donow; + assert(left >= 0); + b_off += donow; + bl_pos += donow; + if (left == 0) break; + while (b_off == (int)pb->length()) { + pb++; + b_off = 0; } + } + assert(left == 0); - else if (tag == CEPH_MSGR_TAG_MSG) { - dout(20) << "reader got MSG" << dendl; - Message *m = read_message(); - lock.Lock(); - - if (!m) { - derr(2) << "reader read null message, " << strerror(errno) << dendl; - fault(false, true); - continue; - } - - if (state == STATE_CLOSED || - state == STATE_CONNECTING) - continue; + // send footer + msgvec[msg.msg_iovlen].iov_base = (void*)&footer; + msgvec[msg.msg_iovlen].iov_len = sizeof(footer); + msglen += sizeof(footer); + msg.msg_iovlen++; - // check received seq# - if (m->get_seq() <= in_seq) { - dout(-10) << "reader got old message " - << m->get_seq() << " <= " << in_seq << " " << m << " " << *m - << " for " << m->get_dest() - << ", discarding" << dendl; - delete m; - continue; - } - in_seq++; + // send + if (do_sendmsg(sd, &msg, msglen)) + return -1; - if (!lossy_rx && in_seq != m->get_seq()) { - dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq - << " for " << *m << " from " << m->get_source() << dendl; - derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq - << " for " << *m << " from " << m->get_source() << dendl; - assert(in_seq == m->get_seq()); // for now! - fault(false, true); - delete m; - continue; - } + return 0; +} - cond.Signal(); // wake up writer, to ack this - lock.Unlock(); - - dout(10) << "reader got message " - << m->get_seq() << " " << m << " " << *m - << " for " << m->get_dest() << dendl; - - // deliver - Endpoint *entity = 0; - - rank->lock.Lock(); - { - unsigned erank = m->get_dest_inst().addr.erank; - if (erank < rank->max_local && rank->local[erank]) { - // find entity - entity = rank->local[erank]; - entity->get(); - // first message? 
- if (entity->need_addr) { - entity->_set_myaddr(m->get_dest_inst().addr); - dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl; - entity->need_addr = false; - } +/******************************************** + * SimpleMessenger + */ +#undef dout_prefix +#define dout_prefix _prefix(this) - if (rank->need_addr) { - rank->rank_addr = m->get_dest_inst().addr; - rank->rank_addr.erank = 0; - dout(2) << "reader rank_addr is " << rank->rank_addr << dendl; - rank->need_addr = false; - } - } else { - derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl; - } - } - rank->lock.Unlock(); - - if (entity) { - entity->queue_message(m); // queue - entity->put(); - } +/* + * note: assumes lock is held + */ +void SimpleMessenger::reaper() +{ + dout(10) << "reaper" << dendl; + assert(lock.is_locked()); - lock.Lock(); - } - - else if (tag == CEPH_MSGR_TAG_CLOSE) { - dout(20) << "reader got CLOSE" << dendl; - lock.Lock(); - if (state == STATE_CLOSING) - state = STATE_CLOSED; - else - state = STATE_CLOSING; - cond.Signal(); - break; - } - else { - dout(0) << "reader bad tag " << (int)tag << dendl; - lock.Lock(); - fault(false, true); - } + while (!pipe_reap_queue.empty()) { + Pipe *p = pipe_reap_queue.front(); + dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; + p->unregister_pipe(); + pipe_reap_queue.pop_front(); + assert(pipes.count(p)); + pipes.erase(p); + p->join(); + p->discard_queue(); + dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; + assert(p->sd < 0); + delete p; + dout(10) << "reaper deleted pipe " << p << dendl; } +} - - // reap? - bool reap = false; - reader_running = false; - if (!writer_running) - reap = true; - - lock.Unlock(); - if (reap) { - dout(10) << "reader queueing for reap" << dendl; - if (sd >= 0) { - ::close(sd); - sd = -1; - closed_socket(); - } - rank->lock.Lock(); - { - rank->pipe_reap_queue.push_back(this); - rank->wait_cond.Signal(); - } - rank->lock.Unlock(); +int SimpleMessenger::bind(int64_t force_nonce) +{ + lock.Lock(); + if (started) { + dout(10) << "rank.bind already started" << dendl; + lock.Unlock(); + return -1; } + dout(10) << "rank.bind" << dendl; + lock.Unlock(); - dout(10) << "reader done" << dendl; + // bind to a socket + return accepter.bind(force_nonce); } -/* -class FakeSocketError : public Context { - int sd; + +class C_Die : public Context { public: - FakeSocketError(int s) : sd(s) {} - void finish(int r) { - cout << "faking socket error on " << sd << std::endl; - ::close(sd); + void finish(int) { + cerr << "die" << std::endl; + exit(1); } }; -*/ -/* write msgs to socket. - * also, client. - */ -void Rank::Pipe::writer() +static void write_pid_file(int pid) { - lock.Lock(); - - while (state != STATE_CLOSED) {// && state != STATE_WAIT) { - // standby? - if (!q.empty() && state == STATE_STANDBY) - state = STATE_CONNECTING; - - // connect? - if (state == STATE_CONNECTING) { - connect(); - continue; - } - - if (state == STATE_CLOSING) { - // write close tag - dout(20) << "writer writing CLOSE tag" << dendl; - char tag = CEPH_MSGR_TAG_CLOSE; - state = STATE_CLOSED; - lock.Unlock(); - if (sd) ::write(sd, &tag, 1); - lock.Lock(); - continue; - } - - if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY && - (!q.empty() || in_seq > in_seq_acked)) { + if (!g_conf.pid_file) + return; - // send ack? 
- if (in_seq > in_seq_acked) { - int send_seq = in_seq; - lock.Unlock(); - int rc = write_ack(send_seq); - lock.Lock(); - if (rc < 0) { - dout(2) << "writer couldn't write ack, " << strerror(errno) << dendl; - fault(); - continue; - } - in_seq_acked = send_seq; - } + int fd = ::open(g_conf.pid_file, O_CREAT|O_TRUNC|O_WRONLY, 0644); + if (fd >= 0) { + char buf[20]; + int len = sprintf(buf, "%d\n", pid); + ::write(fd, buf, len); + ::close(fd); + } +} - // grab outgoing message - Message *m = _get_next_outgoing(); - if (m) { - m->set_seq(++out_seq); - sent.push_back(m); // move to sent list - m->get(); - lock.Unlock(); +static void remove_pid_file() +{ + if (!g_conf.pid_file) + return; - dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; + // only remove it if it has OUR pid in it! + int fd = ::open(g_conf.pid_file, O_RDONLY); + if (fd >= 0) { + char buf[20]; + ::read(fd, buf, 20); + ::close(fd); + int a = atoi(buf); - // encode and copy out of *m - if (m->empty_payload()) - m->encode_payload(); - m->calc_front_crc(); + if (a == getpid()) + ::unlink(g_conf.pid_file); + else + generic_dout(0) << "strange, pid file " << g_conf.pid_file + << " has " << a << ", not expected " << getpid() + << dendl; + } +} - dout(20) << "writer sending " << m->get_seq() << " " << m << dendl; - int rc = write_message(m); +int SimpleMessenger::start(bool nodaemon) +{ + // register at least one entity, first! + assert(my_type >= 0); - lock.Lock(); - if (rc < 0) { - derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", " - << errno << ": " << strerror(errno) << dendl; - fault(); - } - m->put(); - } - continue; - } - - // wait - dout(20) << "writer sleeping" << dendl; - cond.Wait(lock); + lock.Lock(); + if (started) { + dout(10) << "rank.start already started" << dendl; + lock.Unlock(); + return 0; } - - dout(20) << "writer finishing" << dendl; - - // reap? - bool reap = false; - writer_running = false; - if (!reader_running) reap = true; + dout(1) << "rank.start at " << rank_addr << dendl; + started = true; lock.Unlock(); - - if (reap) { - dout(10) << "writer queueing for reap" << dendl; - if (sd >= 0) { - ::close(sd); - sd = -1; - closed_socket(); + + // daemonize? + if (g_conf.daemonize && !nodaemon) { + if (Thread::get_num_threads() > 0) { + derr(0) << "rank.start BUG: there are " << Thread::get_num_threads() + << " already started that will now die! call rank.start() sooner." + << dendl; } - rank->lock.Lock(); - { - rank->pipe_reap_queue.push_back(this); - rank->wait_cond.Signal(); + dout(1) << "rank.start daemonizing" << dendl; + + if (1) { + daemon(1, 0); + write_pid_file(getpid()); + } else { + pid_t pid = fork(); + if (pid) { + // i am parent + write_pid_file(pid); + ::close(0); + ::close(1); + ::close(2); + _exit(0); + } } - rank->lock.Unlock(); + + if (g_conf.chdir && g_conf.chdir[0]) { + ::mkdir(g_conf.chdir, 0700); + ::chdir(g_conf.chdir); + } + + _dout_rename_output_file(); + } else if (g_daemon) { + write_pid_file(getpid()); } - dout(10) << "writer done" << dendl; + // some debug hackery? + if (g_conf.kill_after) + g_timer.add_event_after(g_conf.kill_after, new C_Die); + + // go! + accepter.start(); + return 0; } -Message *Rank::Pipe::read_message() +/* connect_rank + * NOTE: assumes rank.lock held. 
+ */ +SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, const Policy& p) { - // envelope - //dout(10) << "receiver.read_message from sd " << sd << dendl; + assert(lock.is_locked()); + assert(addr != rank_addr); - ceph_msg_header header; - ceph_msg_footer footer; - - if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) - return 0; + dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; - dout(20) << "reader got envelope type=" << header.type - << " src " << header.src << " dst " << header.dst - << " front=" << header.front_len - << " data=" << header.data_len - << " off " << header.data_off - << dendl; - - // verify header crc - __u32 header_crc = crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); - if (header_crc != header.crc) { - dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; - return 0; - } + // create pipe + Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING); + pipe->policy = p; + pipe->peer_addr = addr; + pipe->start_writer(); + pipe->register_pipe(); + pipes.insert(pipe); - // ok, now it's safe to change the header.. - // munge source address? - if (header.src.addr.ipaddr.sin_addr.s_addr == htonl(INADDR_ANY)) { - dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl; - header.orig_src.addr.ipaddr = header.src.addr.ipaddr = peer_addr.ipaddr; - } + return pipe; +} - // read front - bufferlist front; - bufferptr bp; - int front_len = header.front_len; - if (front_len) { - bp = buffer::create(front_len); - if (tcp_read( sd, bp.c_str(), front_len ) < 0) - return 0; - front.push_back(bp); - dout(20) << "reader got front " << front.length() << dendl; - } - // read data - bufferlist data; - unsigned data_len = le32_to_cpu(header.data_len); - unsigned data_off = le32_to_cpu(header.data_off); - if (data_len) { - int left = data_len; - if (data_off & ~PAGE_MASK) { - // head - int head = MIN(PAGE_SIZE - (data_off & ~PAGE_MASK), - (unsigned)left); - bp = buffer::create(head); - if (tcp_read( sd, bp.c_str(), head ) < 0) - return 0; - data.push_back(bp); - left -= head; - dout(20) << "reader got data head " << head << dendl; - } - // middle - int middle = left & PAGE_MASK; - if (middle > 0) { - bp = buffer::create_page_aligned(middle); - if (tcp_read( sd, bp.c_str(), middle ) < 0) - return 0; - data.push_back(bp); - left -= middle; - dout(20) << "reader got data page-aligned middle " << middle << dendl; - } - if (left) { - bp = buffer::create(left); - if (tcp_read( sd, bp.c_str(), left ) < 0) - return 0; - data.push_back(bp); - dout(20) << "reader got data tail " << left << dendl; - } - } - // footer - if (tcp_read(sd, (char*)&footer, sizeof(footer)) < 0) - return 0; - - int aborted = (le32_to_cpu(footer.flags) & CEPH_MSG_FOOTER_ABORTED); - dout(10) << "aborted = " << aborted << dendl; - if (aborted) { - dout(0) << "reader got " << front.length() << " + " << data.length() - << " byte message from " << header.src << ".. 
ABORTED" << dendl; - // MEH FIXME - Message *m = new MGenericMessage(CEPH_MSG_PING); - header.type = CEPH_MSG_PING; - m->set_header(header); - return m; - } - dout(20) << "reader got " << front.length() << " + " << data.length() - << " byte message from " << header.src << dendl; - return decode_message(header, footer, front, data); -} -int Rank::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len) +/* register_entity + */ +SimpleMessenger::Endpoint *SimpleMessenger::register_entity(entity_name_t name) { - while (len > 0) { - if (0) { // sanity - int l = 0; - for (unsigned i=0; imsg_iovlen; i++) - l += msg->msg_iov[i].iov_len; - assert(l == len); - } + dout(10) << "register_entity " << name << dendl; + lock.Lock(); + + // create messenger + int erank = max_local; + Endpoint *msgr = new Endpoint(this, name, erank); - int r = ::sendmsg(sd, msg, 0); - if (r == 0) - dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl; - if (r < 0) { - dout(1) << "do_sendmsg error " << strerror(errno) << dendl; - return -1; - } - if (state == STATE_CLOSED) { - dout(10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl; - errno = EINTR; - return -1; // close enough - } - len -= r; - if (len == 0) break; - - // hrmph. trim r bytes off the front of our message. - dout(20) << "do_sendmail short write did " << r << ", still have " << len << dendl; - while (r > 0) { - if (msg->msg_iov[0].iov_len <= (size_t)r) { - // lose this whole item - //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl; - r -= msg->msg_iov[0].iov_len; - msg->msg_iov++; - msg->msg_iovlen--; - } else { - // partial! - //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl; - msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r); - msg->msg_iov[0].iov_len -= r; - break; - } - } - } - return 0; + // now i know my type. + if (my_type >= 0) + assert(my_type == name.type()); + else + my_type = name.type(); + + // add to directory + max_local++; + local.resize(max_local); + stopped.resize(max_local); + + msgr->get(); + local[erank] = msgr; + stopped[erank] = false; + msgr->_myinst.addr = rank_addr; + if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr) + msgr->need_addr = true; + msgr->_myinst.addr.erank = erank; + + dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr + << " need_addr=" << need_addr + << dendl; + + num_local++; + + lock.Unlock(); + return msgr; } -int Rank::Pipe::write_ack(unsigned seq) +void SimpleMessenger::unregister_entity(Endpoint *msgr) { - dout(10) << "write_ack " << seq << dendl; + lock.Lock(); + dout(10) << "unregister_entity " << msgr->get_myname() << dendl; + + // remove from local directory. 
+ assert(msgr->my_rank >= 0); + assert(local[msgr->my_rank] == msgr); + local[msgr->my_rank] = 0; + stopped[msgr->my_rank] = true; + num_local--; + msgr->my_rank = -1; - char c = CEPH_MSGR_TAG_ACK; - __le32 s; - s = seq; + assert(msgr->nref.test() > 1); + msgr->put(); - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - struct iovec msgvec[2]; - msgvec[0].iov_base = &c; - msgvec[0].iov_len = 1; - msgvec[1].iov_base = &s; - msgvec[1].iov_len = sizeof(s); - msg.msg_iov = msgvec; - msg.msg_iovlen = 2; - - if (do_sendmsg(sd, &msg, 5) < 0) - return -1; - return 0; + wait_cond.Signal(); + + lock.Unlock(); } -int Rank::Pipe::write_message(Message *m) +void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy) { - ceph_msg_header& header = m->get_header(); - ceph_msg_footer& footer = m->get_footer(); + const entity_name_t dest = m->get_dest(); - // get envelope, buffers - header.front_len = m->get_payload().length(); - header.data_len = m->get_data().length(); - footer.flags = 0; - m->calc_header_crc(); + assert(m->nref.test() == 0); - bufferlist blist = m->get_payload(); - blist.append(m->get_data()); - - dout(20) << "write_message " << m << " to " << header.dst << dendl; - - // set up msghdr and iovecs - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - struct iovec msgvec[3 + blist.buffers().size()]; // conservative upper bound - msg.msg_iov = msgvec; - int msglen = 0; - - // send tag - char tag = CEPH_MSGR_TAG_MSG; - msgvec[msg.msg_iovlen].iov_base = &tag; - msgvec[msg.msg_iovlen].iov_len = 1; - msglen++; - msg.msg_iovlen++; + m->get_header().mon_protocol = CEPH_MON_PROTOCOL; + m->get_header().monc_protocol = CEPH_MONC_PROTOCOL; + m->get_header().mds_protocol = CEPH_MDS_PROTOCOL; + m->get_header().mdsc_protocol = CEPH_MDSC_PROTOCOL; + m->get_header().osd_protocol = CEPH_OSD_PROTOCOL; + m->get_header().osdc_protocol = CEPH_OSDC_PROTOCOL; - // send envelope - msgvec[msg.msg_iovlen].iov_base = (char*)&header; - msgvec[msg.msg_iovlen].iov_len = sizeof(header); - msglen += sizeof(header); - msg.msg_iovlen++; + // lookup + entity_addr_t dest_proc_addr = dest_addr; + dest_proc_addr.erank = 0; - // payload (front+data) - list::const_iterator pb = blist.buffers().begin(); - int b_off = 0; // carry-over buffer offset, if any - int bl_pos = 0; // blist pos - int left = blist.length(); + lock.Lock(); + { + // local? + if (rank_addr.is_local_to(dest_addr)) { + if (dest_addr.erank < max_local && local[dest_addr.erank]) { + // local + dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl; + local[dest_addr.erank]->queue_message(m); + } else { + derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl; + //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. + delete m; + } + } + else { + // remote. + Pipe *pipe = 0; + if (rank_pipe.count( dest_proc_addr )) { + // connected? + pipe = rank_pipe[ dest_proc_addr ]; + pipe->lock.Lock(); + if (pipe->state == Pipe::STATE_CLOSED) { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl; + pipe->unregister_pipe(); + pipe->lock.Unlock(); + pipe = 0; + } else { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." 
<< dendl; - while (left > 0) { - int donow = MIN(left, (int)pb->length()-b_off); - if (donow == 0) { - dout(0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() << " b_off " << b_off << dendl; + // if this pipe was created by an incoming connection, but we haven't received + // a message yet, then it won't have the policy set. + if (pipe->get_out_seq() == 0) + pipe->policy = policy_map[m->get_dest().type()]; + + pipe->_send(m); + pipe->lock.Unlock(); + } + } + if (!pipe) { + if (lazy) { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl; + delete m; + } else { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl; + // not connected. + pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]); + pipe->send(m); + } + } } - assert(donow > 0); - dout(30) << " bl_pos " << bl_pos << " b_off " << b_off - << " leftinchunk " << left - << " buffer len " << pb->length() - << " writing " << donow - << dendl; - - if (msg.msg_iovlen >= IOV_MAX-2) { - if (do_sendmsg(sd, &msg, msglen)) - return -1; - - // and restart the iov - msg.msg_iov = msgvec; - msg.msg_iovlen = 0; - msglen = 0; + } + + lock.Unlock(); +} + + + + + +void SimpleMessenger::wait() +{ + lock.Lock(); + while (1) { + // reap dead pipes + reaper(); + + if (num_local == 0) { + dout(10) << "wait: everything stopped" << dendl; + break; // everything stopped. + } else { + dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl; } - msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); - msgvec[msg.msg_iovlen].iov_len = donow; - msglen += donow; - msg.msg_iovlen++; - - left -= donow; - assert(left >= 0); - b_off += donow; - bl_pos += donow; - if (left == 0) break; - while (b_off == (int)pb->length()) { - pb++; - b_off = 0; - } + wait_cond.Wait(lock); } - assert(left == 0); + lock.Unlock(); + + // done! clean up. + dout(20) << "wait: stopping accepter thread" << dendl; + accepter.stop(); + dout(20) << "wait: stopped accepter thread" << dendl; - // send footer - msgvec[msg.msg_iovlen].iov_base = (void*)&footer; - msgvec[msg.msg_iovlen].iov_len = sizeof(footer); - msglen += sizeof(footer); - msg.msg_iovlen++; + // close+reap all pipes + lock.Lock(); + { + dout(10) << "wait: closing pipes" << dendl; + list toclose; + for (hash_map::iterator i = rank_pipe.begin(); + i != rank_pipe.end(); + i++) + toclose.push_back(i->second); + for (list::iterator i = toclose.begin(); + i != toclose.end(); + i++) { + (*i)->unregister_pipe(); + (*i)->lock.Lock(); + (*i)->stop(); + (*i)->lock.Unlock(); + } - // send - if (do_sendmsg(sd, &msg, msglen)) - return -1; + reaper(); + dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl; + while (!pipes.empty()) { + wait_cond.Wait(lock); + reaper(); + } + } + lock.Unlock(); - return 0; + dout(10) << "wait: done." << dendl; + dout(1) << "shutdown complete." 
<< dendl; + remove_pid_file(); + started = false; + my_type = -1; } + + +void SimpleMessenger::mark_down(entity_addr_t addr) +{ + lock.Lock(); + if (rank_pipe.count(addr)) { + Pipe *p = rank_pipe[addr]; + dout(1) << "mark_down " << addr << " -- " << p << dendl; + p->unregister_pipe(); + p->lock.Lock(); + p->stop(); + p->lock.Unlock(); + } else { + dout(1) << "mark_down " << addr << " -- pipe dne" << dendl; + } + lock.Unlock(); +} + diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 98f808c7cebc2..d25d58189a192 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -36,7 +36,7 @@ using namespace __gnu_cxx; /* Rank - per-process */ -class Rank { +class SimpleMessenger { public: struct Policy { bool lossy_tx; // @@ -88,11 +88,11 @@ private: // incoming class Accepter : public Thread { public: - Rank *rank; + SimpleMessenger *rank; bool done; int listen_sd; - Accepter(Rank *r) : rank(r), done(false), listen_sd(-1) {} + Accepter(SimpleMessenger *r) : rank(r), done(false), listen_sd(-1) {} void *entry(); void stop(); @@ -105,7 +105,7 @@ private: // pipe class Pipe { public: - Rank *rank; + SimpleMessenger *rank; ostream& _pipe_prefix(); enum { @@ -179,10 +179,10 @@ private: friend class Writer; public: - Pipe(Rank *r, int st) : + Pipe(SimpleMessenger *r, int st) : rank(r), sd(-1), - lock("Rank::Pipe::lock"), + lock("SimpleMessenger::Pipe::lock"), state(st), reader_running(false), writer_running(false), connect_seq(0), peer_global_seq(0), @@ -264,7 +264,7 @@ private: // messenger interface class Endpoint : public Messenger { - Rank *rank; + SimpleMessenger *rank; Mutex lock; Cond cond; map > dispatch_queue; @@ -286,7 +286,7 @@ private: } dispatch_thread; void dispatch_entry(); - friend class Rank; + friend class SimpleMessenger; public: void queue_message(Message *m) { @@ -331,10 +331,10 @@ private: } public: - Endpoint(Rank *r, entity_name_t name, int rn) : + Endpoint(SimpleMessenger *r, entity_name_t name, int rn) : Messenger(name), rank(r), - lock("Rank::Endpoint::lock"), + lock("SimpleMessenger::Endpoint::lock"), stop(false), qlen(0), my_rank(rn), @@ -373,7 +373,7 @@ private: }; - // Rank stuff + // SimpleMessenger stuff public: Mutex lock; Cond wait_cond; // for wait() @@ -409,12 +409,12 @@ private: void reaper(); public: - Rank() : accepter(this), - lock("Rank::lock"), started(false), need_addr(true), + SimpleMessenger() : accepter(this), + lock("SimpleMessenger::lock"), started(false), need_addr(true), max_local(0), num_local(0), my_type(-1), - global_seq_lock("Rank::global_seq_lock"), global_seq(0) { } - ~Rank() { } + global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0) { } + ~SimpleMessenger() { } //void set_listen_addr(tcpaddr_t& a); @@ -444,8 +444,4 @@ public: } } ; - - -extern Rank rank; - #endif diff --git a/src/testmsgr.cc b/src/testmsgr.cc index 6da4d057a9946..5bb484d159da1 100644 --- a/src/testmsgr.cc +++ b/src/testmsgr.cc @@ -83,6 +83,7 @@ int main(int argc, const char **argv, const char *envp[]) { // start up network g_my_addr = monmap.get_inst(whoami).addr; + SimpleMessenger rank; int err = rank.bind(); if (err < 0) return 1; -- 2.39.5
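
For reference, here is a minimal sketch of the calling pattern this patch leaves behind, modeled on the src/testmsgr.cc hunk above: each process now creates its own SimpleMessenger instance, and the header no longer declares "extern Rank rank". The run_messenger() wrapper, the Dispatcher argument, and the set_dispatcher() call are illustrative assumptions for the sketch, not code taken from this patch, and error handling is kept to a bare minimum.

  // Illustrative sketch only -- not part of the patch above.
  #include "msg/SimpleMessenger.h"

  int run_messenger(entity_name_t name, Dispatcher *dispatcher)
  {
    SimpleMessenger rank;                      // per-process instance, no global state

    if (rank.bind() < 0)                       // bind the accepter to a socket
      return 1;

    // register_entity() hands back an Endpoint, which is a Messenger.
    Messenger *m = rank.register_entity(name);
    m->set_dispatcher(dispatcher);             // assumed Messenger base-class API; wires up delivery

    rank.start(false);                         // nodaemon = false: may daemonize if g_conf.daemonize is set
    rank.wait();                               // blocks until every endpoint unregisters,
                                               // then reaps pipes and stops the accepter
    return 0;
  }

Because the messenger is now an ordinary object, its pipes, reap queue, and accepter thread are per-instance state, matching the removal of the extern declaration in the SimpleMessenger.h hunk.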