From c533fac1ddf760a9d33b4845611c9c86cf8e294b Mon Sep 17 00:00:00 2001 From: sageweil Date: Tue, 3 Oct 2006 20:13:56 +0000 Subject: [PATCH] constifying entity_inst_t, starting to rework messenger lookup stuff... git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@906 29311d96-e01e-0410-9327-a35deaab8ce9 --- ceph/msg/Message.h | 16 +++---- ceph/msg/Messenger.h | 4 ++ ceph/msg/NewMessenger.cc | 98 +++++++++++++++++++++++++++++++++------- ceph/msg/NewMessenger.h | 6 ++- ceph/msg/tcp.h | 2 +- ceph/osd/OSD.cc | 16 +++---- ceph/osd/OSDMap.h | 6 ++- 7 files changed, 111 insertions(+), 37 deletions(-) diff --git a/ceph/msg/Message.h b/ceph/msg/Message.h index 4b64531a08c8a..aebaa9d3fafbb 100644 --- a/ceph/msg/Message.h +++ b/ceph/msg/Message.h @@ -289,14 +289,14 @@ class entity_inst_t { entity_inst_t(tcpaddr_t& a, int r) : addr(a), rank(r) {} }; -inline bool operator==(entity_inst_t& a, entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; } -inline bool operator!=(entity_inst_t& a, entity_inst_t& b) { return !(a == b); } -inline bool operator>(entity_inst_t& a, entity_inst_t& b) { return a.rank > b.rank; } -inline bool operator>=(entity_inst_t& a, entity_inst_t& b) { return a.rank >= b.rank; } -inline bool operator<(entity_inst_t& a, entity_inst_t& b) { return a.rank < b.rank; } -inline bool operator<=(entity_inst_t& a, entity_inst_t& b) { return a.rank <= b.rank; } - -inline ostream& operator<<(ostream& out, entity_inst_t &i) +inline bool operator==(const entity_inst_t& a, const entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; } +inline bool operator!=(const entity_inst_t& a, const entity_inst_t& b) { return !(a == b); } +inline bool operator>(const entity_inst_t& a, const entity_inst_t& b) { return a.rank > b.rank; } +inline bool operator>=(const entity_inst_t& a, const entity_inst_t& b) { return a.rank >= b.rank; } +inline bool operator<(const entity_inst_t& a, const entity_inst_t& b) { return a.rank < b.rank; } +inline bool operator<=(const entity_inst_t& a, const entity_inst_t& b) { return a.rank <= b.rank; } + +inline ostream& operator<<(ostream& out, const entity_inst_t &i) { return out << "rank" << i.rank << "_" << i.addr; } diff --git a/ceph/msg/Messenger.h b/ceph/msg/Messenger.h index d7bd4ea7c57b3..6ce29d3e00595 100644 --- a/ceph/msg/Messenger.h +++ b/ceph/msg/Messenger.h @@ -86,6 +86,10 @@ class Messenger { // send message virtual void prepare_send_message(msg_addr_t dest) {} virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0; + virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst) { + return send_message(m, dest); // overload me! + } + // make a procedure call virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0); diff --git a/ceph/msg/NewMessenger.cc b/ceph/msg/NewMessenger.cc index 1010886fb36cb..13fc21713edf6 100644 --- a/ceph/msg/NewMessenger.cc +++ b/ceph/msg/NewMessenger.cc @@ -904,7 +904,7 @@ int Rank::start_rank(tcpaddr_t& ns) /* connect_rank * NOTE: assumes rank.lock held. */ -Rank::Sender *Rank::connect_rank(entity_inst_t& inst) +Rank::Sender *Rank::connect_rank(const entity_inst_t& inst) { assert(rank.lock.is_locked()); assert(inst != rank.my_inst); @@ -1091,6 +1091,63 @@ void Rank::prepare_dest(msg_addr_t dest) lock.Unlock(); } +void Rank::submit_message(Message *m, const entity_inst_t& dest_inst) +{ + const msg_addr_t dest = m->get_dest(); + + // lookup + EntityMessenger *entity = 0; + Sender *sender = 0; + + lock.Lock(); + { + // local? + if (dest_inst.rank == my_inst.rank) { + if (local.count(dest)) { + // local + dout(20) << "submit_message " << *m << " dest " << dest << " local" << endl; + if (g_conf.ms_single_dispatch) { + _submit_single_dispatch(m); + } else { + entity = local[dest]; + } + } else { + // mid-register + dout(20) << "submit_message " << *m << " dest " << dest << " local but mid-register, waiting." << endl; + assert(0); + waiting_for_lookup[dest].push_back(m); + } + } + else { + // remote. + if (rank_sender.count( dest_inst.rank )) { + //&& + //rank_sender[dest_inst.rank]->inst == dest_inst) { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connected." << endl; + // connected. + sender = rank_sender[ dest_inst.rank ]; + } else { + dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connecting." << endl; + // not connected. + sender = connect_rank( dest_inst ); + } + } + } + lock.Unlock(); + + // do it + if (entity) { + // local! + dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << endl; + entity->queue_message(m); + } + else if (sender) { + // remote! + dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << endl; + sender->send(m); + } +} + void Rank::submit_message(Message *m) { @@ -1507,19 +1564,37 @@ void Rank::EntityMessenger::prepare_send_message(msg_addr_t dest) rank.prepare_dest(dest); } +int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst) +{ + // set envelope + m->set_source(get_myaddr(), 0); + m->set_dest(dest, 0); + + dout(1) << "--> " + << m->get_source() //<< ':' << m->get_source_port() + << " to " << m->get_dest() //<< ':' << m->get_dest_port() + << " ---- " << m->get_type_name() + << " ---- " << rank.my_inst << " --> " << inst + << " ---- " << m + << endl; + + rank.submit_message(m, inst); + + return 0; +} + int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport) { // set envelope m->set_source(get_myaddr(), fromport); m->set_dest(dest, port); - m->set_lamport_send_stamp( get_lamport() ); - + dout(1) << "--> " - << m->get_source() << ':' << m->get_source_port() - << " to " << m->get_dest() << ':' << m->get_dest_port() + << m->get_source() //<< ':' << m->get_source_port() + << " to " << m->get_dest() //<< ':' << m->get_dest_port() << " ---- " << m->get_type_name() - << " ---- " << rank.my_inst + << " ---- " << rank.my_inst << " --> ?" << " ---- " << m << endl; @@ -1609,14 +1684,3 @@ void Rank::mark_up(msg_addr_t a, entity_inst_t& i) lock.Unlock(); } -/*void Rank::EntityMessenger::reset(msg_addr_t a) -{ - assert(a != get_myaddr()); - if (rank.my_rank == 0) return; - rank.lock.Lock(); - rank.down.erase(a); - rank.reset_peer(a); - rank.lock.Unlock(); -} -*/ - diff --git a/ceph/msg/NewMessenger.h b/ceph/msg/NewMessenger.h index 387348aafb76e..a63114caeaaba 100644 --- a/ceph/msg/NewMessenger.h +++ b/ceph/msg/NewMessenger.h @@ -106,7 +106,7 @@ class Rank : public Dispatcher { Mutex lock; Cond cond; - Sender(entity_inst_t& i) : inst(i), done(false), sd(0) {} + Sender(const entity_inst_t& i) : inst(i), done(false), sd(0) {} virtual ~Sender() {} void *entry(); @@ -187,6 +187,7 @@ class Rank : public Dispatcher { virtual int shutdown(); virtual void prepare_send_message(msg_addr_t dest); virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0); + virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst); virtual void mark_down(msg_addr_t a, entity_inst_t& i); virtual void mark_up(msg_addr_t a, entity_inst_t& i); @@ -263,7 +264,7 @@ class Rank : public Dispatcher { void handle_register_ack(class MNSRegisterAck *m); void handle_lookup_reply(class MNSLookupReply *m); - Sender *connect_rank(entity_inst_t& inst); + Sender *connect_rank(const entity_inst_t& inst); void mark_down(msg_addr_t addr, entity_inst_t& i); void mark_up(msg_addr_t addr, entity_inst_t& i); @@ -285,6 +286,7 @@ public: EntityMessenger *register_entity(msg_addr_t addr); void unregister_entity(EntityMessenger *ms); + void submit_message(Message *m, const entity_inst_t& inst); void prepare_dest(msg_addr_t dest); void submit_message(Message *m); void submit_messages(list& ls); diff --git a/ceph/msg/tcp.h b/ceph/msg/tcp.h index aac9d98224d98..eb560a09c772c 100644 --- a/ceph/msg/tcp.h +++ b/ceph/msg/tcp.h @@ -8,7 +8,7 @@ typedef struct sockaddr_in tcpaddr_t; -inline ostream& operator<<(ostream& out, tcpaddr_t &a) +inline ostream& operator<<(ostream& out, const tcpaddr_t &a) { unsigned char addr[4]; memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4); diff --git a/ceph/osd/OSD.cc b/ceph/osd/OSD.cc index 4b2e0ff11656b..7476ad39df85a 100644 --- a/ceph/osd/OSD.cc +++ b/ceph/osd/OSD.cc @@ -67,7 +67,7 @@ #define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cerr << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " " char *osd_base_path = "./osddata"; -char *ebofs_base_path = "./ebofsdev"; +char *ebofs_base_path = "./dev"; @@ -123,9 +123,9 @@ OSD::OSD(int id, Messenger *m, char *dev) // init object store // try in this order: - // ebofsdev/$num - // ebofsdev/$hostname - // ebofsdev/all + // dev/osd$num + // dev/osd.$hostname + // dev/osd.all if (dev) { strcpy(dev_path,dev); @@ -134,14 +134,14 @@ OSD::OSD(int id, Messenger *m, char *dev) hostname[0] = 0; gethostname(hostname,100); - sprintf(dev_path, "%s/%d", ebofs_base_path, whoami); + sprintf(dev_path, "%s/osd%d", ebofs_base_path, whoami); struct stat sta; if (::lstat(dev_path, &sta) != 0) - sprintf(dev_path, "%s/%s", ebofs_base_path, hostname); + sprintf(dev_path, "%s/osd.%s", ebofs_base_path, hostname); if (::lstat(dev_path, &sta) != 0) - sprintf(dev_path, "%s/all", ebofs_base_path); + sprintf(dev_path, "%s/osd.all", ebofs_base_path); } if (g_conf.ebofs) { @@ -484,7 +484,7 @@ void OSD::heartbeat() i++) { _share_map_outgoing( MSG_ADDR_OSD(*i) ); messenger->send_message(new MOSDPing(osdmap->get_epoch()), - MSG_ADDR_OSD(*i)); + MSG_ADDR_OSD(*i), osdmap->get_osd_inst(*i)); } if (logger) logger->set("pingset", pingset.size()); diff --git a/ceph/osd/OSDMap.h b/ceph/osd/OSDMap.h index 97d31944ddc0f..b25e7933f231d 100644 --- a/ceph/osd/OSDMap.h +++ b/ceph/osd/OSDMap.h @@ -143,7 +143,11 @@ private: bool is_up(int osd) { return !is_down(osd); } bool is_out(int osd) { return out_osds.count(osd); } bool is_in(int osd) { return !is_out(osd); } - + + const entity_inst_t& get_inst(int osd) { + assert(osd_inst.count(osd)); + return osd_inst[osd]; + } bool get_inst(int osd, entity_inst_t& inst) { if (osd_inst.count(osd)) { inst = osd_inst[osd]; -- 2.39.5