From: Sage Weil Date: Thu, 27 Aug 2009 20:50:28 +0000 (-0700) Subject: msgr: remove 'dst' from ceph_msg_header X-Git-Tag: v0.14~105 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=094d3b6f8fbc9f28fb6dbc802f21c6b3ad5e25ef;p=ceph.git msgr: remove 'dst' from ceph_msg_header Shouldn't be there anyway. We do still need to keep erank in there though. --- diff --git a/src/cosd.cc b/src/cosd.cc index f96854a6726f..f67c54ca024e 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -125,6 +125,8 @@ int main(int argc, const char **argv) SimpleMessenger rank; rank.bind(); + rank.set_addr(mc.get_my_addr()); + cout << "starting osd" << whoami << " at " << rank.get_rank_addr() << " osd_data " << g_conf.osd_data diff --git a/src/include/msgr.h b/src/include/msgr.h index 50859cf4f47f..1764012f870d 100644 --- a/src/include/msgr.h +++ b/src/include/msgr.h @@ -21,7 +21,7 @@ * whenever the wire protocol changes. try to keep this string length * constant. */ -#define CEPH_BANNER "ceph 014\n" +#define CEPH_BANNER "ceph 015\n" #define CEPH_BANNER_MAX_LEN 30 @@ -133,7 +133,8 @@ struct ceph_msg_header { __u8 osd_protocol, osdc_protocol; /* internal and public */ __u8 mds_protocol, mdsc_protocol; - struct ceph_entity_inst src, orig_src, dst; + struct ceph_entity_inst src, orig_src; + __le32 dst_erank; __le32 crc; /* header crc32c */ } __attribute__ ((packed)); diff --git a/src/messages/MClientMountAck.h b/src/messages/MClientMountAck.h index fb5b7f0eaa66..37020df92e20 100644 --- a/src/messages/MClientMountAck.h +++ b/src/messages/MClientMountAck.h @@ -18,19 +18,20 @@ #include "msg/Message.h" struct MClientMountAck : public Message { + __s32 client; __s32 result; cstring result_msg; bufferlist monmap_bl; bufferlist signed_ticket; - MClientMountAck(int r = 0, const char *msg = 0) : + MClientMountAck(int c = -1, int r = 0, const char *msg = 0) : Message(CEPH_MSG_CLIENT_MOUNT_ACK), - result(r), + client(c), result(r), result_msg(msg) { } const char *get_type_name() { return "client_mount_ack"; } void print(ostream& o) { - o << "client_mount_ack(" << result; + o << "client_mount_ack(client" << client << " " << result; if (result_msg.length()) o << " " << result_msg; if (monmap_bl.length()) o << " + monmap"; if (signed_ticket.length()) o << " + ticket"; @@ -39,12 +40,14 @@ struct MClientMountAck : public Message { void decode_payload() { bufferlist::iterator p = payload.begin(); + ::decode(client, p); ::decode(result, p); ::decode(result_msg, p); ::decode(monmap_bl, p); ::decode(signed_ticket, p); } void encode_payload() { + ::encode(client, payload); ::encode(result, payload); ::encode(result_msg, payload); ::encode(monmap_bl, payload); diff --git a/src/messages/MClientReply.h b/src/messages/MClientReply.h index e07977348adf..899a55859392 100644 --- a/src/messages/MClientReply.h +++ b/src/messages/MClientReply.h @@ -202,7 +202,7 @@ public: } const char *get_type_name() { return "creply"; } void print(ostream& o) { - o << "client_reply(" << header.dst.name << ":" << head.tid; + o << "client_reply(???:" << head.tid; o << " = " << get_result(); if (get_result() <= 0) o << " " << strerror(-get_result()); diff --git a/src/messages/MMonMap.h b/src/messages/MMonMap.h index e6e3e326d63e..b2f5d966fc01 100644 --- a/src/messages/MMonMap.h +++ b/src/messages/MMonMap.h @@ -19,20 +19,25 @@ class MMonMap : public Message { public: + entity_addr_t addr; bufferlist monmapbl; MMonMap() : Message(CEPH_MSG_MON_MAP) { } - MMonMap(bufferlist &bl) : Message(CEPH_MSG_MON_MAP) { + MMonMap(entity_addr_t t, bufferlist &bl) : Message(CEPH_MSG_MON_MAP) { + addr = t; monmapbl.claim(bl); } const char *get_type_name() { return "mon_map"; } void encode_payload() { - payload = monmapbl; + ::encode(addr, payload); + ::encode(monmapbl, payload); } void decode_payload() { - monmapbl = payload; + bufferlist::iterator p = payload.begin(); + ::decode(addr, p); + ::decode(monmapbl, p); } }; diff --git a/src/messages/MOSDOpReply.h b/src/messages/MOSDOpReply.h index 55fa0e841cfb..f5d15677d094 100644 --- a/src/messages/MOSDOpReply.h +++ b/src/messages/MOSDOpReply.h @@ -55,9 +55,10 @@ class MOSDOpReply : public Message { // osdmap epoch_t get_map_epoch() { return head.osdmap_epoch; } - osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(), + /*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(), head.client_inc, head.tid); } + */ public: MOSDOpReply(MOSDOp *req, __s32 result, epoch_t e, int acktype) : @@ -100,7 +101,7 @@ public: const char *get_type_name() { return "osd_op_reply"; } void print(ostream& out) { - out << "osd_op_reply(" << get_reqid() + out << "osd_op_reply(" << head.tid << " " << oid << " " << ops; if (may_write()) { if (is_ondisk()) diff --git a/src/mon/ClientMonitor.cc b/src/mon/ClientMonitor.cc index 3d3642f6d129..be9998027e1a 100644 --- a/src/mon/ClientMonitor.cc +++ b/src/mon/ClientMonitor.cc @@ -139,7 +139,7 @@ bool ClientMonitor::check_mount(MClientMount *m) string s; getline(ss, s); - mon->messenger->send_message(new MClientMountAck(-EPERM, s.c_str()), + mon->messenger->send_message(new MClientMountAck(-1, -EPERM, s.c_str()), m->get_orig_source_inst()); return true; } diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 03fd483f1a1a..25066ab3c0f1 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -174,6 +174,11 @@ void MonClient::handle_monmap(MMonMap *m) { dout(10) << "handle_monmap " << *m << dendl; monc_lock.Lock(); + + my_addr = m->addr; + messenger->_set_myaddr(m->addr); + dout(10) << " i am " << m->addr << dendl; + bufferlist::iterator p = m->monmapbl.begin(); ::decode(monmap, p); map_cond.Signal(); @@ -259,7 +264,7 @@ void MonClient::handle_mount_ack(MClientMountAck* m) p = signed_ticket.begin(); ::decode(ticket, p); - messenger->reset_myname(m->get_dest()); + messenger->reset_myname(entity_name_t::CLIENT(m->client)); mount_cond.Signal(); diff --git a/src/mon/MonClient.h b/src/mon/MonClient.h index c7e0ebdfb1c2..246ef7c36100 100644 --- a/src/mon/MonClient.h +++ b/src/mon/MonClient.h @@ -34,6 +34,8 @@ public: private: Messenger *messenger; + entity_addr_t my_addr; + ClientTicket ticket; bufferlist signed_ticket; @@ -87,6 +89,8 @@ private: } void pick_new_mon(); + entity_addr_t get_my_addr() { return my_addr; } + const ceph_fsid_t& get_fsid() { return monmap.fsid; } diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index f672105cea9d..5592cef386b1 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -360,7 +360,7 @@ bool Monitor::dispatch_impl(Message *m) ss << "client protocol v " << (int)m->get_header().monc_protocol << " != server v " << CEPH_MONC_PROTOCOL; string s; getline(ss, s); - messenger->send_message(new MClientMountAck(-EINVAL, s.c_str()), + messenger->send_message(new MClientMountAck(-1, -EINVAL, s.c_str()), m->get_orig_source_inst()); } @@ -477,7 +477,7 @@ void Monitor::handle_mon_get_map(MMonGetMap *m) dout(10) << "handle_mon_get_map" << dendl; bufferlist bl; monmap->encode(bl); - messenger->send_message(new MMonMap(bl), m->get_orig_source_inst()); + messenger->send_message(new MMonMap(m->get_orig_source_addr(), bl), m->get_orig_source_inst()); delete m; } diff --git a/src/msg/Dispatcher.h b/src/msg/Dispatcher.h index c2e42860e59f..3f06a54e7afe 100644 --- a/src/msg/Dispatcher.h +++ b/src/msg/Dispatcher.h @@ -64,18 +64,18 @@ public: } // how i deal with transmission failures. - virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { } + virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) { } /* * on any connection reset. * this indicates that the ordered+reliable delivery semantics have * been violated. messages may have been lost. */ - virtual void ms_handle_reset(const entity_addr_t& peer, entity_name_t last) { } + virtual void ms_handle_reset(const entity_addr_t& peer) { } // on deliberate reset of connection by remote // implies incoming messages dropped; possibly/probably some of our previous outgoing too. - virtual void ms_handle_remote_reset(const entity_addr_t& peer, entity_name_t last) { } + virtual void ms_handle_remote_reset(const entity_addr_t& peer) { } }; #endif diff --git a/src/msg/Message.h b/src/msg/Message.h index 386df64b3bfd..1d89994e1021 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -222,10 +222,6 @@ public: void set_priority(__s16 p) { header.priority = p; } // source/dest - entity_inst_t get_dest_inst() { return entity_inst_t(header.dst); } - entity_name_t get_dest() { return entity_name_t(header.dst.name); } - void set_dest_inst(entity_inst_t& inst) { header.dst = inst; } - entity_inst_t get_source_inst() { return entity_inst_t(header.src); } entity_name_t get_source() { return entity_name_t(header.src.name); } entity_addr_t get_source_addr() { return entity_addr_t(header.src.addr); } diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 554bc9a12317..23377e758ad5 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -286,29 +286,26 @@ void SimpleMessenger::Endpoint::dispatch_entry() ls.pop_front(); if ((long)m == BAD_REMOTE_RESET) { lock.Lock(); - entity_addr_t a = remote_reset_q.front().first; - entity_name_t n = remote_reset_q.front().second; + entity_addr_t a = remote_reset_q.front(); remote_reset_q.pop_front(); lock.Unlock(); - get_dispatcher()->ms_handle_remote_reset(a, n); + get_dispatcher()->ms_handle_remote_reset(a); } else if ((long)m == BAD_RESET) { lock.Lock(); - entity_addr_t a = reset_q.front().first; - entity_name_t n = reset_q.front().second; + entity_addr_t a = reset_q.front(); reset_q.pop_front(); lock.Unlock(); - get_dispatcher()->ms_handle_reset(a, n); + get_dispatcher()->ms_handle_reset(a); } else if ((long)m == BAD_FAILED) { lock.Lock(); m = failed_q.front().first; - entity_inst_t i = failed_q.front().second; + entity_addr_t a = failed_q.front().second; failed_q.pop_front(); lock.Unlock(); - get_dispatcher()->ms_handle_failure(m, i); + get_dispatcher()->ms_handle_failure(m, a); m->put(); } else { - dout(1) << m->get_dest() - << " <== " << m->get_source_inst() + dout(1) << "<== " << m->get_source_inst() << " " << m->get_seq() << " ==== " << *m << " ==== " << m->get_payload().length() << "+" << m->get_middle().length() @@ -384,7 +381,6 @@ int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest) // set envelope m->set_source_inst(_myinst); m->set_orig_source_inst(_myinst); - m->set_dest_inst(dest); if (!m->get_priority()) m->set_priority(get_default_send_priority()); dout(1) << m->get_source() @@ -394,7 +390,7 @@ int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank->submit_message(m, dest.addr); + rank->submit_message(m, dest); return 0; } @@ -403,7 +399,6 @@ int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest) { // set envelope m->set_source_inst(_myinst); - m->set_dest_inst(dest); if (!m->get_priority()) m->set_priority(get_default_send_priority()); dout(1) << m->get_source() @@ -413,7 +408,7 @@ int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank->submit_message(m, dest.addr); + rank->submit_message(m, dest); return 0; } @@ -425,7 +420,6 @@ int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) // set envelope m->set_source_inst(_myinst); m->set_orig_source_inst(_myinst); - m->set_dest_inst(dest); if (!m->get_priority()) m->set_priority(get_default_send_priority()); dout(1) << "lazy " << m->get_source() @@ -435,7 +429,7 @@ int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest) << " " << m << dendl; - rank->submit_message(m, dest.addr, true); + rank->submit_message(m, dest, true); return 0; } @@ -1100,7 +1094,7 @@ void SimpleMessenger::Pipe::fail() for (unsigned i=0; ilocal.size(); i++) if (rank->local[i] && rank->local[i]->get_dispatcher()) - rank->local[i]->queue_reset(peer_addr, last_dest_name); + rank->local[i]->queue_reset(peer_addr); // unregister lock.Unlock(); @@ -1118,7 +1112,7 @@ void SimpleMessenger::Pipe::was_session_reset() report_failures(); for (unsigned i=0; ilocal.size(); i++) if (rank->local[i] && rank->local[i]->get_dispatcher()) - rank->local[i]->queue_remote_reset(peer_addr, last_dest_name); + rank->local[i]->queue_remote_reset(peer_addr); out_seq = 0; in_seq = 0; @@ -1142,7 +1136,7 @@ void SimpleMessenger::Pipe::report_failures() dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl; } else { dout(10) << "fail on " << *m << dendl; - rank->local[srcrank]->queue_failure(m, m->get_dest_inst()); + rank->local[srcrank]->queue_failure(m, peer_addr); } } m->put(); @@ -1248,7 +1242,6 @@ void SimpleMessenger::Pipe::reader() if (m->get_seq() <= in_seq) { dout(-10) << "reader got old message " << m->get_seq() << " <= " << in_seq << " " << m << " " << *m - << " for " << m->get_dest() << ", discarding" << dendl; delete m; continue; @@ -1271,20 +1264,21 @@ void SimpleMessenger::Pipe::reader() dout(10) << "reader got message " << m->get_seq() << " " << m << " " << *m - << " for " << m->get_dest() << dendl; + << dendl; // deliver Endpoint *entity = 0; rank->lock.Lock(); { - unsigned erank = m->get_dest_inst().addr.erank; + unsigned erank = m->get_header().dst_erank; if (erank < rank->max_local && rank->local[erank]) { // find entity entity = rank->local[erank]; entity->get(); // first message? + /* if (entity->need_addr) { entity->_set_myaddr(m->get_dest_inst().addr); dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl; @@ -1297,9 +1291,10 @@ void SimpleMessenger::Pipe::reader() dout(2) << "reader rank_addr is " << rank->rank_addr << dendl; rank->need_addr = false; } + */ } else { - derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl; + derr(0) << "reader got message " << *m << ", which isn't local" << dendl; } } rank->lock.Unlock(); @@ -1443,7 +1438,7 @@ void SimpleMessenger::Pipe::writer() lock.Lock(); if (rc < 0) { - derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", " + derr(1) << "writer error sending " << m << ", " << errno << ": " << strerror(errno) << dendl; fault(); } @@ -1497,7 +1492,7 @@ Message *SimpleMessenger::Pipe::read_message() return 0; dout(20) << "reader got envelope type=" << header.type - << " src " << header.src << " dst " << header.dst + << " src " << header.src << " front=" << header.front_len << " data=" << header.data_len << " off " << header.data_off @@ -1687,7 +1682,7 @@ int SimpleMessenger::Pipe::write_message(Message *m) blist.append(m->get_middle()); blist.append(m->get_data()); - dout(20) << "write_message " << m << " to " << header.dst << dendl; + dout(20) << "write_message " << m << dendl; // set up msghdr and iovecs struct msghdr msg; @@ -2011,9 +2006,10 @@ void SimpleMessenger::unregister_entity(Endpoint *msgr) } -void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy) +void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool lazy) { - const entity_name_t dest = m->get_dest(); + const entity_addr_t& dest_addr = dest.addr; + m->get_header().dst_erank = dest_addr.erank; assert(m->nref.test() == 0); @@ -2034,10 +2030,10 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, if (rank_addr.is_local_to(dest_addr)) { if (dest_addr.erank < max_local && local[dest_addr.erank]) { // local - dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl; + dout(20) << "submit_message " << *m << " local" << dendl; local[dest_addr.erank]->queue_message(m); } else { - derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl; + derr(0) << "submit_message " << *m << " " << dest_addr << " local but not in local map? dropping." << dendl; //assert(0); // hmpf, this is probably mds->mon beacon from newsyn. delete m; } @@ -2050,19 +2046,12 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, pipe = rank_pipe[ dest_proc_addr ]; pipe->lock.Lock(); if (pipe->state == Pipe::STATE_CLOSED) { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl; + dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl; pipe->unregister_pipe(); pipe->lock.Unlock(); pipe = 0; } else { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl; - - /* - // if this pipe was created by an incoming connection, but we haven't received - // a message yet, then it won't have the policy set. - if (pipe->get_out_seq() == 0) - pipe->policy = policy_map[m->get_dest().type()]; - */ + dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl; pipe->_send(m); pipe->lock.Unlock(); @@ -2070,12 +2059,12 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, } if (!pipe) { if (lazy) { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl; + dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl; delete m; } else { - dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl; + dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl; // not connected. - pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]); + pipe = connect_rank(dest_proc_addr, policy_map[dest.name.type()]); pipe->send(m); } } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 6af3fc9ade87..dda1be3057cc 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -129,7 +129,6 @@ private: int sd; int new_sd; entity_addr_t peer_addr; - entity_name_t last_dest_name; Policy policy; bool lossy_rx; @@ -244,7 +243,6 @@ private: void _send(Message *m) { m->get(); q[m->get_priority()].push_back(m); - last_dest_name = m->get_dest(); cond.Signal(); } Message *_get_next_outgoing() { @@ -311,28 +309,28 @@ private: } enum { BAD_REMOTE_RESET, BAD_RESET, BAD_FAILED }; - list > remote_reset_q; - list > reset_q; - list > failed_q; + list remote_reset_q; + list reset_q; + list > failed_q; - void queue_remote_reset(entity_addr_t a, entity_name_t n) { + void queue_remote_reset(entity_addr_t a) { lock.Lock(); - remote_reset_q.push_back(pair(a,n)); + remote_reset_q.push_back(a); dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET); cond.Signal(); lock.Unlock(); } - void queue_reset(entity_addr_t a, entity_name_t n) { + void queue_reset(entity_addr_t a) { lock.Lock(); - reset_q.push_back(pair(a,n)); + reset_q.push_back(a); dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET); cond.Signal(); lock.Unlock(); } - void queue_failure(Message *m, entity_inst_t i) { + void queue_failure(Message *m, entity_addr_t a) { lock.Lock(); m->get(); - failed_q.push_back(pair(m,i)); + failed_q.push_back(pair(m, a)); dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED); cond.Signal(); lock.Unlock(); @@ -438,11 +436,16 @@ public: return ++global_seq; } + void set_addr(entity_addr_t a) { + rank_addr = a; + need_addr = false; + } + Endpoint *register_entity(entity_name_t addr); void rename_entity(Endpoint *ms, entity_name_t newaddr); void unregister_entity(Endpoint *ms); - void submit_message(Message *m, const entity_addr_t& addr, bool lazy=false); + void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false); void prepare_dest(const entity_inst_t& inst); // create a new messenger