From e39f438a4874bc7570fc6caf21c683987b3a7a29 Mon Sep 17 00:00:00 2001
From: sageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Date: Mon, 29 Oct 2007 17:12:02 +0000
Subject: [PATCH] simplemessenger send handshake, reconnect attempts, timeout;
 ceph_entity_addr now uses sockaddr_in

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1999 29311d96-e01e-0410-9327-a35deaab8ce9
---
 trunk/ceph/TODO                   |   36 +-
 trunk/ceph/config.cc              |    9 +-
 trunk/ceph/config.h               |    4 +-
 trunk/ceph/fakefuse.cc            |    2 +-
 trunk/ceph/fakesyn.cc             |    2 +-
 trunk/ceph/include/ceph_fs.h      |   16 +-
 trunk/ceph/msg/FakeMessenger.cc   |    2 +-
 trunk/ceph/msg/FakeMessenger.h    |    3 -
 trunk/ceph/msg/Message.h          |    3 +
 trunk/ceph/msg/SimpleMessenger.cc | 1910 ++++++++++++++++-------------
 trunk/ceph/msg/SimpleMessenger.h  |   87 +-
 trunk/ceph/msg/msg_types.h        |   24 +-
 trunk/ceph/msg/tcp.cc             |    2 +-
 trunk/ceph/msg/tcp.h              |   10 +-
 14 files changed, 1160 insertions(+), 950 deletions(-)

diff --git a/trunk/ceph/TODO b/trunk/ceph/TODO
index f6e6642014cf9..78c0b4e8918f8 100644
--- a/trunk/ceph/TODO
+++ b/trunk/ceph/TODO
@@ -1,54 +1,27 @@
 some smallish projects:
-- crush rewrite in C
-  - generalize any memory management etc. to allow use in kernel and userspace
-  - userspace crush tools
-    - xml import/export?
-    - ?
-
-- pg monitor service
-  - to support statfs?
-  - general pg health
-  - some sort of (throttled) osd status reporting
-  - dynamic pg creation (eventually!)
-
-- SimpleMessenger
-  - clean up/merge Messenger/Dispatcher interfaces
-  - auto close idle connections
-  - delivery ack and buffering, and then reconnect
-    - take a look at RDS?  http://oss.oracle.com/projects/rds/
-  - generalize monitor client?
 - throttle message resend attempts
 - ENOSPC on client, OSD
-
 - code cleanup
   - endian portability
   - word size
   - clean up all encoded structures
 
-general kernel planning
+kernel planning
 - soft consistency on (kernel) lookup?
 - accurate reconstruction of (syscall) path?
 
-sage doc
-- mdsmonitor beacon semantics
-- cache expiration, cache invariants
-  - including dual expire states, transition, vs subtree grouping of expire messages
-- recovery states, implicit barrier are rejoin
-- journal content
-  - importmaps and up:resolve
-- metablob version semantics
-
-
-mds bugs
+mds mustfix
 - open file rejournaling vs capped log...
 - open files vs shutdown in general!  need to export any caps on replicated metadata
 - export caps to auth on unlinked inodes
@@ -186,9 +159,8 @@ messenger
 
 simplemessenger
 - close idle connections
-- buffer sent messages until a receive is acknowledged (handshake!)
-  - retry, timeout on connection or transmission failure
-- exponential backoff on monitor resend attempts (actually, this should go outside the messenger!)
+- take a look at RDS?  http://oss.oracle.com/projects/rds/
+
 
 objectcacher
 - merge clean bh's
diff --git a/trunk/ceph/config.cc b/trunk/ceph/config.cc
index a96071ed63fd4..390178655a8e7 100644
--- a/trunk/ceph/config.cc
+++ b/trunk/ceph/config.cc
@@ -158,12 +158,14 @@ md_config_t g_conf = {
 
   // --- messenger ---
   ms_tcp_nodelay: true,
+  ms_retry_interval: 2.0,   // how often to attempt reconnect
+  ms_fail_interval: 15.0,   // fail after this long
+  ms_die_on_failure: false,
 
   ms_stripe_osds: false,
   ms_skip_rank0: false,
   ms_overlay_clients: false,
-  ms_die_on_failure: false,
 
   // --- mon ---
   mon_tick_interval: 5,
@@ -473,10 +475,11 @@ bool parse_ip_port(const char *s, entity_addr_t& a)
     }
     s++; off++;
 
+    unsigned char *ipq = (unsigned char*)&a.v.ipaddr.sin_addr.s_addr;
     if (count <= 3)
-      a.v.ipq[count] = val;
+      ipq[count] = val;
    else
-      a.v.port = val;
+      a.set_port(val);
     count++;
     if (count == 4 && *s != ':') break;
diff --git a/trunk/ceph/config.h b/trunk/ceph/config.h
index 169887d2b23fe..ef286a9c86052 100644
--- a/trunk/ceph/config.h
+++ b/trunk/ceph/config.h
@@ -113,11 +113,13 @@ struct md_config_t {
    */
   bool ms_tcp_nodelay;
+  double ms_retry_interval;
+  double ms_fail_interval;
+  bool ms_die_on_failure;
 
   bool ms_stripe_osds;
   bool ms_skip_rank0;
   bool ms_overlay_clients;
-  bool ms_die_on_failure;
 
   // mon
   int mon_tick_interval;
diff --git a/trunk/ceph/fakefuse.cc b/trunk/ceph/fakefuse.cc
index b08d00d11a5d6..3a778e1b64e39 100644
--- a/trunk/ceph/fakefuse.cc
+++ b/trunk/ceph/fakefuse.cc
@@ -90,7 +90,7 @@ int main(int argc, char **argv) {
   entity_addr_t a;
   a.v.nonce = getpid();
   for (int i=0; i<g_conf.num_mon; i++)
     monmap->mon_inst[i] = entity_inst_t(entity_name_t::MON(i), a);  // hack ; see FakeMessenger.cc
 }
diff --git a/trunk/ceph/fakesyn.cc b/trunk/ceph/fakesyn.cc
index 9d12f138379ae..eea6f28755ec9 100644
--- a/trunk/ceph/fakesyn.cc
+++ b/trunk/ceph/fakesyn.cc
@@ -88,7 +88,7 @@ int main(int argc, char **argv)
   entity_addr_t a;
   a.v.nonce = getpid();
   for (int i=0; i<g_conf.num_mon; i++)
     monmap->mon_inst[i] = entity_inst_t(entity_name_t::MON(i), a);  // hack ; see FakeMessenger.cc
 }
diff --git a/trunk/ceph/include/ceph_fs.h b/trunk/ceph/include/ceph_fs.h
index 0a812c70b13d8..7be2afdd8e3a2 100644
--- a/trunk/ceph/include/ceph_fs.h
+++ b/trunk/ceph/include/ceph_fs.h
@@ -7,6 +7,7 @@
 #define _FS_CEPH_CEPH_FS_H
 
 #include
+#include <netinet/in.h>
 
 typedef __u64 ceph_ino_t;
 
@@ -136,6 +137,12 @@ struct ceph_entity_name {
 #define CEPH_ENTITY_TYPE_CLIENT  4
 #define CEPH_ENTITY_TYPE_ADMIN   5
 
+#define CEPH_MSGR_TAG_READY   0  // server -> client + oseq: ready for messages
+#define CEPH_MSGR_TAG_REJECT  1  // server -> client + oseq: decline socket
+#define CEPH_MSGR_TAG_MSG     2  // message
+#define CEPH_MSGR_TAG_ACK     3  // message ack
+#define CEPH_MSGR_TAG_CLOSE   4  // closing pipe
+
 
 /*
  * entity_addr
  */
@@ -145,17 +152,12 @@ struct ceph_entity_name {
 struct ceph_entity_addr {
 	__u32 erank;  /* entity's rank in process */
	__u32 nonce;  /* unique id for process (e.g. pid) */
-	__u32 port;   /* ip port */
-	__u8 ipq[4];  /* ipv4 addr quad */
+	struct sockaddr_in ipaddr;
 };
 
 #define ceph_entity_addr_is_local(a,b)		\
 	((a).nonce == (b).nonce &&		\
-	 (a).port == (b).port &&		\
-	 (a).ipq[0] == (b).ipq[0] &&		\
-	 (a).ipq[1] == (b).ipq[1] &&		\
-	 (a).ipq[2] == (b).ipq[2] &&		\
-	 (a).ipq[3] == (b).ipq[3])
+	 (a).ipaddr == (b).ipaddr)
 
 struct ceph_entity_inst {
diff --git a/trunk/ceph/msg/FakeMessenger.cc b/trunk/ceph/msg/FakeMessenger.cc
index 87019ba010a33..590b3214eb351 100644
--- a/trunk/ceph/msg/FakeMessenger.cc
+++ b/trunk/ceph/msg/FakeMessenger.cc
@@ -275,7 +275,7 @@ FakeMessenger::FakeMessenger(entity_name_t me) : Messenger(me)
   // assign rank
   unsigned r = directory.size();
   _myinst.name = me;
-  _myinst.addr.v.port = 0;
+  _myinst.addr.set_port(0);
   _myinst.addr.v.erank = r;
   _myinst.addr.v.nonce = getpid();
 
diff --git a/trunk/ceph/msg/FakeMessenger.h b/trunk/ceph/msg/FakeMessenger.h
index ab89b7e47f983..0b08b8c9d4c55 100644
--- a/trunk/ceph/msg/FakeMessenger.h
+++ b/trunk/ceph/msg/FakeMessenger.h
@@ -54,9 +54,6 @@ class FakeMessenger : public Messenger {
   // msg interface
   int send_message(Message *m, entity_inst_t dest);
 
-  // events
-  //virtual void trigger_timer(Timer *t);
-
   int get_dispatch_queue_len() { return qlen; }
 
   // -- incoming queue --
diff --git a/trunk/ceph/msg/Message.h b/trunk/ceph/msg/Message.h
index c4be3da01616b..b4bf53db8b4fd 100644
--- a/trunk/ceph/msg/Message.h
+++ b/trunk/ceph/msg/Message.h
@@ -201,6 +201,9 @@ public:
   void set_recv_stamp(utime_t t) { recv_stamp = t; }
   utime_t get_recv_stamp() { return recv_stamp; }
 
+  unsigned get_seq() { return env.seq; }
+  void set_seq(unsigned s) { env.seq = s; }
+
   // ENVELOPE ----
   // type
diff --git a/trunk/ceph/msg/SimpleMessenger.cc b/trunk/ceph/msg/SimpleMessenger.cc
index b6cac09880eb5..48576755b1eb5 100644
--- a/trunk/ceph/msg/SimpleMessenger.cc
+++ b/trunk/ceph/msg/SimpleMessenger.cc
@@ -62,7 +62,8 @@ void Rank::sigint()
   derr(0) << "got control-c, exiting" << dendl;
 
   // force close listener socket
-  ::close(accepter.listen_sd);
+  if (accepter.listen_sd > 0)
+    ::close(accepter.listen_sd);
 
   // force close all pipe sockets, too
   for (hash_map<entity_addr_t, Pipe*>::iterator p = rank_pipe.begin();
@@ -116,7 +117,7 @@ int Rank::Accepter::start()
   }
 
   // use whatever user specified (if anything)
-  tcpaddr_t listen_addr;
+  sockaddr_in listen_addr;
   g_my_addr.make_addr(listen_addr);
 
   /* socket creation */
@@ -153,12 +154,12 @@ int Rank::Accepter::start()
 	   myhostname->h_addr_list[0],
 	   myhostname->h_length);
     rank.rank_addr.set_addr(listen_addr);
-    rank.rank_addr.v.port = 0;  // see below
+    rank.rank_addr.set_port(0);
   }
-  if (rank.rank_addr.v.port == 0) {
+  if (rank.rank_addr.get_port() == 0) {
     entity_addr_t tmp;
     tmp.set_addr(listen_addr);
-    rank.rank_addr.v.port = tmp.v.port;
+    rank.rank_addr.set_port(tmp.get_port());
     rank.rank_addr.v.nonce = getpid();  // FIXME: pid might not be best choice here.
   }
   rank.rank_addr.v.erank = 0;
@@ -203,9 +204,19 @@ void *Rank::Accepter::entry()
     if (sd > 0) {
       dout(10) << "accepted incoming on sd " << sd << dendl;
 
+      // disable Nagle algorithm?
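
      // (aside: Nagle's algorithm delays small writes while earlier packets
      //  are unacknowledged; for a latency-sensitive, message-oriented
      //  protocol the usual choice is TCP_NODELAY on every socket, which the
      //  hunk below applies on accept and the new connect() path applies on
      //  connect.)
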
+ if (g_conf.ms_tcp_nodelay) { + int flag = 1; + int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + if (r < 0) + dout(0) << "accepter could't set TCP_NODELAY: " << strerror(errno) << dendl; + } + rank.lock.Lock(); if (rank.num_local > 0) { - Pipe *p = new Pipe(sd); + Pipe *p = new Pipe(Pipe::STATE_ACCEPTING); + p->sd = sd; + p->start_reader(); rank.pipes.insert(p); } rank.lock.Unlock(); @@ -215,7 +226,7 @@ void *Rank::Accepter::entry() } dout(20) << "accepter closing" << dendl; - ::close(listen_sd); + if (listen_sd > 0) ::close(listen_sd); dout(10) << "accepter stopping" << dendl; return 0; } @@ -228,1056 +239,1257 @@ void Rank::Accepter::stop() } -/************************************** - * Pipe + + +/******************************************** + * Rank */ -int Rank::Pipe::accept() -{ - // my creater gave me sd via accept() - - // announce myself. - int rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr)); - if (rc < 0) { - ::close(sd); - done = true; - return -1; - } - - // identify peer - rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr)); - if (rc < 0) { - dout(10) << "pipe(? " << this << ").accept couldn't read peer addr" << dendl; - ::close(sd); - done = true; - return -1; - } - - // register pipe. - rank.lock.Lock(); - { - if (rank.rank_pipe.count(peer_addr) == 0) { - // install as outgoing pipe! - dout(10) << "pipe(" << peer_addr << ' ' << this << ").accept peer is " << peer_addr << dendl; - rank.rank_pipe[peer_addr] = this; - // create writer thread. - writer_running = true; - writer_thread.create(); - } else { - // hrm, this may affect message delivery order.. keep both pipes! - dout(10) << "pipe(" << peer_addr << ' ' << this << ").accept already have a pipe for this peer (" << rank.rank_pipe[peer_addr] << "), will receive on this pipe only" << dendl; +/* + * note: assumes lock is held + */ +void Rank::reaper() +{ + dout(10) << "reaper" << dendl; + assert(lock.is_locked()); - // FIXME i could stop the receiver on the other pipe.. - - /* - // low ranks' Pipes "win" - if (peer_addr < rank.my_addr) { - dout(10) << "pipe(" << peer_addr << ' ' << this << ").accept peer is " << peer_addr - << ", already had pipe, but switching to this new one" << dendl; - // switch to this new Pipe - rank.rank_pipe[peer_addr]->unregister(); // close old one - rank.rank_pipe[peer_addr]->close(); // close old one - rank.rank_pipe[peer_addr] = this; - } else { - dout(10) << "pipe(" << peer_addr << ' ' << this << ").accept peer is " << peer_addr - << ", already had pipe, sticking with it" << dendl; - } - */ - } + while (!pipe_reap_queue.empty()) { + Pipe *p = pipe_reap_queue.front(); + dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; + p->unregister_pipe(); + pipe_reap_queue.pop_front(); + assert(pipes.count(p)); + pipes.erase(p); + p->join(); + dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; + delete p; } - rank.lock.Unlock(); - - return 0; // success. } -int Rank::Pipe::connect() -{ - dout(10) << "pipe(" << peer_addr << ' ' << this << ").connect" << dendl; - - // create socket? - sd = socket(AF_INET,SOCK_STREAM,0); - assert(sd > 0); - - // bind any port - struct sockaddr_in myAddr; - myAddr.sin_family = AF_INET; - myAddr.sin_addr.s_addr = htonl(INADDR_ANY); - myAddr.sin_port = htons( 0 ); - - int rc = bind(sd, (struct sockaddr *) &myAddr, sizeof(myAddr)); - assert(rc>=0); - // connect! 
- tcpaddr_t tcpaddr; - peer_addr.make_addr(tcpaddr); - rc = ::connect(sd, (sockaddr*)&tcpaddr, sizeof(myAddr)); - if (rc < 0) { - dout(10) << "connect error " << peer_addr - << ", " << errno << ": " << strerror(errno) << dendl; - return rc; +int Rank::start_rank() +{ + lock.Lock(); + if (started) { + dout(10) << "start_rank already started" << dendl; + lock.Unlock(); + return 0; } + dout(10) << "start_rank" << dendl; + lock.Unlock(); - // identify peer ..... FIXME - entity_addr_t paddr; - rc = tcp_read(sd, (char*)&paddr, sizeof(paddr)); - if (!rc) { // bool - dout(0) << "pipe(" << peer_addr << ' ' << this << ").connect couldn't read peer addr" << dendl; - return -1; - } - if (!ceph_entity_addr_is_local(peer_addr.v, paddr.v)) { - dout(0) << "pipe(" << peer_addr << ' ' << this << ").connect peer identifies itself as " << paddr << ", wrong guy!" << dendl; - ::close(sd); - sd = 0; + // bind to a socket + if (accepter.start() < 0) return -1; - } - // identify myself - rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr)); - if (rc < 0) - return -1; - - // register pipe - /* - rank.lock.Lock(); - { - if (rank.rank_pipe.count(peer_addr) == 0) { - dout(10) << "pipe(" << peer_addr << ' ' << this << ").connect registering pipe" << dendl; - rank.rank_pipe[peer_addr] = this; - } else { - // this is normal. - dout(10) << "pipe(" << peer_addr << ' ' << this << ").connect pipe already registered." << dendl; - } - } - rank.lock.Unlock(); - */ + lock.Lock(); - // start reader - reader_running = true; - reader_thread.create(); - + dout(1) << "start_rank at " << rank_addr << dendl; + started = true; + lock.Unlock(); return 0; } -void Rank::Pipe::unregister() + +/* connect_rank + * NOTE: assumes rank.lock held. + */ +Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr) { assert(rank.lock.is_locked()); - if (rank.rank_pipe.count(peer_addr) && - rank.rank_pipe[peer_addr] == this) { - dout(10) << "pipe(" << peer_addr << ' ' << this - << ").unregister" << dendl; - rank.rank_pipe.erase(peer_addr); - } else { - dout(10) << "pipe(" << peer_addr << ' ' << this - << ").unregister - not registerd" << dendl; - } + assert(addr != rank.rank_addr); + + dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; + + // create pipe + Pipe *pipe = new Pipe(Pipe::STATE_CONNECTING); + pipe->peer_addr = addr; + pipe->start_writer(); + pipe->register_pipe(); + pipes.insert(pipe); + + return pipe; } -void Rank::Pipe::close() -{ - dout(10) << "pipe(" << peer_addr << ' ' << this << ").close" << dendl; - // queue close message? - if (!need_to_send_close) { - dout(10) << "pipe(" << peer_addr << ' ' << this - << ").close already closing/closed" << dendl; - return; - } - - if (!writer_running) { - dout(10) << "pipe(" << peer_addr << ' ' << this - << ").close not queueing MSG_CLOSE, no writer running" << dendl; - } else { - dout(10) << "pipe(" << peer_addr << ' ' << this - << ").close queueing MSG_CLOSE" << dendl; - lock.Lock(); - q.push_back(new MGenericMessage(MSG_CLOSE)); - cond.Signal(); - need_to_send_close = false; - lock.Unlock(); - } -} -/* read msgs from socket. - * also, server. - * - */ -void Rank::Pipe::reader() -{ - if (server) - accept(); - // loop. 
- while (!done) { - Message *m = read_message(); - if (!m || m->get_type() == 0) { - if (m) { - delete m; - dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader read MSG_CLOSE message" << dendl; - need_to_send_close = false; - } else { - derr(10) << "pipe(" << peer_addr << ' ' << this << ").reader read null message" << dendl; - } - rank.lock.Lock(); - unregister(); - rank.lock.Unlock(); - close(); - done = true; - cond.Signal(); // wake up writer too. - break; - } - in_seq++; +/* register_entity + */ +Rank::EntityMessenger *Rank::register_entity(entity_name_t name) +{ + dout(10) << "register_entity " << name << dendl; + lock.Lock(); + + // create messenger + int erank = max_local; + EntityMessenger *msgr = new EntityMessenger(name, erank); - dout(10) << "pipe(" << peer_addr << ' ' << this << ").reader got message " - << in_seq << " " << m << " " << *m - << " for " << m->get_dest() << dendl; + // add to directory + max_local++; + local.resize(max_local); + stopped.resize(max_local); - // deliver - EntityMessenger *entity = 0; + local[erank] = msgr; + stopped[erank] = false; + msgr->my_addr = rank_addr; + msgr->my_addr.v.erank = erank; - rank.lock.Lock(); - { - unsigned erank = m->get_dest_inst().addr.v.erank; - if (erank < rank.max_local && rank.local[erank]) { - // find entity - entity = rank.local[erank]; - } else { - derr(0) << "pipe(" << peer_addr << ' ' << this << ").reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl; - } - } - rank.lock.Unlock(); - - if (entity) - entity->queue_message(m); // queue - } + dout(10) << "register_entity " << name << " at " << msgr->my_addr << dendl; + num_local++; - // reap? - bool reap = false; - lock.Lock(); - { - reader_running = false; - if (!writer_running) reap = true; - } lock.Unlock(); - - if (reap) { - dout(20) << "pipe(" << peer_addr << ' ' << this << ").reader queueing for reap" << dendl; - ::close(sd); - rank.lock.Lock(); - { - rank.pipe_reap_queue.push_back(this); - rank.wait_cond.Signal(); - } - rank.lock.Unlock(); - } + return msgr; } -/* write msgs to socket. - * also, client. - */ -void Rank::Pipe::writer() +void Rank::unregister_entity(EntityMessenger *msgr) { - if (!server) { - int rc = connect(); - if (rc < 0) { - derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error connecting, " - << errno << ": " << strerror(errno) - << dendl; - done = true; - list out; - fail(out); - } - } + lock.Lock(); + dout(10) << "unregister_entity " << msgr->get_myname() << dendl; + + // remove from local directory. + local[msgr->my_rank] = 0; + stopped[msgr->my_rank] = true; + num_local--; - // disable Nagle algorithm? - if (g_conf.ms_tcp_nodelay) { - int flag = 1; - int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); - if (r < 0) - dout(0) << "pipe(" << peer_addr << ' ' << this << ").writer couldn't set TCP_NODELAY: " << strerror(errno) << dendl; - } + wait_cond.Signal(); - // loop. - lock.Lock(); - while (!q.empty() || !done) { - - if (!q.empty()) { - dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer grabbing message(s)" << dendl; - - // grab outgoing list - list out; - out.swap(q); - - // drop lock while i send these - lock.Unlock(); - - while (!out.empty()) { - Message *m = out.front(); - out.pop_front(); - - dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer sending " << m << " " << *m << dendl; - - // marshall - if (m->empty_payload()) - m->encode_payload(); - - if (write_message(m) < 0) { - // failed! 
- derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error sending " << *m << " to " << m->get_dest() - << ", " << errno << ": " << strerror(errno) - << dendl; - out.push_front(m); - fail(out); - done = true; - break; - } + lock.Unlock(); +} - // did i just send a close? - if (m->get_type() == MSG_CLOSE) - done = true; - // clean up - delete m; - } +void Rank::submit_message(Message *m, const entity_addr_t& dest_addr) +{ + const entity_name_t dest = m->get_dest(); - lock.Lock(); - continue; - } - - // wait - dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer sleeping" << dendl; - cond.Wait(lock); - } - lock.Unlock(); - - dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer finishing" << dendl; + // lookup + entity_addr_t dest_proc_addr = dest_addr; + dest_proc_addr.v.erank = 0; - // reap? - bool reap = false; lock.Lock(); { - writer_running = false; - if (!reader_running) reap = true; + // local? + if (ceph_entity_addr_is_local(dest_addr.v, rank_addr.v)) { + if (dest_addr.v.erank < max_local && local[dest_addr.v.erank]) { + // local + dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl; + local[dest_addr.v.erank]->queue_message(m); + } else { + derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl; + //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. + delete m; + } + } + else { + // remote. + Pipe *pipe = 0; + if (rank_pipe.count( dest_proc_addr )) { + // connected? + pipe = rank_pipe[ dest_proc_addr ]; + pipe->lock.Lock(); + if (pipe->state == Pipe::STATE_CLOSED) { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl; + pipe->unregister_pipe(); + pipe->lock.Unlock(); + pipe = 0; + } else { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl; + pipe->_send(m); + pipe->lock.Unlock(); + } + } + if (!pipe) { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl; + // not connected. + pipe = connect_rank( dest_proc_addr ); + pipe->send(m); + } + } } + lock.Unlock(); - - if (reap) { - dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer queueing for reap" << dendl; - ::close(sd); - rank.lock.Lock(); - { - rank.pipe_reap_queue.push_back(this); - rank.wait_cond.Signal(); - } - rank.lock.Unlock(); - } } -Message *Rank::Pipe::read_message() -{ - // envelope - //dout(10) << "receiver.read_message from sd " << sd << dendl; - - ceph_message_header env; - if (!tcp_read( sd, (char*)&env, sizeof(env) )) { - need_to_send_close = false; - return 0; - } - - dout(20) << "pipe(" << peer_addr << ' ' << this << ").reader got envelope type=" << env.type - << " src " << env.src << " dst " << env.dst - << " nchunks=" << env.nchunks - << dendl; - - // payload - bufferlist blist; - int32_t pos = 0; - list chunk_at; - for (unsigned i=0; iset_chunk_payload_at(chunk_at); - - dout(20) << "pipe(" << peer_addr << ' ' << this << ").reader got " << s << " byte message from " - << m->get_source() << dendl; - - return m; -} - + // done! clean up. 
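
// (shutdown order in wait(): stop the accepter thread first so no new
//  sockets can arrive, then close and reap the surviving pipes below.)
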
+ dout(20) << "wait: stopping accepter thread" << dendl; + accepter.stop(); + dout(20) << "wait: stopped accepter thread" << dendl; -int Rank::Pipe::do_sendmsg(Message *m, struct msghdr *msg, int len) -{ - while (len > 0) { - if (0) { // sanity - int l = 0; - for (unsigned i=0; imsg_iovlen; i++) - l += msg->msg_iov[i].iov_len; - assert(l == len); + // close+reap all pipes + lock.Lock(); + { + dout(10) << "wait: closing pipes" << dendl; + list toclose; + for (hash_map::iterator i = rank_pipe.begin(); + i != rank_pipe.end(); + i++) + toclose.push_back(i->second); + for (list::iterator i = toclose.begin(); + i != toclose.end(); + i++) { + (*i)->unregister_pipe(); + (*i)->dirty_close(); } - int r = ::sendmsg(sd, msg, 0); - if (r < 0) { - assert(r == -1); - derr(1) << "pipe(" << peer_addr << ' ' << this << ").writer error on sendmsg for " << *m - << " to " << m->get_dest() - << ", " << strerror(errno) - << dendl; - need_to_send_close = false; - return -1; - } - len -= r; - if (len == 0) break; - - // hrmph. trim r bytes off the front of our message. - dout(20) << "pipe(" << peer_addr << ' ' << this << ").writer partial sendmsg for " << *m - << " to " << m->get_dest() - << " did " << r << ", still have " << len - << dendl; - while (r > 0) { - if (msg->msg_iov[0].iov_len <= (size_t)r) { - // lose this whole item - //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl; - r -= msg->msg_iov[0].iov_len; - msg->msg_iov++; - msg->msg_iovlen--; - } else { - // partial! - //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl; - msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r); - msg->msg_iov[0].iov_len -= r; - break; - } + reaper(); + dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl; + while (!pipes.empty()) { + wait_cond.Wait(lock); + reaper(); } } - return 0; -} + lock.Unlock(); + dout(10) << "wait: done." << dendl; + dout(1) << "shutdown complete." << dendl; +} -int Rank::Pipe::write_message(Message *m) -{ - // get envelope, buffers - ceph_message_header *env = &m->get_envelope(); - bufferlist blist; - blist.claim( m->get_payload() ); - - // chunk out page aligned buffers? 
- if (blist.length() == 0) - env->nchunks = 0; - else { - env->nchunks = 1 + m->get_chunk_payload_at().size(); // header + explicit chunk points - if (!m->get_chunk_payload_at().empty()) - dout(20) << "chunking at " << m->get_chunk_payload_at() - << " in " << *m << " len " << blist.length() - << dendl; - } - dout(20) << "pipe(" << peer_addr << ' ' << this << ").write_message " << m << " " << *m - << " to " << m->get_dest() - << " in " << env->nchunks - << dendl; - - // set up msghdr and iovecs - struct msghdr msg; - memset(&msg, 0, sizeof(msg)); - struct iovec msgvec[1 + blist.buffers().size() + env->nchunks*2]; // conservative upper bound - msg.msg_iov = msgvec; - int msglen = 0; - - // send envelope - msgvec[0].iov_base = (char*)env; - msgvec[0].iov_len = sizeof(*env); - msglen += sizeof(*env); - msg.msg_iovlen++; - - // payload - list::const_iterator pb = blist.buffers().begin(); - list::const_iterator pc = m->get_chunk_payload_at().begin(); - int b_off = 0; // carry-over buffer offset, if any - int bl_pos = 0; // blist pos - int nchunks = env->nchunks; - int32_t chunksizes[nchunks]; - for (int curchunk=0; curchunk < nchunks; curchunk++) { - // start a chunk - int32_t size = blist.length() - bl_pos; - if (pc != m->get_chunk_payload_at().end()) { - assert(*pc > bl_pos); - size = *pc - bl_pos; - dout(30) << "pos " << bl_pos << " explicit chunk at " << *pc << " size " << size << " of " << blist.length() << dendl; - pc++; - } - assert(size > 0); - dout(30) << "chunk " << curchunk << " pos " << bl_pos << " size " << size << dendl; - // chunk size - chunksizes[curchunk] = size; - msgvec[msg.msg_iovlen].iov_base = &chunksizes[curchunk]; - msgvec[msg.msg_iovlen].iov_len = sizeof(int32_t); - msglen += sizeof(int32_t); - msg.msg_iovlen++; - // chunk contents - int left = size; - while (left > 0) { - int donow = MIN(left, (int)pb->length()-b_off); - assert(donow > 0); - dout(30) << " bl_pos " << bl_pos << " b_off " << b_off - << " leftinchunk " << left - << " buffer len " << pb->length() - << " writing " << donow - << dendl; - if (msg.msg_iovlen >= IOV_MAX-1) { - if (do_sendmsg(m, &msg, msglen)) - return -1; +/********************************** + * EntityMessenger + */ - // and restart the iov - msg.msg_iov = msgvec; - msg.msg_iovlen = 0; - msglen = 0; +void Rank::EntityMessenger::dispatch_entry() +{ + lock.Lock(); + while (!stop) { + if (!dispatch_queue.empty() || !prio_dispatch_queue.empty()) { + list ls; + if (!prio_dispatch_queue.empty()) { + ls.swap(prio_dispatch_queue); + pqlen = 0; + } else { + if (0) { + ls.swap(dispatch_queue); + qlen = 0; + } else { + // limit how much low-prio stuff we grab, to avoid starving high-prio messages! 
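
          // (hence the single pop below: one low-priority message per loop
          //  pass, while the high-priority queue above is swapped out and
          //  drained in full whenever it is non-empty.)
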
+ ls.push_back(dispatch_queue.front()); + dispatch_queue.pop_front(); + qlen--; + } } - msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); - msgvec[msg.msg_iovlen].iov_len = donow; - msglen += donow; - msg.msg_iovlen++; - - left -= donow; - assert(left >= 0); - b_off += donow; - bl_pos += donow; - if (b_off != (int)pb->length()) - break; - pb++; - b_off = 0; + lock.Unlock(); + { + // deliver + while (!ls.empty()) { + if (stop) { + dout(1) << "dispatch: stop=true, discarding " << ls.size() + << " messages in dispatch queue" << dendl; + break; + } + Message *m = ls.front(); + ls.pop_front(); + dout(1) << m->get_dest() + << " <== " << m->get_source_inst() + << " ==== " << *m + << " ==== " << m + << dendl; + dispatch(m); + dout(20) << "done calling dispatch on " << m << dendl; + } + } + lock.Lock(); + continue; } - assert(left == 0); + cond.Wait(lock); } - assert(pb == blist.buffers().end()); + lock.Unlock(); + + // deregister + rank.unregister_entity(this); +} + +void Rank::EntityMessenger::ready() +{ + dout(10) << "ready " << get_myaddr() << dendl; + assert(!dispatch_thread.is_started()); - // send - if (do_sendmsg(m, &msg, msglen)) - return -1; + // start my dispatch thread + dispatch_thread.create(); +} + + +int Rank::EntityMessenger::shutdown() +{ + dout(10) << "shutdown " << get_myaddr() << dendl; + + // stop my dispatch thread + if (dispatch_thread.am_self()) { + dout(10) << "shutdown i am dispatch, setting stop flag" << dendl; + stop = true; + } else { + dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl; + lock.Lock(); + stop = true; + cond.Signal(); + lock.Unlock(); + } return 0; } - -void Rank::Pipe::fail(list& out) +void Rank::EntityMessenger::suicide() { - derr(10) << "pipe(" << peer_addr << ' ' << this << ").fail" << dendl; + dout(10) << "suicide " << get_myaddr() << dendl; + shutdown(); + // hmm, or exit(0)? +} - // FIXME: possible race before i reclaim lock here? - - // deactivate myself +void Rank::EntityMessenger::prepare_dest(const entity_addr_t& addr) +{ rank.lock.Lock(); { - if (rank.rank_pipe.count(peer_addr) && - rank.rank_pipe[peer_addr] == this) - rank.rank_pipe.erase(peer_addr); + if (rank.rank_pipe.count(addr) == 0) + rank.connect_rank(addr); } rank.lock.Unlock(); +} - // what do i do about reader()? FIXME +int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest) +{ + // set envelope + m->set_source(get_myname()); + m->set_source_addr(my_addr); + m->set_dest_inst(dest); + + dout(1) << m->get_source() + << " --> " << dest.name << " " << dest.addr + << " -- " << *m + << " -- " << m + << dendl; - // sort my messages by (source) dispatcher, dest. - map > > by_dis; + rank.submit_message(m, dest.addr); + + return 0; +} + + + +void Rank::EntityMessenger::reset_myname(entity_name_t newname) +{ + entity_name_t oldname = get_myname(); + dout(10) << "reset_myname " << oldname << " to " << newname << dendl; + _set_myname(newname); +} + + + + +void Rank::EntityMessenger::mark_down(entity_addr_t a) +{ + rank.mark_down(a); +} + +void Rank::mark_down(entity_addr_t addr) +{ + //if (my_rank == 0) return; // ugh.. 
rank0 already handles this stuff in the namer lock.Lock(); - { - // include out at front of queue - q.splice(q.begin(), out); - - // sort - while (!q.empty()) { - unsigned srcrank = q.front()->get_source_inst().addr.v.erank; - if (q.front()->get_type() == MSG_CLOSE) { - delete q.front(); - } - else if (srcrank < rank.max_local && rank.local[srcrank]) { - EntityMessenger *mgr = rank.local[srcrank]; - Dispatcher *dis = mgr->get_dispatcher(); - if (mgr->is_stopped()) { - // ignore. - dout(1) << "pipe(" << peer_addr << ' ' << this << ").fail on " << *q.front() << ", dispatcher stopping, ignoring." << dendl; - delete q.front(); - } else { - by_dis[dis][q.front()->get_dest()].push_back(q.front()); - } - } - else { - // oh well. sending entity musta just shut down? - delete q.front(); + /* + if (entity_map.count(a) && + entity_map[a] > inst) { + dout(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << dendl; + derr(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << dendl; + // do nothing! + } else { + if (entity_map.count(a) == 0) { + // don't know it + dout(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << dendl; + derr(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << dendl; + } else { + // know it + assert(entity_map[a] <= inst); + dout(10) << "mark_down " << a << " inst " << inst << dendl; + derr(10) << "mark_down " << a << " inst " << inst << dendl; + + entity_map.erase(a); + + if (rank_pipe.count(inst)) { + rank_pipe[inst]->close(); + rank_pipe.erase(inst); } - q.pop_front(); } } + */ lock.Unlock(); - - // report failure(s) to dispatcher(s) - for (map > >::iterator i = by_dis.begin(); - i != by_dis.end(); - ++i) - for (map >::iterator j = i->second.begin(); - j != i->second.end(); - ++j) - for (list::iterator k = j->second.begin(); - k != j->second.end(); - ++k) { - derr(1) << "pipe(" << peer_addr << ' ' << this << ").fail on " << **k << " to " << (*k)->get_dest_inst() << dendl; - if (i->first) - i->first->ms_handle_failure(*k, (*k)->get_dest_inst()); - } } - -/******************************************** - * Rank +/************************************** + * Pipe */ +#undef dout +#undef derr +#define dout(l) if (l<=g_conf.debug_ms) *_dout << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ")." +#define derr(l) if (l<=g_conf.debug_ms) *_derr << dbeginl << g_clock.now() << " " << pthread_self() << " -- " << rank.rank_addr << " >> " << peer_addr << " pipe(" << this << ")." /* - * note: assumes lock is held + * we have to be careful about connection races: + * A initiates connection + * B initiates connection + * B accepts A's connection + * A rejects B's connection (or vice-versa) + * + * this is controlled by whether accept uses the new incoming socket + * as the new pipe. two cases: + * old new + * connecting connecting -> use socket initiated by lower address + * open connecting + * -> use new socket _only_ if connect_seq matches. that is, the + * peer reconnected subsequent to the current open socket. if + * connect_seq _doesn't_ match, it means that it is an old attempt. 
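 *
 * restated, the rule accept() implements below:
 *   existing pipe CONNECTING -> keep the socket initiated by the
 *                               lower-addressed peer
 *   existing pipe OPEN       -> adopt the incoming socket only if its
 *                               connect_seq matches the open pipe's
 *   otherwise                -> reject the incoming socket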
*/ -void Rank::reaper() + +int Rank::Pipe::accept() { - dout(10) << "reaper" << dendl; - assert(lock.is_locked()); + dout(10) << "accept" << dendl; - while (!pipe_reap_queue.empty()) { - Pipe *p = pipe_reap_queue.front(); - dout(10) << "reaper reaping pipe " << p->get_peer_addr() << dendl; - pipe_reap_queue.pop_front(); - assert(pipes.count(p)); - pipes.erase(p); - p->join(); - dout(10) << "reaper reaped pipe " << p->get_peer_addr() << dendl; - delete p; + // my creater gave me sd via accept() + assert(state == STATE_ACCEPTING); + + // announce myself. + int rc = tcp_write(sd, (char*)&rank.rank_addr, sizeof(rank.rank_addr)); + if (rc < 0) { + dout(10) << "accept couldn't write my addr" << dendl; + state = STATE_CLOSED; + return -1; + } + + // identify peer + rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr)); + if (rc < 0) { + dout(10) << "accept couldn't read peer addr" << dendl; + state = STATE_CLOSED; + return -1; } -} + __u32 cseq; + rc = tcp_read(sd, (char*)&cseq, sizeof(cseq)); + if (rc < 0) { + dout(10) << "accept couldn't read connect seq" << dendl; + state = STATE_CLOSED; + return -1; + } -int Rank::start_rank() -{ - lock.Lock(); - if (started) { - dout(10) << "start_rank already started" << dendl; - lock.Unlock(); - return 0; + dout(20) << "accept got connect_seq " << cseq << dendl; + + // register pipe. + rank.lock.Lock(); + { + if (rank.rank_pipe.count(peer_addr) == 0) { + dout(10) << "accept new peer " << peer_addr << dendl; + register_pipe(); + } else { + // hmm! + Pipe *other = rank.rank_pipe[peer_addr]; + other->lock.Lock(); + + dout(10) << "accept got connect_seq " << cseq + << ", existing pipe connect_seq " << other->connect_seq + << " state " << other->state + << dendl; + + // if open race, low addr's pipe "wins". + // otherwise, look at oseq vs out_seq + if ((other->state == STATE_CONNECTING && peer_addr < rank.rank_addr) || + (other->state == STATE_OPEN && cseq == other->connect_seq)) { + dout(10) << "accept already had pipe " << other + << ", but switching to this new one" << dendl; + // switch to this new Pipe + other->state = STATE_CLOSED; + assert(q.empty()); + other->cond.Signal(); + other->unregister_pipe(); + register_pipe(); + + // steal queue and out_seq + other->take_queue(q); + out_seq = other->out_seq; + //for (list::iterator p = q.begin(); p != q.end(); p++) + //(*p)->set_seq(++out_seq); + } + else { + dout(10) << "accept already had pipe " << other + << ", closing other" << dendl; + state = STATE_CLOSED; + } + other->lock.Unlock(); + } } - dout(10) << "start_rank" << dendl; - lock.Unlock(); + rank.lock.Unlock(); - // bind to a socket - if (accepter.start() < 0) - return -1; + char tag; + if (state == STATE_CLOSED) { + dout(10) << "accept closed, sending REJECT tag" << dendl; + tag = CEPH_MSGR_TAG_REJECT; + } else { + dout(10) << "accept sending READY tag" << dendl; + tag = CEPH_MSGR_TAG_READY; + state = STATE_OPEN; + } - lock.Lock(); + if (tcp_write(sd, &tag, 1) < 0 || + tcp_write(sd, (char*)&connect_seq, sizeof(connect_seq)) < 0) { + // hrmpf + dout(10) << "accept couldn't send initial tag+seq: " + << strerror(errno) << dendl; + fault(); + } - dout(1) << "start_rank at " << rank_addr << dendl; - started = true; - lock.Unlock(); - return 0; + if (state != STATE_CLOSED) { + dout(10) << "accept starting writer, " + << "state=" << state << dendl; + start_writer(); + } + + return 0; // success. 
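
// (once accept() returns, the reader thread that called it falls through
//  to the tag loop; the writer thread started above drains any queued
//  messages and acks.)
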
} +int Rank::Pipe::connect() +{ + dout(10) << "connect " << connect_seq << dendl; + assert(lock.is_locked()); + if (sd > 0) { + ::close(sd); + sd = 0; + } + __u32 cseq = connect_seq; -/* connect_rank - * NOTE: assumes rank.lock held. - */ -Rank::Pipe *Rank::connect_rank(const entity_addr_t& addr) -{ - assert(rank.lock.is_locked()); - assert(addr != rank.rank_addr); + lock.Unlock(); + + int newsd; + char tag; + int rc; + struct sockaddr_in myAddr; + sockaddr_in tcpaddr; + entity_addr_t paddr; + struct msghdr msg; + struct iovec msgvec[2]; + int msglen; + + // create socket? + newsd = ::socket(AF_INET,SOCK_STREAM,0); + if (newsd < 0) { + dout(-1) << "connect couldn't created socket " << strerror(errno) << dendl; + assert(0); + goto fail; + } - dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; + // bind any port + myAddr.sin_family = AF_INET; + myAddr.sin_addr.s_addr = htonl(INADDR_ANY); + myAddr.sin_port = htons( 0 ); - // create pipe - Pipe *pipe = new Pipe(addr); - rank.rank_pipe[addr] = pipe; - pipes.insert(pipe); + rc = ::bind(newsd, (struct sockaddr *) &myAddr, sizeof(myAddr)); + assert(rc>=0); + + // connect! + peer_addr.make_addr(tcpaddr); + rc = ::connect(newsd, (sockaddr*)&tcpaddr, sizeof(myAddr)); + if (rc < 0) { + dout(10) << "connect error " << peer_addr + << ", " << errno << ": " << strerror(errno) << dendl; + goto fail; + } + + // disable Nagle algorithm? + if (g_conf.ms_tcp_nodelay) { + int flag = 1; + int r = ::setsockopt(newsd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); + if (r < 0) + dout(0) << "connect couldn't set TCP_NODELAY: " << strerror(errno) << dendl; + } - // register - rank.rank_pipe[addr] = pipe; + // identify peer + rc = tcp_read(newsd, (char*)&paddr, sizeof(paddr)); + if (!rc) { // bool + dout(0) << "connect couldn't read peer addr" << dendl; + goto fail; + } + dout(20) << "connect read peer addr " << paddr << dendl; + if (!ceph_entity_addr_is_local(peer_addr.v, paddr.v)) { + dout(0) << "connect peer identifies itself as " + << paddr << "... wrong node!" << dendl; + goto fail; + } + + // identify myself, and send open seq + memset(&msg, 0, sizeof(msg)); + msgvec[0].iov_base = (char*)&rank.rank_addr; + msgvec[0].iov_len = sizeof(rank.rank_addr); + msgvec[1].iov_base = (char*)&cseq; + msgvec[1].iov_len = sizeof(cseq); + msg.msg_iov = msgvec; + msg.msg_iovlen = 2; + msglen = msgvec[0].iov_len + msgvec[1].iov_len; + + if (do_sendmsg(newsd, &msg, msglen)) { + dout(20) << "connect couldn't write self, seq" << dendl; + goto fail; + } + + dout(20) << "connect wrote self, seq, waiting for tag" << dendl; + + // wait for tag + tag = -1; + if (tcp_read(newsd, &tag, 1) <= 0 || + tcp_read(newsd, (char*)&cseq, sizeof(cseq)) <= 0) + goto fail; + + dout(20) << "connect got initial tag " << (int)tag << " + seq " << cseq << dendl; + + lock.Lock(); + + // FINISH + if (state != STATE_CONNECTING) { + dout(20) << "connect hmm, not connecting anymore, failing" << dendl; + goto fail2; // hmm! 
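
// (note the two labels below: "fail" re-takes the pipe lock before falling
//  into "fail2", which expects the lock already held; both close newsd and
//  then call fault().)
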
+ } + if (tag != CEPH_MSGR_TAG_READY) { + dout(20) << "connect didn't get READY tag, my connect_seq=" << connect_seq + << ", got " << cseq << dendl; + if (connect_seq != cseq) { + dout(0) << "connect got REJECT tag, old connect_seq was " << connect_seq + << ", taking new " << cseq << dendl; + connect_seq = cseq; + } + goto fail2; + } + state = STATE_OPEN; + this->sd = newsd; + connect_seq++; + first_fault = last_attempt = utime_t(); + dout(20) << "connect success " << connect_seq << dendl; + + if (!reader_running) { + dout(20) << "connect starting reader" << dendl; + start_reader(); + } + return 0; - return pipe; + fail: + lock.Lock(); + fail2: + if (newsd > 0) ::close(newsd); + fault(); + return -1; } +void Rank::Pipe::register_pipe() +{ + dout(10) << "register" << dendl; + assert(rank.lock.is_locked()); + assert(rank.rank_pipe.count(peer_addr) == 0); + rank.rank_pipe[peer_addr] = this; +} +void Rank::Pipe::unregister_pipe() +{ + assert(rank.lock.is_locked()); + if (rank.rank_pipe.count(peer_addr) && + rank.rank_pipe[peer_addr] == this) { + dout(10) << "unregister" << dendl; + rank.rank_pipe.erase(peer_addr); + } else { + dout(10) << "unregister - not registered" << dendl; + } +} +void Rank::Pipe::fault() +{ + assert(lock.is_locked()); + if (q.empty()) { + dout(0) << "fault nothing to send, closing" << dendl; + state = STATE_CLOSED; + } else { + utime_t now = g_clock.now(); + if (state != STATE_CONNECTING) { + dout(0) << "fault initiating reconnect" << dendl; + connect_seq++; + state = STATE_CONNECTING; + first_fault = now; + } else if (first_fault.sec() == 0) { + dout(0) << "fault during connect" << dendl; + first_fault = now; + } else { + utime_t failinterval = now - first_fault; + utime_t retryinterval = now - last_attempt; + dout(10) << "fault failure was " << failinterval + << " ago, last attempt was at " << last_attempt + << ", " << retryinterval << " ago" << dendl; + if (failinterval > g_conf.ms_fail_interval) { + // give up + dout(0) << "fault giving up" << dendl; + state = STATE_CLOSED; + fail(); + } else if (retryinterval < g_conf.ms_retry_interval) { + // wait + now += (g_conf.ms_retry_interval - retryinterval); + dout(10) << "fault waiting until " << now << dendl; + cond.WaitUntil(lock, now); + dout(10) << "fault done waiting or woke up" << dendl; + } + } + last_attempt = now; + } + cond.Signal(); +} +void Rank::Pipe::fail() +{ + derr(10) << "fail" << dendl; + assert(lock.is_locked()); + cond.Signal(); - -/* register_entity - */ -Rank::EntityMessenger *Rank::register_entity(entity_name_t name) -{ - dout(10) << "register_entity " << name << dendl; + // deactivate myself + lock.Unlock(); + rank.lock.Lock(); + unregister_pipe(); + rank.lock.Unlock(); lock.Lock(); - - // create messenger - int erank = max_local; - EntityMessenger *msgr = new EntityMessenger(name, erank); - // add to directory - max_local++; - local.resize(max_local); - stopped.resize(max_local); + // report failures + q.splice(q.begin(), sent); + while (!q.empty()) { + Message *m = q.front(); + q.pop_front(); + unsigned srcrank = m->get_source_inst().addr.v.erank; + if (srcrank >= rank.max_local || rank.local[srcrank] == 0) { + dout(1) << "fail on " << *m << ", srcrank " << srcrank << " dne, dropping" << dendl; + delete m; + continue; + } + if (rank.local[srcrank]->is_stopped()) { + dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." 
<< dendl; + delete m; + continue; + } + dout(10) << "fail on " << *m << dendl; + rank.local[srcrank]->get_dispatcher()->ms_handle_failure(m, m->get_dest_inst()); + } +} - local[erank] = msgr; - stopped[erank] = false; - msgr->my_addr = rank_addr; - msgr->my_addr.v.erank = erank; - dout(10) << "register_entity " << name << " at " << msgr->my_addr << dendl; - num_local++; - +void Rank::Pipe::dirty_close() +{ + dout(10) << "dirty_close" << dendl; + lock.Lock(); + state = STATE_CLOSING; + cond.Signal(); lock.Unlock(); - return msgr; } -void Rank::unregister_entity(EntityMessenger *msgr) +/* read msgs from socket. + * also, server. + */ +void Rank::Pipe::reader() { lock.Lock(); - dout(10) << "unregister_entity " << msgr->get_myname() << dendl; - - // remove from local directory. - local[msgr->my_rank] = 0; - stopped[msgr->my_rank] = true; - num_local--; - - wait_cond.Signal(); - lock.Unlock(); -} + if (state == STATE_ACCEPTING) + accept(); + // loop. + while (state != STATE_CLOSED) { + assert(lock.is_locked()); -void Rank::submit_message(Message *m, const entity_addr_t& dest_addr) -{ - const entity_name_t dest = m->get_dest(); + // sleep if (re)connecting + if (state == STATE_CONNECTING) { + dout(20) << "reader sleeping during reconnect" << dendl; + cond.Wait(lock); + continue; + } - // lookup - EntityMessenger *entity = 0; - Pipe *pipe = 0; + lock.Unlock(); - lock.Lock(); - { - // local? - if (ceph_entity_addr_is_local(dest_addr.v, rank_addr.v)) { - if (dest_addr.v.erank < max_local && local[dest_addr.v.erank]) { - // local - dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl; - entity = local[dest_addr.v.erank]; - } else { - derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map?" << dendl; - //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. - } + char tag = -1; + dout(20) << "reader reading tag..." << dendl; + int rc = tcp_read(sd, (char*)&tag, 1); + if (rc <= 0) { + lock.Lock(); + dout(20) << "reader couldn't read tag" << dendl; + fault(); + continue; } - else { - // remote. - if (rank_pipe.count( dest_addr )) { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", already connected." << dendl; - // connected. - pipe = rank_pipe[ dest_addr ]; + + // open ... + if (tag == CEPH_MSGR_TAG_ACK) { + dout(20) << "reader got ACK" << dendl; + __u32 seq; + int rc = tcp_read( sd, (char*)&seq, sizeof(seq)); + lock.Lock(); + if (rc < 0) { + dout(20) << "reader couldn't read ack seq" << dendl; + fault(); } else { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", connecting." << dendl; - // not connected. - pipe = connect_rank( dest_addr ); + dout(15) << "reader got ack seq " << seq << dendl; + // trim sent list + while (!sent.empty() && + sent.front()->get_seq() <= seq) { + Message *m = sent.front(); + sent.pop_front(); + dout(10) << "reader got ack seq " + << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; + delete m; + } } + continue; } - } - - // do it - if (entity) { - // local! - dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << dendl; - entity->queue_message(m); - } - else if (pipe) { - // remote! 
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << dendl; - pipe->send(m); - } - - lock.Unlock(); -} - - - + else if (tag == CEPH_MSGR_TAG_MSG) { + dout(20) << "reader got MSG" << dendl; + Message *m = read_message(); + if (!m) { + derr(10) << "reader read null message" << dendl; + lock.Lock(); + fault(); + continue; + } -void Rank::wait() -{ - lock.Lock(); - while (1) { - // reap dead pipes - reaper(); + // note received seq# + lock.Lock(); + if (m->get_seq() <= in_seq) { + dout(-10) << "reader got old message " + << m->get_seq() << " <= " << in_seq << " " << m << " " << *m + << " for " << m->get_dest() + << ", discarding" << dendl; + delete m; + continue; + } + in_seq++; + assert(in_seq == m->get_seq()); + cond.Signal(); // wake up writer, to ack this + lock.Unlock(); + + dout(10) << "reader got message " + << m->get_seq() << " " << m << " " << *m + << " for " << m->get_dest() << dendl; + + // deliver + EntityMessenger *entity = 0; + + rank.lock.Lock(); + { + unsigned erank = m->get_dest_inst().addr.v.erank; + if (erank < rank.max_local && rank.local[erank]) { + // find entity + entity = rank.local[erank]; + } else { + derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl; + } + } + rank.lock.Unlock(); + + if (entity) + entity->queue_message(m); // queue - if (num_local == 0) { - dout(10) << "wait: everything stopped" << dendl; - break; // everything stopped. - } else { - dout(10) << "wait: local still has " << local.size() << " items, waiting" << dendl; - } + lock.Lock(); + } - wait_cond.Wait(lock); - } - lock.Unlock(); - - // done! clean up. - dout(20) << "wait: stopping accepter thread" << dendl; - accepter.stop(); - dout(20) << "wait: stopped accepter thread" << dendl; - - // close+reap all pipes - lock.Lock(); - { - dout(10) << "wait: closing pipes" << dendl; - list toclose; - for (hash_map::iterator i = rank_pipe.begin(); - i != rank_pipe.end(); - i++) - toclose.push_back(i->second); - for (list::iterator i = toclose.begin(); - i != toclose.end(); - i++) { - (*i)->unregister(); - (*i)->close(); + else if (tag == CEPH_MSGR_TAG_CLOSE) { + dout(20) << "reader got CLOSE" << dendl; + lock.Lock(); + fault(); // treat as a fault; i.e. reconnect|close + continue; } - - reaper(); - dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl; - while (!pipes.empty()) { - wait_cond.Wait(lock); - reaper(); + else { + dout(0) << "reader bad tag " << (int)tag << dendl; + lock.Lock(); + fault(); } } - lock.Unlock(); - - dout(10) << "wait: done." << dendl; - dout(1) << "shutdown complete." << dendl; -} + + // reap? + bool reap = false; + reader_running = false; + if (!writer_running) reap = true; + lock.Unlock(); + if (reap) { + dout(20) << "reader queueing for reap" << dendl; + if (sd > 0) ::close(sd); + rank.lock.Lock(); + { + rank.pipe_reap_queue.push_back(this); + rank.wait_cond.Signal(); + } + rank.lock.Unlock(); + } +} -/********************************** - * EntityMessenger +/* write msgs to socket. + * also, client. */ - -void Rank::EntityMessenger::dispatch_entry() +void Rank::Pipe::writer() { lock.Lock(); - while (!stop) { - if (!dispatch_queue.empty() || !prio_dispatch_queue.empty()) { - list ls; - if (!prio_dispatch_queue.empty()) { - ls.swap(prio_dispatch_queue); - pqlen = 0; - } else { - if (0) { - ls.swap(dispatch_queue); - qlen = 0; - } else { - // limit how much low-prio stuff we grab, to avoid starving high-prio messages! 
- ls.push_back(dispatch_queue.front()); - dispatch_queue.pop_front(); - qlen--; + + while (state != STATE_CLOSED) { + // connect? + if (state == STATE_CONNECTING) { + connect(); + continue; + } + + if (state == STATE_CLOSING) { + // write close tag + dout(20) << "writer writing CLOSE tag" << dendl; + char c = CEPH_MSGR_TAG_CLOSE; + lock.Unlock(); + ::write(sd, &c, 1); + lock.Lock(); + state = STATE_CLOSED; + continue; + } + + if (state != STATE_CONNECTING && + (!q.empty() || in_seq > in_seq_acked)) { + + // send ack? + if (in_seq > in_seq_acked) { + int send_seq = in_seq; + lock.Unlock(); + int rc = write_ack(send_seq); + lock.Lock(); + if (rc < 0) { + dout(20) << "writer couldn't write ack" << dendl; + fault(); + continue; } + in_seq_acked = send_seq; } - lock.Unlock(); - { - // deliver - while (!ls.empty()) { - if (stop) { - dout(1) << "dispatch: stop=true, discarding " << ls.size() - << " messages in dispatch queue" << dendl; - break; - } - Message *m = ls.front(); - ls.pop_front(); - dout(1) << m->get_dest() - << " <== " << m->get_source_inst() - << " ==== " << *m - << " ==== " << m - << dendl; - dispatch(m); - dout(20) << "done calling dispatch on " << m << dendl; + // grab outgoing message + if (!q.empty()) { + Message *m = q.front(); + q.pop_front(); + sent.push_back(m); // move to sent list + lock.Unlock(); + dout(20) << "writer sending " << m->get_seq() << " " << m << " " << *m << dendl; + if (m->empty_payload()) m->encode_payload(); + int rc = write_message(m); + lock.Lock(); + + if (rc < 0) { + derr(1) << "writer error sending " << *m << " to " << m->get_dest() << ", " + << errno << ": " << strerror(errno) << dendl; + fault(); } } - lock.Lock(); continue; } + + // wait + dout(20) << "writer sleeping" << dendl; cond.Wait(lock); } - lock.Unlock(); + + dout(20) << "writer finishing" << dendl; - // deregister - rank.unregister_entity(this); -} + // reap? + bool reap = false; + writer_running = false; + if (!reader_running) reap = true; -void Rank::EntityMessenger::ready() -{ - dout(10) << "ready " << get_myaddr() << dendl; - assert(!dispatch_thread.is_started()); + lock.Unlock(); - // start my dispatch thread - dispatch_thread.create(); + if (reap) { + dout(20) << "writer queueing for reap" << dendl; + if (sd > 0) ::close(sd); + rank.lock.Lock(); + { + rank.pipe_reap_queue.push_back(this); + rank.wait_cond.Signal(); + } + rank.lock.Unlock(); + } } -int Rank::EntityMessenger::shutdown() +Message *Rank::Pipe::read_message() { - dout(10) << "shutdown " << get_myaddr() << dendl; + // envelope + //dout(10) << "receiver.read_message from sd " << sd << dendl; - // stop my dispatch thread - if (dispatch_thread.am_self()) { - dout(10) << "shutdown i am dispatch, setting stop flag" << dendl; - stop = true; - } else { - dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl; - lock.Lock(); - stop = true; - cond.Signal(); - lock.Unlock(); + ceph_message_header env; + if (!tcp_read( sd, (char*)&env, sizeof(env) )) + return 0; + + dout(20) << "reader got envelope type=" << env.type + << " src " << env.src << " dst " << env.dst + << " nchunks=" << env.nchunks + << dendl; + + // payload + bufferlist blist; + int32_t pos = 0; + list chunk_at; + for (unsigned i=0; iset_chunk_payload_at(chunk_at); + + dout(20) << "reader got " << s << " byte message from " + << m->get_source() << dendl; + + return m; } -void Rank::EntityMessenger::suicide() -{ - dout(10) << "suicide " << get_myaddr() << dendl; - shutdown(); - // hmm, or exit(0)? 
-} -void Rank::EntityMessenger::prepare_dest(const entity_addr_t& addr) +int Rank::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len) { - rank.lock.Lock(); - { - if (rank.rank_pipe.count(addr) == 0) - rank.connect_rank(addr); + while (len > 0) { + if (0) { // sanity + int l = 0; + for (unsigned i=0; imsg_iovlen; i++) + l += msg->msg_iov[i].iov_len; + assert(l == len); + } + + int r = ::sendmsg(sd, msg, 0); + if (r < 0) { + assert(r == -1); + dout(1) << "error on sendmsg " << strerror(errno) << dendl; + return -1; + } + len -= r; + if (len == 0) break; + + // hrmph. trim r bytes off the front of our message. + dout(20) << "partial sendmsg, did " << r << ", still have " << len << dendl; + while (r > 0) { + if (msg->msg_iov[0].iov_len <= (size_t)r) { + // lose this whole item + //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl; + r -= msg->msg_iov[0].iov_len; + msg->msg_iov++; + msg->msg_iovlen--; + } else { + // partial! + //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl; + msg->msg_iov[0].iov_base = (void*)((long)msg->msg_iov[0].iov_base + r); + msg->msg_iov[0].iov_len -= r; + break; + } + } } - rank.lock.Unlock(); + return 0; } -int Rank::EntityMessenger::send_message(Message *m, entity_inst_t dest) + +int Rank::Pipe::write_ack(unsigned seq) { - // set envelope - m->set_source(get_myname()); - m->set_source_addr(my_addr); - m->set_dest_inst(dest); - - dout(1) << m->get_source() - << " --> " << dest.name << " " << dest.addr - << " -- " << *m - << " -- " << m - << dendl; + dout(10) << "write_ack " << seq << dendl; - rank.submit_message(m, dest.addr); + char c = CEPH_MSGR_TAG_ACK; + __u32 s = seq;/*cpu_to_le32(seq);*/ + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2]; + msgvec[0].iov_base = &c; + msgvec[0].iov_len = 1; + msgvec[1].iov_base = &s; + msgvec[1].iov_len = sizeof(s); + msg.msg_iov = msgvec; + msg.msg_iovlen = 2; + + if (do_sendmsg(sd, &msg, 5) < 0) + return -1; return 0; } - -void Rank::EntityMessenger::reset_myname(entity_name_t newname) +int Rank::Pipe::write_message(Message *m) { - entity_name_t oldname = get_myname(); - dout(10) << "reset_myname " << oldname << " to " << newname << dendl; - _set_myname(newname); -} + // get envelope, buffers + ceph_message_header *env = &m->get_envelope(); + bufferlist blist; + blist.claim( m->get_payload() ); + + // chunk out page aligned buffers? 
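
  // (wire format per message, as assembled below: the MSG tag byte, the
  //  fixed envelope, then each chunk as an int32 length followed by the
  //  chunk bytes themselves.)
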
+ if (blist.length() == 0) + env->nchunks = 0; + else { + env->nchunks = 1 + m->get_chunk_payload_at().size(); // header + explicit chunk points + if (!m->get_chunk_payload_at().empty()) + dout(20) << "chunking at " << m->get_chunk_payload_at() + << " in " << *m << " len " << blist.length() + << dendl; + } + dout(20) << "write_message " << m << " " << *m + << " to " << m->get_dest() + << " in " << env->nchunks + << dendl; + + // set up msghdr and iovecs + struct msghdr msg; + memset(&msg, 0, sizeof(msg)); + struct iovec msgvec[2 + blist.buffers().size() + env->nchunks*2]; // conservative upper bound + msg.msg_iov = msgvec; + int msglen = 0; + + // send tag + char tag = CEPH_MSGR_TAG_MSG; + msgvec[msg.msg_iovlen].iov_base = &tag; + msgvec[msg.msg_iovlen].iov_len = 1; + msglen++; + msg.msg_iovlen++; + // send envelope + msgvec[msg.msg_iovlen].iov_base = (char*)env; + msgvec[msg.msg_iovlen].iov_len = sizeof(*env); + msglen += sizeof(*env); + msg.msg_iovlen++; + + // payload + list::const_iterator pb = blist.buffers().begin(); + list::const_iterator pc = m->get_chunk_payload_at().begin(); + int b_off = 0; // carry-over buffer offset, if any + int bl_pos = 0; // blist pos + int nchunks = env->nchunks; + int32_t chunksizes[nchunks]; + for (int curchunk=0; curchunk < nchunks; curchunk++) { + // start a chunk + int32_t size = blist.length() - bl_pos; + if (pc != m->get_chunk_payload_at().end()) { + assert(*pc > bl_pos); + size = *pc - bl_pos; + dout(30) << "pos " << bl_pos << " explicit chunk at " << *pc << " size " << size << " of " << blist.length() << dendl; + pc++; + } + assert(size > 0); + dout(30) << "chunk " << curchunk << " pos " << bl_pos << " size " << size << dendl; -void Rank::EntityMessenger::mark_down(entity_addr_t a) -{ - rank.mark_down(a); -} + // chunk size + chunksizes[curchunk] = size; + msgvec[msg.msg_iovlen].iov_base = &chunksizes[curchunk]; + msgvec[msg.msg_iovlen].iov_len = sizeof(int32_t); + msglen += sizeof(int32_t); + msg.msg_iovlen++; -void Rank::mark_down(entity_addr_t addr) -{ - //if (my_rank == 0) return; // ugh.. rank0 already handles this stuff in the namer - lock.Lock(); - /* - if (entity_map.count(a) && - entity_map[a] > inst) { - dout(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << dendl; - derr(10) << "mark_down " << a << " inst " << inst << " < " << entity_map[a] << dendl; - // do nothing! - } else { - if (entity_map.count(a) == 0) { - // don't know it - dout(10) << "mark_down " << a << " inst " << inst << " ... unknown by me" << dendl; - derr(10) << "mark_down " << a << " inst " << inst << " ... 
unknown by me" << dendl; - } else { - // know it - assert(entity_map[a] <= inst); - dout(10) << "mark_down " << a << " inst " << inst << dendl; - derr(10) << "mark_down " << a << " inst " << inst << dendl; - - entity_map.erase(a); - - if (rank_pipe.count(inst)) { - rank_pipe[inst]->close(); - rank_pipe.erase(inst); + // chunk contents + int left = size; + while (left > 0) { + int donow = MIN(left, (int)pb->length()-b_off); + assert(donow > 0); + dout(30) << " bl_pos " << bl_pos << " b_off " << b_off + << " leftinchunk " << left + << " buffer len " << pb->length() + << " writing " << donow + << dendl; + + if (msg.msg_iovlen >= IOV_MAX-1) { + if (do_sendmsg(sd, &msg, msglen)) + return -1; + + // and restart the iov + msg.msg_iov = msgvec; + msg.msg_iovlen = 0; + msglen = 0; } + + msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()+b_off); + msgvec[msg.msg_iovlen].iov_len = donow; + msglen += donow; + msg.msg_iovlen++; + + left -= donow; + assert(left >= 0); + b_off += donow; + bl_pos += donow; + if (b_off != (int)pb->length()) + break; + pb++; + b_off = 0; } + assert(left == 0); } - */ - lock.Unlock(); + assert(pb == blist.buffers().end()); + + // send + if (do_sendmsg(sd, &msg, msglen)) + return -1; + + return 0; } diff --git a/trunk/ceph/msg/SimpleMessenger.h b/trunk/ceph/msg/SimpleMessenger.h index d1c8e72dd30a8..4ef9144e343ca 100644 --- a/trunk/ceph/msg/SimpleMessenger.h +++ b/trunk/ceph/msg/SimpleMessenger.h @@ -65,23 +65,39 @@ private: // pipe class Pipe { - protected: + public: + enum { + STATE_ACCEPTING, + STATE_CONNECTING, + STATE_OPEN, + STATE_CLOSED, + STATE_CLOSING + //STATE_GOTCLOSE, // got (but haven't sent) a close + //STATE_SENTCLOSE // sent (but haven't got) a close + }; + int sd; - bool done; + int new_sd; entity_addr_t peer_addr; - bool server; - bool need_to_send_close; + + Mutex lock; + int state; + + protected: + + utime_t first_fault; // time of original failure + utime_t last_attempt; // time of last reconnect attempt bool reader_running; bool writer_running; list q; list sent; - Mutex lock; Cond cond; - - int out_seq, out_acked; - int in_seq; + + __u32 connect_seq; + __u32 out_seq; + __u32 in_seq, in_seq_acked; int accept(); // server handshake int connect(); // client handshake @@ -90,8 +106,16 @@ private: Message *read_message(); int write_message(Message *m); - int do_sendmsg(Message *m, struct msghdr *msg, int len); - void fail(list& ls); + int do_sendmsg(int sd, struct msghdr *msg, int len); + int write_ack(unsigned s); + + void fault(); + void fail(); + + void take_queue(list& ls) { + ls.splice(ls.begin(), q); + ls.splice(ls.begin(), sent); + } // threads class Reader : public Thread { @@ -111,22 +135,19 @@ private: friend class Writer; public: - Pipe(int s) : sd(s), - done(false), server(true), - need_to_send_close(true), - reader_running(false), writer_running(false), - out_seq(0), out_acked(0), in_seq(0), - reader_thread(this), writer_thread(this) { - // server + Pipe(int st) : + sd(0), + state(st), + reader_running(false), writer_running(false), + connect_seq(0), + out_seq(0), in_seq(0), in_seq_acked(0), + reader_thread(this), writer_thread(this) { } + + void start_reader() { reader_running = true; reader_thread.create(); } - Pipe(const entity_addr_t &pi) : sd(0), - done(false), peer_addr(pi), server(false), - need_to_send_close(true), - reader_running(false), writer_running(false), - reader_thread(this), writer_thread(this) { - // client + void start_writer() { writer_running = true; writer_thread.create(); } @@ -137,24 +158,26 @@ private: entity_addr_t& 
-    void unregister();
-    void close();
+    void register_pipe();
+    void unregister_pipe();
+    void dirty_close();
 
     void join() {
       if (writer_thread.is_started()) writer_thread.join();
-      if (reader_thread.is_started()) reader_thread.join();
+      if (reader_thread.is_started()) {
+        reader_thread.kill(SIGUSR1);
+        reader_thread.join();
+      }
     }
 
     void send(Message *m) {
       lock.Lock();
-      q.push_back(m);
-      cond.Signal();
+      _send(m);
       lock.Unlock();
     }
-    void send(list<Message*>& ls) {
-      lock.Lock();
-      q.splice(q.end(), ls);
+    void _send(Message *m) {
+      q.push_back(m);
+      m->set_seq(++out_seq);
       cond.Signal();
-      lock.Unlock();
     }
 
     void force_close() {
diff --git a/trunk/ceph/msg/msg_types.h b/trunk/ceph/msg/msg_types.h
index 7346c8947093a..a9e3ec4f970f8 100644
--- a/trunk/ceph/msg/msg_types.h
+++ b/trunk/ceph/msg/msg_types.h
@@ -108,25 +108,23 @@ struct entity_addr_t {
     memset(&v, 0, sizeof(v));
   }
 
-  void set_addr(tcpaddr_t a) {
-    memcpy((char*)v.ipq, (char*)&a.sin_addr.s_addr, 4);
-    v.port = ntohs(a.sin_port);
+  void set_addr(sockaddr_in& a) {
+    memcpy((char*)&v.ipaddr, (char*)&a, sizeof(a));
   }
-  void make_addr(tcpaddr_t& a) const {
-    memset(&a, 0, sizeof(a));
-    a.sin_family = AF_INET;
-    memcpy((char*)&a.sin_addr.s_addr, (char*)v.ipq, 4);
-    a.sin_port = htons(v.port);
+  void make_addr(sockaddr_in& a) const {
+    memcpy((char*)&a, (char*)&v.ipaddr, sizeof(a));
+  }
+  void set_port(int port) {
+    v.ipaddr.sin_port = htons(port);
+  }
+  int get_port() {
+    return ntohs(v.ipaddr.sin_port);
   }
 };
 
 inline ostream& operator<<(ostream& out, const entity_addr_t &addr)
 {
-  return out << (int)addr.v.ipq[0]
-             << '.' << (int)addr.v.ipq[1]
-             << '.' << (int)addr.v.ipq[2]
-             << '.' << (int)addr.v.ipq[3]
-             << ':' << addr.v.port
+  return out << addr.v.ipaddr
              << '#' << addr.v.nonce
              << '@' << addr.v.erank;
 }
diff --git a/trunk/ceph/msg/tcp.cc b/trunk/ceph/msg/tcp.cc
index a131e3d6dd7dc..43fd27ab372ff 100644
--- a/trunk/ceph/msg/tcp.cc
+++ b/trunk/ceph/msg/tcp.cc
@@ -51,7 +51,7 @@ int tcp_write(int sd, char *buf, int len)
 }
 */
 
-int tcp_hostlookup(char *str, tcpaddr_t& ta)
+int tcp_hostlookup(char *str, sockaddr_in& ta)
 {
   char *host = str;
   char *port = 0;
diff --git a/trunk/ceph/msg/tcp.h b/trunk/ceph/msg/tcp.h
index e234da400dfe4..a59cf2a8ac47f 100644
--- a/trunk/ceph/msg/tcp.h
+++ b/trunk/ceph/msg/tcp.h
@@ -8,11 +8,9 @@
 #include
 #include
 
-typedef struct sockaddr_in tcpaddr_t;
-
 using std::ostream;
 
-inline ostream& operator<<(ostream& out, const tcpaddr_t &a)
+inline ostream& operator<<(ostream& out, const sockaddr_in &a)
 {
   unsigned char addr[4];
   memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4);
@@ -56,12 +54,12 @@ inline int tcp_write(int sd, const char *buf, int len) {
 }
 
-extern int tcp_hostlookup(char *str, tcpaddr_t& ta);
+extern int tcp_hostlookup(char *str, sockaddr_in& ta);
 
-inline bool operator==(const tcpaddr_t& a, const tcpaddr_t& b) {
+inline bool operator==(const sockaddr_in& a, const sockaddr_in& b) {
   return strncmp((const char*)&a, (const char*)&b, sizeof(a)) == 0;
 }
-inline bool operator!=(const tcpaddr_t& a, const tcpaddr_t& b) {
+inline bool operator!=(const sockaddr_in& a, const sockaddr_in& b) {
   return strncmp((const char*)&a, (const char*)&b, sizeof(a)) != 0;
 }
-- 
2.39.5
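
Notes on the protocol above. The sketches that follow are illustrative reconstructions, not part of the patch; any name flagged as hypothetical does not appear in this tree.

Rank::Pipe::do_sendmsg() handles short writes: sendmsg(2) may accept fewer bytes than the iovec array describes, so the loop trims whatever was accepted off the front of the array in place and retries. The same pattern in isolation (sendmsg_full is a hypothetical name; error handling simplified):

  // Retry sendmsg() until 'len' bytes are written, advancing the iovec
  // array past whatever the kernel accepted on each partial write.
  #include <sys/socket.h>
  #include <cerrno>

  static int sendmsg_full(int sd, struct msghdr *msg, size_t len)
  {
    while (len > 0) {
      ssize_t r = ::sendmsg(sd, msg, 0);
      if (r < 0)
        return -errno;                   // caller decides whether to reconnect
      len -= r;
      if (len == 0)
        break;
      while (r > 0) {
        if (msg->msg_iov[0].iov_len <= (size_t)r) {
          r -= msg->msg_iov[0].iov_len;  // drop a fully sent element
          msg->msg_iov++;
          msg->msg_iovlen--;
        } else {
          // partially sent element: advance its base pointer
          msg->msg_iov[0].iov_base = (char*)msg->msg_iov[0].iov_base + r;
          msg->msg_iov[0].iov_len -= r;
          r = 0;
        }
      }
    }
    return 0;
  }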
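Piecing together the iovec construction in write_message(), the assumed on-the-wire framing of one message is as follows (a reconstruction from the code above, not a normative spec; field widths as in the patch):

  // CEPH_MSGR_TAG_MSG framing, as built by write_message():
  //
  //   char                        tag;         // CEPH_MSGR_TAG_MSG
  //   struct ceph_message_header  env;         // fixed size; env.nchunks set
  //   // then, env.nchunks times:
  //   int32_t                     size;        // length of the next chunk
  //   char                        data[size];  // chunk payload
  //
  // The iovec list is flushed through do_sendmsg() whenever it approaches
  // IOV_MAX, so a single message may take several sendmsg() calls.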
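Pipe::take_queue() splices q to the front of the caller's list and then splices sent in front of that, so messages that were written but never acked end up ahead of messages that were never attempted, preserving submission order. Presumably the reconnect path drains a failed pipe this way and refeeds the result through send(); the caller below is hypothetical:

  // Hypothetical reconnect-side use of take_queue(): 'requeue' holds the
  // sent-but-unacked messages first, then the still-queued ones.
  list<Message*> requeue;
  pipe->take_queue(requeue);
  for (list<Message*>::iterator p = requeue.begin(); p != requeue.end(); ++p)
    newpipe->send(*p);   // _send() stamps a fresh out_seq on each message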
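With ceph_entity_addr now wrapping a sockaddr_in, the new port helpers keep the stored value in network byte order. A usage sketch (assumes the v.ipaddr member shown in the msg_types.h hunk; the include path and function name are illustrative):

  #include <netinet/in.h>
  #include "msg/msg_types.h"   // path assumed from this tree's layout

  void addr_example()
  {
    entity_addr_t a;
    a.v.ipaddr.sin_family = AF_INET;
    a.set_port(6789);          // htons() into v.ipaddr.sin_port
    int p = a.get_port();      // ntohs() back to host order; p == 6789
    sockaddr_in sa;
    a.make_addr(sa);           // raw copy-out, ready for bind()/connect()
    (void)p; (void)sa;
  }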