From f4826004a93b4b489cd34b3d0a0be7b1ce2954e8 Mon Sep 17 00:00:00 2001 From: Sage Weil Date: Tue, 3 Nov 2009 15:15:22 -0800 Subject: [PATCH] msgr: encode sockaddr.ss_family big endian in ceph_entity_addr The ss_family field is normally host endianness, but we want to exchange ceph_entity_addr across the wire and store it on disk. So, encode ss_family in big endian (to match the other sockaddr field endianness). Rev disk and wire protocols to match. --- src/include/msgr.h | 2 +- src/mds/mdstypes.h | 2 +- src/mon/Monitor.cc | 1 + src/mon/mon_types.h | 2 +- src/msg/SimpleMessenger.cc | 88 +++++++++++++++++------------ src/msg/msg_types.h | 112 +++++++++++++++++++++++-------------- src/msg/tcp.h | 4 +- src/osd/OSDMap.h | 2 +- src/osd/osd_types.h | 2 +- 9 files changed, 130 insertions(+), 85 deletions(-) diff --git a/src/include/msgr.h b/src/include/msgr.h index 9abc879e25b1b..8e3ea2eb1bf51 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -21,7 +21,7 @@ * whenever the wire protocol changes. try to keep this string length * constant. */ -#define CEPH_BANNER "ceph v022" +#define CEPH_BANNER "ceph v023" #define CEPH_BANNER_MAX_LEN 30 diff --git a/src/mds/mdstypes.h b/src/mds/mdstypes.h index 5598089964ddd..f08347657348e 100644 --- a/src/mds/mdstypes.h +++ b/src/mds/mdstypes.h @@ -20,7 +20,7 @@ using namespace std; #include -#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v009" +#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v010" //#define MDS_REF_SET // define me for improved debug output, sanity checking diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 1ba79b9486251..c9e9b819cecac 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -680,6 +680,7 @@ void Monitor::check_subs() void Monitor::check_sub(Subscription *sub) { + dout(0) << "check_sub monmap last " << sub->last << " have " << monmap->get_epoch() << dendl; if (sub->last < monmap->get_epoch()) { send_latest_monmap(sub->session->inst); if (sub->onetime) diff --git a/src/mon/mon_types.h b/src/mon/mon_types.h index d5f1b4b43005c..28c3b8496b408 100644 --- a/src/mon/mon_types.h +++ b/src/mon/mon_types.h @@ -37,6 +37,6 @@ inline const char *get_paxos_name(int p) { } } -#define CEPH_MON_ONDISK_MAGIC "ceph mon volume v011" +#define CEPH_MON_ONDISK_MAGIC "ceph mon volume v012" #endif diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index e463bb5d7e04b..58e87a671daad 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -97,7 +97,7 @@ int SimpleMessenger::Accepter::bind(int64_t force_nonce) rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr)); if (rc < 0) { char buf[80]; - derr(0) << "accepter.bind unable to bind to " << g_my_addr.in_addr() + derr(0) << "accepter.bind unable to bind to " << g_my_addr.ss_addr() << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; return -errno; } @@ -111,7 +111,7 @@ int SimpleMessenger::Accepter::bind(int64_t force_nonce) } if (rc < 0) { char buf[80]; - derr(0) << "accepter.bind unable to bind to " << g_my_addr.in_addr() + derr(0) << "accepter.bind unable to bind to " << g_my_addr.ss_addr() << " on any port in range " << CEPH_PORT_START << "-" << CEPH_PORT_LAST << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; return -errno; @@ -128,7 +128,7 @@ int SimpleMessenger::Accepter::bind(int64_t force_nonce) rc = ::listen(listen_sd, 128); if (rc < 0) { char buf[80]; - derr(0) << "accepter.bind unable to listen on " << g_my_addr.in_addr() + derr(0) << "accepter.bind unable to listen on " << g_my_addr.ss_addr() << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; return -errno; } @@ -142,11 +142,11 @@ int SimpleMessenger::Accepter::bind(int64_t force_nonce) if (rank->rank_addr.get_port() == 0) { rank->rank_addr.in4_addr() = listen_addr; if (force_nonce >= 0) - rank->rank_addr.v.nonce = force_nonce; + rank->rank_addr.nonce = force_nonce; else - rank->rank_addr.v.nonce = getpid(); // FIXME: pid might not be best choice here. + rank->rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here. } - rank->rank_addr.v.erank = 0; + rank->rank_addr.erank = 0; dout(1) << "accepter.bind rank_addr is " << rank->rank_addr << " need_addr=" << rank->need_addr << dendl; rank->did_bind = true; @@ -456,7 +456,7 @@ void SimpleMessenger::Endpoint::mark_down(entity_addr_t a) entity_addr_t SimpleMessenger::Endpoint::get_myaddr() { entity_addr_t a = rank->rank_addr; - a.v.erank = my_rank; + a.erank = my_rank; return a; } @@ -525,26 +525,24 @@ int SimpleMessenger::Pipe::accept() } // and my addr - rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr)); - if (rc < 0) { - dout(10) << "accept couldn't write my addr" << dendl; - state = STATE_CLOSED; - return -1; - } + bufferlist addrs; + ::encode(rank->rank_addr, addrs); // and peer's socket addr (they might not know their ip) entity_addr_t socket_addr; - socklen_t len = sizeof(socket_addr.in_addr()); - int r = ::getpeername(sd, (sockaddr*)&socket_addr.v.in_addr, &len); + socklen_t len = sizeof(socket_addr.ss_addr()); + int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len); if (r < 0) { char buf[80]; dout(0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; state = STATE_CLOSED; return -1; } - rc = tcp_write(sd, (char*)&socket_addr, sizeof(socket_addr)); + ::encode(socket_addr, addrs); + + rc = tcp_write(sd, addrs.c_str(), addrs.length()); if (rc < 0) { - dout(10) << "accept couldn't write peer addr" << dendl; + dout(10) << "accept couldn't write my+peer addr" << dendl; state = STATE_CLOSED; return -1; } @@ -565,17 +563,27 @@ int SimpleMessenger::Pipe::accept() state = STATE_CLOSED; return -1; } - rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr)); + bufferlist addrbl; + { + bufferptr tp(sizeof(peer_addr)); + addrbl.push_back(tp); + } + rc = tcp_read(sd, addrbl.c_str(), addrbl.length()); if (rc < 0) { dout(10) << "accept couldn't read peer_addr" << dendl; state = STATE_CLOSED; return -1; } + { + bufferlist::iterator ti = addrbl.begin(); + ::decode(peer_addr, ti); + } + dout(10) << "accept peer addr is " << peer_addr << dendl; if (peer_addr.is_blank_addr()) { // peer apparently doesn't know what ip they have; figure it out for them. int port = peer_addr.get_port(); - peer_addr.v.in_addr = socket_addr.v.in_addr; + peer_addr.addr = socket_addr.addr; peer_addr.set_port(port); dout(0) << "accept peer addr is really " << peer_addr << " (socket is " << socket_addr << ")" << dendl; @@ -826,6 +834,7 @@ int SimpleMessenger::Pipe::connect() char banner[strlen(CEPH_BANNER)]; entity_addr_t paddr; entity_addr_t peer_addr_for_me, socket_addr; + bufferlist addrbl, myaddrbl; // create socket? sd = ::socket(AF_INET, SOCK_STREAM, 0); @@ -841,7 +850,7 @@ int SimpleMessenger::Pipe::connect() // connect! dout(10) << "connecting to " << peer_addr << dendl; - rc = ::connect(sd, (sockaddr*)&peer_addr.v.in_addr, sizeof(peer_addr.v.in_addr)); + rc = ::connect(sd, (sockaddr*)&peer_addr.addr, sizeof(peer_addr.addr)); if (rc < 0) { dout(2) << "connect error " << peer_addr << ", " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; @@ -880,11 +889,21 @@ int SimpleMessenger::Pipe::connect() } // identify peer - rc = tcp_read(sd, (char*)&paddr, sizeof(paddr)); + { + bufferptr p(sizeof(paddr) * 2); + addrbl.push_back(p); + } + rc = tcp_read(sd, addrbl.c_str(), addrbl.length()); if (rc < 0) { - dout(2) << "connect couldn't read peer addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + dout(2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } + { + bufferlist::iterator p = addrbl.begin(); + ::decode(paddr, p); + ::decode(peer_addr_for_me, p); + } + dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl; if (!peer_addr.is_local_to(paddr)) { if (paddr.is_blank_addr() && @@ -899,20 +918,16 @@ int SimpleMessenger::Pipe::connect() } } - // peer addr for me - rc = tcp_read(sd, (char*)&peer_addr_for_me, sizeof(peer_addr_for_me)); - if (rc < 0) { - dout(2) << "connect couldn't read peer addr for me, " << strerror_r(errno, buf, sizeof(buf)) << dendl; - goto fail; - } dout(20) << "connect peer addr for me is " << peer_addr_for_me << dendl; if (rank->need_addr) rank->learned_addr(peer_addr_for_me); + ::encode(rank->rank_addr, myaddrbl); + memset(&msg, 0, sizeof(msg)); - msgvec[0].iov_base = (char*)&rank->rank_addr; - msgvec[0].iov_len = sizeof(rank->rank_addr); + msgvec[0].iov_base = myaddrbl.c_str(); + msgvec[0].iov_len = myaddrbl.length(); msg.msg_iov = msgvec; msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; @@ -1547,7 +1562,7 @@ Message *SimpleMessenger::Pipe::read_message() if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0) return 0; - + dout(20) << "reader got envelope type=" << header.type << " src " << header.src << " front=" << header.front_len @@ -1567,7 +1582,8 @@ Message *SimpleMessenger::Pipe::read_message() entity_addr_t srcaddr = header.src.addr; if (srcaddr.is_blank_addr()) { dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl; - header.orig_src.addr.in_addr = header.src.addr.in_addr = peer_addr.in_addr(); + ceph_entity_addr enc_peer_addr = peer_addr; + header.orig_src.addr.in_addr = header.src.addr.in_addr = enc_peer_addr.in_addr; } // read front @@ -1982,7 +1998,7 @@ int SimpleMessenger::start(bool nodaemon) } if (!did_bind) - rank_addr.v.nonce = getpid(); + rank_addr.nonce = getpid(); dout(1) << "rank.start" << dendl; started = true; @@ -2131,13 +2147,13 @@ void SimpleMessenger::unregister_entity(Endpoint *msgr) void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool lazy) { const entity_addr_t& dest_addr = dest.addr; - m->get_header().dst_erank = dest_addr.v.erank; + m->get_header().dst_erank = dest_addr.erank; assert(m->nref.test() == 0); // lookup entity_addr_t dest_proc_addr = dest_addr; - dest_proc_addr.v.erank = 0; + dest_proc_addr.erank = 0; lock.Lock(); { @@ -2313,7 +2329,7 @@ void SimpleMessenger::learned_addr(entity_addr_t peer_addr_for_me) { lock.Lock(); int port = rank_addr.get_port(); - rank_addr.in_addr() = peer_addr_for_me.in_addr(); + rank_addr.addr = peer_addr_for_me.addr; rank_addr.set_port(port); dout(1) << "learned my addr " << rank_addr << dendl; need_addr = false; diff --git a/src/msg/msg_types.h b/src/msg/msg_types.h index a9578701b7fc3..8cac27a974810 100644 --- a/src/msg/msg_types.h +++ b/src/msg/msg_types.h @@ -121,91 +121,114 @@ namespace __gnu_cxx { * thus identifies a particular process instance. * ipv4 for now. */ -WRITE_RAW_ENCODER(ceph_entity_addr) + +/* + * encode sockaddr.ss_family in big endian + */ +static inline void encode(const sockaddr_storage& a, bufferlist& bl) { + struct sockaddr_storage ss = a; + ss.ss_family = htons(ss.ss_family); + ::encode_raw(ss, bl); +} +static inline void decode(sockaddr_storage& a, bufferlist::iterator& bl) { + ::decode_raw(a, bl); + a.ss_family = ntohs(a.ss_family); +} struct entity_addr_t { - struct ceph_entity_addr v; + __u32 erank; + __u32 nonce; + sockaddr_storage addr; - entity_addr_t() { - memset(&v, 0, sizeof(v)); + entity_addr_t() : erank(0), nonce(0) { + memset(&addr, 0, sizeof(addr)); } entity_addr_t(const ceph_entity_addr &o) { - memcpy(&v, &o, sizeof(v)); + erank = o.erank; + nonce = o.nonce; + addr = o.in_addr; + addr.ss_family = ntohs(addr.ss_family); } - __u32 get_nonce() const { return v.nonce; } - void set_nonce(__u32 n) { v.nonce = n; } + __u32 get_nonce() const { return nonce; } + void set_nonce(__u32 n) { nonce = n; } - __u32 get_erank() const { return v.erank; } - void set_erank(__u32 r) { v.erank = r; } + __u32 get_erank() const { return erank; } + void set_erank(__u32 r) { erank = r; } - sockaddr_storage &in_addr() { - return *(sockaddr_storage *)&v.in_addr; + sockaddr_storage &ss_addr() { + return addr; } sockaddr_in &in4_addr() { - return *(sockaddr_in *)&v.in_addr; + return *(sockaddr_in *)&addr; } sockaddr_in6 &in6_addr() { - return *(sockaddr_in6 *)&v.in_addr; + return *(sockaddr_in6 *)&addr; } void set_in4_quad(int pos, int val) { - sockaddr_in *in4 = (sockaddr_in *)&v.in_addr; + sockaddr_in *in4 = (sockaddr_in *)&addr; in4->sin_family = AF_INET; unsigned char *ipq = (unsigned char*)&in4->sin_addr.s_addr; ipq[pos] = val; } void set_port(int port) { - switch (v.in_addr.ss_family) { + switch (addr.ss_family) { case AF_INET: - ((sockaddr_in *)&v.in_addr)->sin_port = htons(port); + ((sockaddr_in *)&addr)->sin_port = htons(port); break; case AF_INET6: - ((sockaddr_in6 *)&v.in_addr)->sin6_port = htons(port); + ((sockaddr_in6 *)&addr)->sin6_port = htons(port); break; default: assert(0); } } int get_port() const { - switch (v.in_addr.ss_family) { + switch (addr.ss_family) { case AF_INET: - return ntohs(((const sockaddr_in *)&v.in_addr)->sin_port); + return ntohs(((const sockaddr_in *)&addr)->sin_port); break; case AF_INET6: - return ntohs(((const sockaddr_in6 *)&v.in_addr)->sin6_port); + return ntohs(((const sockaddr_in6 *)&addr)->sin6_port); break; } return 0; } - operator ceph_entity_addr() const { return v; } + operator ceph_entity_addr() const { + ceph_entity_addr a; + a.erank = erank; + a.nonce = nonce; + a.in_addr = addr; + a.in_addr.ss_family = htons(addr.ss_family); + return a; + } - bool is_local_to(const entity_addr_t &other) const { - return ceph_entity_addr_is_local(&v, &other.v); + bool is_local_to(const entity_addr_t &o) const { + return nonce == o.nonce && + memcmp(&addr, &o.addr, sizeof(addr)) == 0; } - bool is_same_host(const entity_addr_t &other) const { - const ceph_entity_addr *a = &v; - const ceph_entity_addr *b = &other.v; - if (a->in_addr.ss_family != b->in_addr.ss_family) + bool is_same_host(const entity_addr_t &o) const { + if (addr.ss_family != o.addr.ss_family) return false; - if (a->in_addr.ss_family == AF_INET) - return ((struct sockaddr_in *)&a->in_addr)->sin_addr.s_addr == - ((struct sockaddr_in *)&b->in_addr)->sin_addr.s_addr; - if (a->in_addr.ss_family == AF_INET6) - return memcmp(((struct sockaddr_in6 *)&a->in_addr)->sin6_addr.s6_addr, - ((struct sockaddr_in6 *)&b->in_addr)->sin6_addr.s6_addr, - sizeof(((struct sockaddr_in6 *)&a->in_addr)->sin6_addr.s6_addr)); + if (addr.ss_family == AF_INET) + return ((struct sockaddr_in *)&addr)->sin_addr.s_addr == + ((struct sockaddr_in *)&o.addr)->sin_addr.s_addr; + if (addr.ss_family == AF_INET6) + return memcmp(((struct sockaddr_in6 *)&addr)->sin6_addr.s6_addr, + ((struct sockaddr_in6 *)&o.addr)->sin6_addr.s6_addr, + sizeof(((struct sockaddr_in6 *)&addr)->sin6_addr.s6_addr)); return false; } bool is_blank_addr() { - switch (v.in_addr.ss_family) { + switch (addr.ss_family) { case AF_INET: - return ((sockaddr_in *)&v.in_addr)->sin_addr.s_addr == INADDR_ANY; + return ((sockaddr_in *)&addr)->sin_addr.s_addr == INADDR_ANY; case AF_INET6: { - sockaddr_in6 *in6 = (sockaddr_in6 *)&v.in_addr; + sockaddr_in6 *in6 = (sockaddr_in6 *)&addr; return in6->sin6_addr.s6_addr32[0] == 0 && in6->sin6_addr.s6_addr32[1] == 0 && in6->sin6_addr.s6_addr32[2] == 0 && @@ -217,17 +240,21 @@ struct entity_addr_t { } void encode(bufferlist& bl) const { - ::encode(v, bl); + ::encode(erank, bl); + ::encode(nonce, bl); + ::encode(addr, bl); } void decode(bufferlist::iterator& bl) { - ::decode(v, bl); + ::decode(erank, bl); + ::decode(nonce, bl); + ::decode(addr, bl); } }; WRITE_CLASS_ENCODER(entity_addr_t) inline ostream& operator<<(ostream& out, const entity_addr_t &addr) { - return out << addr.v.in_addr << '/' << addr.v.nonce << '/' << addr.v.erank; + return out << addr.addr << '/' << addr.nonce << '/' << addr.erank; } inline bool operator==(const entity_addr_t& a, const entity_addr_t& b) { return memcmp(&a, &b, sizeof(a)) == 0; } @@ -243,7 +270,7 @@ namespace __gnu_cxx { size_t operator()( const entity_addr_t& x ) const { static blobhash H; - return H((const char*)&x.v, sizeof(x.v)); + return H((const char*)&x, sizeof(x)); } }; } @@ -308,7 +335,8 @@ inline ostream& operator<<(ostream& out, const entity_inst_t &i) } inline ostream& operator<<(ostream& out, const ceph_entity_inst &i) { - return out << *(const entity_inst_t*)&i; + entity_inst_t n = i; + return out << n; } diff --git a/src/msg/tcp.h b/src/msg/tcp.h index 6a0c60cee676f..f3ab052d76349 100644 --- a/src/msg/tcp.h +++ b/src/msg/tcp.h @@ -19,7 +19,7 @@ inline ostream& operator<<(ostream& out, const sockaddr_storage &ss) getnameinfo((struct sockaddr *)&ss, sizeof(ss), buf, sizeof(buf), serv, sizeof(serv), NI_NUMERICHOST | NI_NUMERICSERV); - return out << buf << ':' << serv; + return out << ss.ss_family << ":" << buf << ':' << serv; } inline ostream& operator<<(ostream& out, const sockaddr_in &ss) @@ -29,7 +29,7 @@ inline ostream& operator<<(ostream& out, const sockaddr_in &ss) getnameinfo((struct sockaddr *)&ss, sizeof(ss), buf, sizeof(buf), serv, sizeof(serv), NI_NUMERICHOST | NI_NUMERICSERV); - return out << buf << ':' << serv; + return out << ss.sin_family << ":" << buf << ':' << serv; } diff --git a/src/osd/OSDMap.h b/src/osd/OSDMap.h index be43c269f177c..637a57ac14d49 100644 --- a/src/osd/OSDMap.h +++ b/src/osd/OSDMap.h @@ -407,7 +407,7 @@ private: assert(exists(osd)); entity_inst_t i(entity_name_t::OSD(osd), osd_addr[osd]); - i.addr.v.erank = i.addr.v.erank + 1; // heartbeat addr erank is regular addr erank + 1 + i.addr.erank = i.addr.erank + 1; // heartbeat addr erank is regular addr erank + 1 return i; } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index d4bce24ffe70e..2886314cafac4 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -24,7 +24,7 @@ -#define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v024" +#define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v025" #define CEPH_OSD_NEARFULL_RATIO .8 #define CEPH_OSD_FULL_RATIO .95 -- 2.39.5