entity_inst_t(tcpaddr_t& a, int r) : addr(a), rank(r) {}
};
-inline bool operator==(entity_inst_t& a, entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; }
-inline bool operator!=(entity_inst_t& a, entity_inst_t& b) { return !(a == b); }
-inline bool operator>(entity_inst_t& a, entity_inst_t& b) { return a.rank > b.rank; }
-inline bool operator>=(entity_inst_t& a, entity_inst_t& b) { return a.rank >= b.rank; }
-inline bool operator<(entity_inst_t& a, entity_inst_t& b) { return a.rank < b.rank; }
-inline bool operator<=(entity_inst_t& a, entity_inst_t& b) { return a.rank <= b.rank; }
-
-inline ostream& operator<<(ostream& out, entity_inst_t &i)
+inline bool operator==(const entity_inst_t& a, const entity_inst_t& b) { return a.rank == b.rank && a.addr == b.addr; }  // equal only when both rank and addr match
+inline bool operator!=(const entity_inst_t& a, const entity_inst_t& b) { return !(a == b); }  // exact negation of operator==
+inline bool operator>(const entity_inst_t& a, const entity_inst_t& b) { return a.rank > b.rank; }  // ordering operators compare rank only; addr is ignored
+inline bool operator>=(const entity_inst_t& a, const entity_inst_t& b) { return a.rank >= b.rank; }  // NOTE: same rank + different addr gives a != b, yet a >= b and a <= b both hold
+inline bool operator<(const entity_inst_t& a, const entity_inst_t& b) { return a.rank < b.rank; }  // rank-only, see operator>
+inline bool operator<=(const entity_inst_t& a, const entity_inst_t& b) { return a.rank <= b.rank; }  // rank-only, see operator>=
+
+inline ostream& operator<<(ostream& out, const entity_inst_t &i)  // stream an entity_inst_t as "rank<rank>_<addr>"
{
// i is const here, so streaming i.addr needs an operator<< that accepts a const address.
return out << "rank" << i.rank << "_" << i.addr;
}
// send message
virtual void prepare_send_message(msg_addr_t dest) {}
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0) = 0;
+ virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst) {  // send m to dest, with the destination's instance (inst) supplied by the caller
+ return send_message(m, dest); // default: inst is ignored and the call falls back to the port-based overload with port=0, fromport=0. override me!
+ }
+
// make a procedure call
virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
/* connect_rank
* NOTE: assumes rank.lock held.
*/
-Rank::Sender *Rank::connect_rank(entity_inst_t& inst)
+Rank::Sender *Rank::connect_rank(const entity_inst_t& inst)
{
assert(rank.lock.is_locked());
assert(inst != rank.my_inst);
lock.Unlock();
}
+void Rank::submit_message(Message *m, const entity_inst_t& dest_inst)
+{ /*
+   * Route m using the instance supplied by the caller (dest_inst).
+   * The destination address itself is read from m->get_dest().
+   *
+   *  - dest_inst.rank == my_inst.rank: treated as local.  If dest is present
+   *    in `local`, m is either handed to _submit_single_dispatch() (when
+   *    g_conf.ms_single_dispatch is set) or queued on the local
+   *    EntityMessenger once the lock has been dropped.
+   *  - otherwise: remote.  The Sender stored in rank_sender for that rank is
+   *    used, or connect_rank() is called to obtain one.
+   *
+   * The lookup runs with `lock` held; queue_message()/send() are invoked
+   * only after lock.Unlock().
+   */
+ const msg_addr_t dest = m->get_dest();
+
+ // lookup: exactly one of entity/sender is set below when there is something left to deliver
+ EntityMessenger *entity = 0;
+ Sender *sender = 0;
+
+ lock.Lock();
+ {
+ // local?  (decided by rank alone; dest_inst.addr is not consulted)
+ if (dest_inst.rank == my_inst.rank) {
+ if (local.count(dest)) {
+ // local
+ dout(20) << "submit_message " << *m << " dest " << dest << " local" << endl;
+ if (g_conf.ms_single_dispatch) {
+ _submit_single_dispatch(m);  // called with lock still held; entity stays 0 so nothing more happens after unlock
+ } else {
+ entity = local[dest];
+ }
+ } else {
+ // mid-register: our rank, but dest is not in `local`
+ dout(20) << "submit_message " << *m << " dest " << dest << " local but mid-register, waiting." << endl;
+ assert(0);  // this case is not expected on this path; aborts when assertions are enabled
+ waiting_for_lookup[dest].push_back(m);  // only reached when assert() is compiled out (NDEBUG)
+ }
+ }
+ else {
+ // remote.
+ if (rank_sender.count( dest_inst.rank )) {  // NOTE: an existing sender is reused on rank match alone; the inst comparison below is commented out
+ //&&
+ //rank_sender[dest_inst.rank]->inst == dest_inst) {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connected." << endl;
+ // connected.
+ sender = rank_sender[ dest_inst.rank ];
+ } else {
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_inst << ", connecting." << endl;
+ // not connected.
+ sender = connect_rank( dest_inst );  // called with lock held
+ }
+ }
+ }
+ lock.Unlock();
+
+ // do it (outside the lock).  if neither entity nor sender was set, e.g. the single-dispatch case above, nothing further is done with m here.
+ if (entity) {
+ // local!
+ dout(20) << "submit_message " << *m << " dest " << dest << " local, queueing" << endl;
+ entity->queue_message(m);
+ }
+ else if (sender) {
+ // remote!
+ dout(20) << "submit_message " << *m << " dest " << dest << " remote, sending" << endl;
+ sender->send(m);
+ }
+}
+
void Rank::submit_message(Message *m)
{
rank.prepare_dest(dest);
}
+int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst)
+{ /*
+   * Send m to dest when the caller already knows the destination instance.
+   * Stamps the envelope, logs the send via dout(1), and hands m to
+   * rank.submit_message(m, inst).
+   *
+   * Always returns 0; no failure is reported to the caller from here.
+   */
+ // set envelope: source is this messenger's address; both ports are fixed to 0 in this overload
+ m->set_source(get_myaddr(), 0);
+ m->set_dest(dest, 0);
+
+ dout(1) << "--> "
+ << m->get_source() //<< ':' << m->get_source_port()
+ << " to " << m->get_dest() //<< ':' << m->get_dest_port()
+ << " ---- " << m->get_type_name()
+ << " ---- " << rank.my_inst << " --> " << inst
+ << " ---- " << m
+ << endl;
+
+ rank.submit_message(m, inst);  // routing (local vs. remote) is decided there from inst
+
+ return 0;
+}
+
int Rank::EntityMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromport)
{
// set envelope
m->set_source(get_myaddr(), fromport);
m->set_dest(dest, port);
- m->set_lamport_send_stamp( get_lamport() );
-
+
dout(1) << "--> "
- << m->get_source() << ':' << m->get_source_port()
- << " to " << m->get_dest() << ':' << m->get_dest_port()
+ << m->get_source() //<< ':' << m->get_source_port()
+ << " to " << m->get_dest() //<< ':' << m->get_dest_port()
<< " ---- " << m->get_type_name()
- << " ---- " << rank.my_inst
+ << " ---- " << rank.my_inst << " --> ?"
<< " ---- " << m
<< endl;
lock.Unlock();
}
-/*void Rank::EntityMessenger::reset(msg_addr_t a)
-{
- assert(a != get_myaddr());
- if (rank.my_rank == 0) return;
- rank.lock.Lock();
- rank.down.erase(a);
- rank.reset_peer(a);
- rank.lock.Unlock();
-}
-*/
-
Mutex lock;
Cond cond;
- Sender(entity_inst_t& i) : inst(i), done(false), sd(0) {}
+ Sender(const entity_inst_t& i) : inst(i), done(false), sd(0) {}
virtual ~Sender() {}
void *entry();
virtual int shutdown();
virtual void prepare_send_message(msg_addr_t dest);
virtual int send_message(Message *m, msg_addr_t dest, int port=0, int fromport=0);
+ virtual int send_message(Message *m, msg_addr_t dest, const entity_inst_t& inst);
virtual void mark_down(msg_addr_t a, entity_inst_t& i);
virtual void mark_up(msg_addr_t a, entity_inst_t& i);
void handle_register_ack(class MNSRegisterAck *m);
void handle_lookup_reply(class MNSLookupReply *m);
- Sender *connect_rank(entity_inst_t& inst);
+ Sender *connect_rank(const entity_inst_t& inst);
void mark_down(msg_addr_t addr, entity_inst_t& i);
void mark_up(msg_addr_t addr, entity_inst_t& i);
EntityMessenger *register_entity(msg_addr_t addr);
void unregister_entity(EntityMessenger *ms);
+ void submit_message(Message *m, const entity_inst_t& inst);
void prepare_dest(msg_addr_t dest);
void submit_message(Message *m);
void submit_messages(list<Message*>& ls);
typedef struct sockaddr_in tcpaddr_t;
-inline ostream& operator<<(ostream& out, tcpaddr_t &a)
+inline ostream& operator<<(ostream& out, const tcpaddr_t &a)
{
unsigned char addr[4];
memcpy((char*)addr, (char*)&a.sin_addr.s_addr, 4);
#define derr(l) if (l<=g_conf.debug || l<=g_conf.debug_osd) cerr << g_clock.now() << " osd" << whoami << " " << (osdmap ? osdmap->get_epoch():0) << " "
char *osd_base_path = "./osddata";
-char *ebofs_base_path = "./ebofsdev";
+char *ebofs_base_path = "./dev";
// init object store
// try in this order:
- // ebofsdev/$num
- // ebofsdev/$hostname
- // ebofsdev/all
+ // dev/osd$num
+ // dev/osd.$hostname
+ // dev/osd.all
if (dev) {
strcpy(dev_path,dev);
hostname[0] = 0;
gethostname(hostname,100);
- sprintf(dev_path, "%s/%d", ebofs_base_path, whoami);
+ sprintf(dev_path, "%s/osd%d", ebofs_base_path, whoami);
struct stat sta;
if (::lstat(dev_path, &sta) != 0)
- sprintf(dev_path, "%s/%s", ebofs_base_path, hostname);
+ sprintf(dev_path, "%s/osd.%s", ebofs_base_path, hostname);
if (::lstat(dev_path, &sta) != 0)
- sprintf(dev_path, "%s/all", ebofs_base_path);
+ sprintf(dev_path, "%s/osd.all", ebofs_base_path);
}
if (g_conf.ebofs) {
i++) {
_share_map_outgoing( MSG_ADDR_OSD(*i) );
messenger->send_message(new MOSDPing(osdmap->get_epoch()),
- MSG_ADDR_OSD(*i));
+ MSG_ADDR_OSD(*i), osdmap->get_osd_inst(*i));
}
if (logger) logger->set("pingset", pingset.size());
bool is_up(int osd) { return !is_down(osd); }
bool is_out(int osd) { return out_osds.count(osd); }
bool is_in(int osd) { return !is_out(osd); }
-
+
+ const entity_inst_t& get_inst(int osd) {  // return the stored instance for osd by reference; the caller must only ask for an osd that has an entry
+ assert(osd_inst.count(osd));  // presence is checked by assert only; NOTE(review): if osd_inst is a std::map, operator[] below would insert a default entry when assertions are compiled out
+ return osd_inst[osd];  // NOTE(review): a caller in this same patch invokes osdmap->get_osd_inst(...); confirm an accessor of that name exists, or that the call should use get_inst
+ }
bool get_inst(int osd, entity_inst_t& inst) {
if (osd_inst.count(osd)) {
inst = osd_inst[osd];