* whenever the wire protocol changes. try to keep this string length
* constant.
*/
-#define CEPH_BANNER "ceph v022"
+#define CEPH_BANNER "ceph v023"
#define CEPH_BANNER_MAX_LEN 30
#include <boost/pool/pool.hpp>
-#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v009"
+#define CEPH_FS_ONDISK_MAGIC "ceph fs volume v010"
//#define MDS_REF_SET // define me for improved debug output, sanity checking
void Monitor::check_sub(Subscription *sub)
{
+ dout(0) << "check_sub monmap last " << sub->last << " have " << monmap->get_epoch() << dendl;
if (sub->last < monmap->get_epoch()) {
send_latest_monmap(sub->session->inst);
if (sub->onetime)
}
}
-#define CEPH_MON_ONDISK_MAGIC "ceph mon volume v011"
+#define CEPH_MON_ONDISK_MAGIC "ceph mon volume v012"
#endif
rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr, sizeof(listen_addr));
if (rc < 0) {
char buf[80];
- derr(0) << "accepter.bind unable to bind to " << g_my_addr.in_addr()
+ derr(0) << "accepter.bind unable to bind to " << g_my_addr.ss_addr()
<< ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
return -errno;
}
}
if (rc < 0) {
char buf[80];
- derr(0) << "accepter.bind unable to bind to " << g_my_addr.in_addr()
+ derr(0) << "accepter.bind unable to bind to " << g_my_addr.ss_addr()
<< " on any port in range " << CEPH_PORT_START << "-" << CEPH_PORT_LAST
<< ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
return -errno;
rc = ::listen(listen_sd, 128);
if (rc < 0) {
char buf[80];
- derr(0) << "accepter.bind unable to listen on " << g_my_addr.in_addr()
+ derr(0) << "accepter.bind unable to listen on " << g_my_addr.ss_addr()
<< ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
return -errno;
}
if (rank->rank_addr.get_port() == 0) {
rank->rank_addr.in4_addr() = listen_addr;
if (force_nonce >= 0)
- rank->rank_addr.v.nonce = force_nonce;
+ rank->rank_addr.nonce = force_nonce;
else
- rank->rank_addr.v.nonce = getpid(); // FIXME: pid might not be best choice here.
+ rank->rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
}
- rank->rank_addr.v.erank = 0;
+ rank->rank_addr.erank = 0;
dout(1) << "accepter.bind rank_addr is " << rank->rank_addr << " need_addr=" << rank->need_addr << dendl;
rank->did_bind = true;
entity_addr_t SimpleMessenger::Endpoint::get_myaddr()
{
entity_addr_t a = rank->rank_addr;
- a.v.erank = my_rank;
+ a.erank = my_rank;
return a;
}
}
// and my addr
- rc = tcp_write(sd, (char*)&rank->rank_addr, sizeof(rank->rank_addr));
- if (rc < 0) {
- dout(10) << "accept couldn't write my addr" << dendl;
- state = STATE_CLOSED;
- return -1;
- }
+ bufferlist addrs;
+ ::encode(rank->rank_addr, addrs);
// and peer's socket addr (they might not know their ip)
entity_addr_t socket_addr;
- socklen_t len = sizeof(socket_addr.in_addr());
- int r = ::getpeername(sd, (sockaddr*)&socket_addr.v.in_addr, &len);
+ socklen_t len = sizeof(socket_addr.ss_addr());
+ int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len);
if (r < 0) {
char buf[80];
dout(0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
state = STATE_CLOSED;
return -1;
}
- rc = tcp_write(sd, (char*)&socket_addr, sizeof(socket_addr));
+ ::encode(socket_addr, addrs);
+
+ rc = tcp_write(sd, addrs.c_str(), addrs.length());
if (rc < 0) {
- dout(10) << "accept couldn't write peer addr" << dendl;
+ dout(10) << "accept couldn't write my+peer addr" << dendl;
state = STATE_CLOSED;
return -1;
}
state = STATE_CLOSED;
return -1;
}
- rc = tcp_read(sd, (char*)&peer_addr, sizeof(peer_addr));
+ bufferlist addrbl;
+ {
+ bufferptr tp(sizeof(peer_addr));
+ addrbl.push_back(tp);
+ }
+ rc = tcp_read(sd, addrbl.c_str(), addrbl.length());
if (rc < 0) {
dout(10) << "accept couldn't read peer_addr" << dendl;
state = STATE_CLOSED;
return -1;
}
+ {
+ bufferlist::iterator ti = addrbl.begin();
+ ::decode(peer_addr, ti);
+ }
+
dout(10) << "accept peer addr is " << peer_addr << dendl;
if (peer_addr.is_blank_addr()) {
// peer apparently doesn't know what ip they have; figure it out for them.
int port = peer_addr.get_port();
- peer_addr.v.in_addr = socket_addr.v.in_addr;
+ peer_addr.addr = socket_addr.addr;
peer_addr.set_port(port);
dout(0) << "accept peer addr is really " << peer_addr
<< " (socket is " << socket_addr << ")" << dendl;
char banner[strlen(CEPH_BANNER)];
entity_addr_t paddr;
entity_addr_t peer_addr_for_me, socket_addr;
+ bufferlist addrbl, myaddrbl;
// create socket?
sd = ::socket(AF_INET, SOCK_STREAM, 0);
// connect!
dout(10) << "connecting to " << peer_addr << dendl;
- rc = ::connect(sd, (sockaddr*)&peer_addr.v.in_addr, sizeof(peer_addr.v.in_addr));
+ rc = ::connect(sd, (sockaddr*)&peer_addr.addr, sizeof(peer_addr.addr));
if (rc < 0) {
dout(2) << "connect error " << peer_addr
<< ", " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
}
// identify peer
- rc = tcp_read(sd, (char*)&paddr, sizeof(paddr));
+ {
+ bufferptr p(sizeof(paddr) * 2);
+ addrbl.push_back(p);
+ }
+ rc = tcp_read(sd, addrbl.c_str(), addrbl.length());
if (rc < 0) {
- dout(2) << "connect couldn't read peer addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
+ dout(2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
}
+ {
+ bufferlist::iterator p = addrbl.begin();
+ ::decode(paddr, p);
+ ::decode(peer_addr_for_me, p);
+ }
+
dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl;
if (!peer_addr.is_local_to(paddr)) {
if (paddr.is_blank_addr() &&
}
}
- // peer addr for me
- rc = tcp_read(sd, (char*)&peer_addr_for_me, sizeof(peer_addr_for_me));
- if (rc < 0) {
- dout(2) << "connect couldn't read peer addr for me, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
- goto fail;
- }
dout(20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
if (rank->need_addr)
rank->learned_addr(peer_addr_for_me);
+ ::encode(rank->rank_addr, myaddrbl);
+
memset(&msg, 0, sizeof(msg));
- msgvec[0].iov_base = (char*)&rank->rank_addr;
- msgvec[0].iov_len = sizeof(rank->rank_addr);
+ msgvec[0].iov_base = myaddrbl.c_str();
+ msgvec[0].iov_len = myaddrbl.length();
msg.msg_iov = msgvec;
msg.msg_iovlen = 1;
msglen = msgvec[0].iov_len;
if (tcp_read( sd, (char*)&header, sizeof(header) ) < 0)
return 0;
-
+
dout(20) << "reader got envelope type=" << header.type
<< " src " << header.src
<< " front=" << header.front_len
entity_addr_t srcaddr = header.src.addr;
if (srcaddr.is_blank_addr()) {
dout(10) << "reader munging src addr " << header.src << " to be " << peer_addr << dendl;
- header.orig_src.addr.in_addr = header.src.addr.in_addr = peer_addr.in_addr();
+ ceph_entity_addr enc_peer_addr = peer_addr;
+ header.orig_src.addr.in_addr = header.src.addr.in_addr = enc_peer_addr.in_addr;
}
// read front
}
if (!did_bind)
- rank_addr.v.nonce = getpid();
+ rank_addr.nonce = getpid();
dout(1) << "rank.start" << dendl;
started = true;
void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool lazy)
{
const entity_addr_t& dest_addr = dest.addr;
- m->get_header().dst_erank = dest_addr.v.erank;
+ m->get_header().dst_erank = dest_addr.erank;
assert(m->nref.test() == 0);
// lookup
entity_addr_t dest_proc_addr = dest_addr;
- dest_proc_addr.v.erank = 0;
+ dest_proc_addr.erank = 0;
lock.Lock();
{
{
lock.Lock();
int port = rank_addr.get_port();
- rank_addr.in_addr() = peer_addr_for_me.in_addr();
+ rank_addr.addr = peer_addr_for_me.addr;
rank_addr.set_port(port);
dout(1) << "learned my addr " << rank_addr << dendl;
need_addr = false;
* thus identifies a particular process instance.
* ipv4 for now.
*/
-WRITE_RAW_ENCODER(ceph_entity_addr)
+
+/*
+ * encode sockaddr.ss_family in big endian
+ */
+static inline void encode(const sockaddr_storage& a, bufferlist& bl) {
+ struct sockaddr_storage ss = a;
+ ss.ss_family = htons(ss.ss_family);
+ ::encode_raw(ss, bl);
+}
+static inline void decode(sockaddr_storage& a, bufferlist::iterator& bl) {
+ ::decode_raw(a, bl);
+ a.ss_family = ntohs(a.ss_family);
+}
struct entity_addr_t {
- struct ceph_entity_addr v;
+ __u32 erank;
+ __u32 nonce;
+ sockaddr_storage addr;
- entity_addr_t() {
- memset(&v, 0, sizeof(v));
+ entity_addr_t() : erank(0), nonce(0) {
+ memset(&addr, 0, sizeof(addr));
}
entity_addr_t(const ceph_entity_addr &o) {
- memcpy(&v, &o, sizeof(v));
+ erank = o.erank;
+ nonce = o.nonce;
+ addr = o.in_addr;
+ addr.ss_family = ntohs(addr.ss_family);
}
- __u32 get_nonce() const { return v.nonce; }
- void set_nonce(__u32 n) { v.nonce = n; }
+ __u32 get_nonce() const { return nonce; }
+ void set_nonce(__u32 n) { nonce = n; }
- __u32 get_erank() const { return v.erank; }
- void set_erank(__u32 r) { v.erank = r; }
+ __u32 get_erank() const { return erank; }
+ void set_erank(__u32 r) { erank = r; }
- sockaddr_storage &in_addr() {
- return *(sockaddr_storage *)&v.in_addr;
+ sockaddr_storage &ss_addr() {
+ return addr;
}
sockaddr_in &in4_addr() {
- return *(sockaddr_in *)&v.in_addr;
+ return *(sockaddr_in *)&addr;
}
sockaddr_in6 &in6_addr() {
- return *(sockaddr_in6 *)&v.in_addr;
+ return *(sockaddr_in6 *)&addr;
}
void set_in4_quad(int pos, int val) {
- sockaddr_in *in4 = (sockaddr_in *)&v.in_addr;
+ sockaddr_in *in4 = (sockaddr_in *)&addr;
in4->sin_family = AF_INET;
unsigned char *ipq = (unsigned char*)&in4->sin_addr.s_addr;
ipq[pos] = val;
}
void set_port(int port) {
- switch (v.in_addr.ss_family) {
+ switch (addr.ss_family) {
case AF_INET:
- ((sockaddr_in *)&v.in_addr)->sin_port = htons(port);
+ ((sockaddr_in *)&addr)->sin_port = htons(port);
break;
case AF_INET6:
- ((sockaddr_in6 *)&v.in_addr)->sin6_port = htons(port);
+ ((sockaddr_in6 *)&addr)->sin6_port = htons(port);
break;
default:
assert(0);
}
}
int get_port() const {
- switch (v.in_addr.ss_family) {
+ switch (addr.ss_family) {
case AF_INET:
- return ntohs(((const sockaddr_in *)&v.in_addr)->sin_port);
+ return ntohs(((const sockaddr_in *)&addr)->sin_port);
break;
case AF_INET6:
- return ntohs(((const sockaddr_in6 *)&v.in_addr)->sin6_port);
+ return ntohs(((const sockaddr_in6 *)&addr)->sin6_port);
break;
}
return 0;
}
- operator ceph_entity_addr() const { return v; }
+ operator ceph_entity_addr() const {
+ ceph_entity_addr a;
+ a.erank = erank;
+ a.nonce = nonce;
+ a.in_addr = addr;
+ a.in_addr.ss_family = htons(addr.ss_family);
+ return a;
+ }
- bool is_local_to(const entity_addr_t &other) const {
- return ceph_entity_addr_is_local(&v, &other.v);
+ bool is_local_to(const entity_addr_t &o) const {
+ return nonce == o.nonce &&
+ memcmp(&addr, &o.addr, sizeof(addr)) == 0;
}
- bool is_same_host(const entity_addr_t &other) const {
- const ceph_entity_addr *a = &v;
- const ceph_entity_addr *b = &other.v;
- if (a->in_addr.ss_family != b->in_addr.ss_family)
+ bool is_same_host(const entity_addr_t &o) const {
+ if (addr.ss_family != o.addr.ss_family)
return false;
- if (a->in_addr.ss_family == AF_INET)
- return ((struct sockaddr_in *)&a->in_addr)->sin_addr.s_addr ==
- ((struct sockaddr_in *)&b->in_addr)->sin_addr.s_addr;
- if (a->in_addr.ss_family == AF_INET6)
- return memcmp(((struct sockaddr_in6 *)&a->in_addr)->sin6_addr.s6_addr,
- ((struct sockaddr_in6 *)&b->in_addr)->sin6_addr.s6_addr,
- sizeof(((struct sockaddr_in6 *)&a->in_addr)->sin6_addr.s6_addr));
+ if (addr.ss_family == AF_INET)
+ return ((struct sockaddr_in *)&addr)->sin_addr.s_addr ==
+ ((struct sockaddr_in *)&o.addr)->sin_addr.s_addr;
+ if (addr.ss_family == AF_INET6)
+ return memcmp(((struct sockaddr_in6 *)&addr)->sin6_addr.s6_addr,
+ ((struct sockaddr_in6 *)&o.addr)->sin6_addr.s6_addr,
+ sizeof(((struct sockaddr_in6 *)&addr)->sin6_addr.s6_addr));
return false;
}
bool is_blank_addr() {
- switch (v.in_addr.ss_family) {
+ switch (addr.ss_family) {
case AF_INET:
- return ((sockaddr_in *)&v.in_addr)->sin_addr.s_addr == INADDR_ANY;
+ return ((sockaddr_in *)&addr)->sin_addr.s_addr == INADDR_ANY;
case AF_INET6:
{
- sockaddr_in6 *in6 = (sockaddr_in6 *)&v.in_addr;
+ sockaddr_in6 *in6 = (sockaddr_in6 *)&addr;
return in6->sin6_addr.s6_addr32[0] == 0 &&
in6->sin6_addr.s6_addr32[1] == 0 &&
in6->sin6_addr.s6_addr32[2] == 0 &&
}
void encode(bufferlist& bl) const {
- ::encode(v, bl);
+ ::encode(erank, bl);
+ ::encode(nonce, bl);
+ ::encode(addr, bl);
}
void decode(bufferlist::iterator& bl) {
- ::decode(v, bl);
+ ::decode(erank, bl);
+ ::decode(nonce, bl);
+ ::decode(addr, bl);
}
};
WRITE_CLASS_ENCODER(entity_addr_t)
inline ostream& operator<<(ostream& out, const entity_addr_t &addr)
{
- return out << addr.v.in_addr << '/' << addr.v.nonce << '/' << addr.v.erank;
+ return out << addr.addr << '/' << addr.nonce << '/' << addr.erank;
}
inline bool operator==(const entity_addr_t& a, const entity_addr_t& b) { return memcmp(&a, &b, sizeof(a)) == 0; }
size_t operator()( const entity_addr_t& x ) const
{
static blobhash H;
- return H((const char*)&x.v, sizeof(x.v));
+ return H((const char*)&x, sizeof(x));
}
};
}
}
inline ostream& operator<<(ostream& out, const ceph_entity_inst &i)
{
- return out << *(const entity_inst_t*)&i;
+ entity_inst_t n = i;
+ return out << n;
}
getnameinfo((struct sockaddr *)&ss, sizeof(ss), buf, sizeof(buf),
serv, sizeof(serv),
NI_NUMERICHOST | NI_NUMERICSERV);
- return out << buf << ':' << serv;
+ return out << ss.ss_family << ":" << buf << ':' << serv;
}
inline ostream& operator<<(ostream& out, const sockaddr_in &ss)
getnameinfo((struct sockaddr *)&ss, sizeof(ss), buf, sizeof(buf),
serv, sizeof(serv),
NI_NUMERICHOST | NI_NUMERICSERV);
- return out << buf << ':' << serv;
+ return out << ss.sin_family << ":" << buf << ':' << serv;
}
assert(exists(osd));
entity_inst_t i(entity_name_t::OSD(osd),
osd_addr[osd]);
- i.addr.v.erank = i.addr.v.erank + 1; // heartbeat addr erank is regular addr erank + 1
+ i.addr.erank = i.addr.erank + 1; // heartbeat addr erank is regular addr erank + 1
return i;
}
-#define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v024"
+#define CEPH_OSD_ONDISK_MAGIC "ceph osd volume v025"
#define CEPH_OSD_NEARFULL_RATIO .8
#define CEPH_OSD_FULL_RATIO .95