sargs.pop_front();
}
if (a.length() == 0 || a == "~") {
- char s[20];
- sprintf(s,"/syn.%d.%d", client->whoami.v, seq);
+ char s[30];
+ sprintf(s,"/syn.%lld.%d", (long long)client->whoami.v, seq);
a = s;
}
return a;
want_state = MDSMap::STATE_BOOT;
beacon_start();
whoami = -1;
- messenger->reset_myname(entity_name_t::MDS(whoami));
+ messenger->set_myname(entity_name_t::MDS(whoami));
objecter->init();
if (oldwhoami != whoami) {
// update messenger.
dout(1) << "handle_mds_map i am now mds" << whoami << "." << incarnation << dendl;
- messenger->reset_myname(entity_name_t::MDS(whoami));
+ messenger->set_myname(entity_name_t::MDS(whoami));
// do i need an osdmap?
if (oldwhoami < 0) {
struct MClientMountAck : public Message {
__s64 client;
- entity_addr_t addr;
__s32 result;
cstring result_msg;
bufferlist monmap_bl;
void decode_payload() {
bufferlist::iterator p = payload.begin();
::decode(client, p);
- ::decode(addr, p);
::decode(result, p);
::decode(result_msg, p);
::decode(monmap_bl, p);
}
void encode_payload() {
::encode(client, payload);
- ::encode(addr, payload);
::encode(result, payload);
::encode(result_msg, payload);
::encode(monmap_bl, payload);
class MMonMap : public Message {
public:
- entity_addr_t addr;
bufferlist monmapbl;
MMonMap() : Message(CEPH_MSG_MON_MAP) { }
- MMonMap(entity_addr_t t, bufferlist &bl) : Message(CEPH_MSG_MON_MAP) {
- addr = t;
+ MMonMap(bufferlist &bl) : Message(CEPH_MSG_MON_MAP) {
monmapbl.claim(bl);
}
const char *get_type_name() { return "mon_map"; }
void encode_payload() {
- ::encode(addr, payload);
::encode(monmapbl, payload);
}
void decode_payload() {
bufferlist::iterator p = payload.begin();
- ::decode(addr, p);
::decode(monmapbl, p);
}
};
// reply with client ticket
MClientMountAck *ack = new MClientMountAck;
ack->client = client;
- ack->addr = to.addr;
mon->monmap->encode(ack->monmap_bl);
mon->send_reply(m, ack, to);
{
dout(10) << "dispatch " << *m << dendl;
+ if (my_addr == entity_addr_t())
+ my_addr = messenger->get_myaddr();
+
switch (m->get_type()) {
case CEPH_MSG_MON_MAP:
handle_monmap((MMonMap*)m);
dout(10) << "handle_monmap " << *m << dendl;
monc_lock.Lock();
- my_addr = m->addr;
- messenger->_set_myaddr(m->addr);
- dout(10) << " i am " << m->addr << dendl;
-
bufferlist::iterator p = m->monmapbl.begin();
::decode(monmap, p);
map_cond.Signal();
bufferlist::iterator p = m->monmap_bl.begin();
::decode(monmap, p);
- messenger->_set_myaddr(m->addr);
- messenger->reset_myname(entity_name_t::CLIENT(m->client));
+ messenger->set_myname(entity_name_t::CLIENT(m->client));
// finish.
timer.cancel_event(mount_timeout_event);
dout(10) << "handle_mon_get_map" << dendl;
bufferlist bl;
monmap->encode(bl);
- messenger->send_message(new MMonMap(m->get_orig_source_addr(), bl), m->get_orig_source_inst());
+ messenger->send_message(new MMonMap(bl), m->get_orig_source_inst());
delete m;
}
Dispatcher *dispatcher;
protected:
- entity_inst_t _myinst;
+ entity_name_t _my_name;
int default_send_priority;
atomic_t nref;
Messenger(entity_name_t w) : dispatcher(0),
default_send_priority(CEPH_MSG_PRIO_DEFAULT),
nref(1) {
- _myinst.name = w;
+ _my_name = w;
}
virtual ~Messenger() {
assert(nref.test() == 0);
}
// accessors
- entity_name_t get_myname() { return _myinst.name; }
- const entity_addr_t& get_myaddr() { return _myinst.addr; }
- const entity_inst_t& get_myinst() { return _myinst; }
+ entity_name_t get_myname() { return _my_name; }
+ virtual entity_addr_t get_myaddr() = 0;
+ entity_inst_t get_myinst() { return entity_inst_t(get_myname(), get_myaddr()); }
- void _set_myname(entity_name_t m) { _myinst.name = m; }
- virtual void _set_myaddr(entity_addr_t a) { _myinst.addr = a; }
- virtual void reset_myname(entity_name_t m) = 0;
+ void set_myname(entity_name_t m) { _my_name = m; }
void set_default_send_priority(int p) { default_send_priority = p; }
int get_default_send_priority() { return default_send_priority; }
}
rank->rank_addr.erank = 0;
- dout(1) << "accepter.bind rank_addr is " << rank->rank_addr
- << " need_addr=" << rank->need_addr
- << dendl;
+ dout(1) << "accepter.bind rank_addr is " << rank->rank_addr << " need_addr=" << rank->need_addr << dendl;
return 0;
}
int SimpleMessenger::Endpoint::send_message(Message *m, entity_inst_t dest)
{
// set envelope
- m->set_source_inst(_myinst);
- m->set_orig_source_inst(_myinst);
+ m->get_header().src = get_myinst();
+ m->get_header().orig_src = m->get_header().src;
+
if (!m->get_priority()) m->set_priority(get_default_send_priority());
- dout(1) << m->get_source()
+ dout(1) << m->get_source_inst()
<< " --> " << dest.name << " " << dest.addr
<< " -- " << *m
<< " -- ?+" << m->get_data().length()
int SimpleMessenger::Endpoint::forward_message(Message *m, entity_inst_t dest)
{
// set envelope
- m->set_source_inst(_myinst);
+ m->get_header().src = get_myinst();
+
if (!m->get_priority()) m->set_priority(get_default_send_priority());
dout(1) << m->get_source()
int SimpleMessenger::Endpoint::lazy_send_message(Message *m, entity_inst_t dest)
{
// set envelope
- m->set_source_inst(_myinst);
- m->set_orig_source_inst(_myinst);
+ m->get_header().src = get_myinst();
+ m->get_header().orig_src = m->get_header().src;
+
if (!m->get_priority()) m->set_priority(get_default_send_priority());
dout(1) << "lazy " << m->get_source()
-void SimpleMessenger::Endpoint::_set_myaddr(entity_addr_t a)
-{
- Messenger::_set_myaddr(a); // still call original
-
- dout(10) << "_set_myaddr " << a << dendl;
- rank->rank_addr.ipaddr = a.ipaddr;
-}
-
-void SimpleMessenger::Endpoint::reset_myname(entity_name_t newname)
+void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
{
- entity_name_t oldname = get_myname();
- dout(10) << "reset_myname " << oldname << " to " << newname << dendl;
- _set_myname(newname);
+ rank->mark_down(a);
}
-void SimpleMessenger::Endpoint::mark_down(entity_addr_t a)
+entity_addr_t SimpleMessenger::Endpoint::get_myaddr()
{
- rank->mark_down(a);
+ entity_addr_t a = rank->rank_addr;
+ a.erank = my_rank;
+ return a;
}
-
/**************************************
* Pipe
*/
}
dout(20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
- if (rank->need_addr) {
- rank->lock.Lock();
- entity_addr_t was = rank->rank_addr;
- rank->rank_addr.ipaddr = peer_addr_for_me.ipaddr;
- rank->rank_addr.ipaddr.sin_port = was.ipaddr.sin_port;
- dout(0) << "rank discovered i am " << rank->rank_addr
- << " (was " << was << ", peer says i am " << peer_addr_for_me << ")" << dendl;
- rank->need_addr = false;
- rank->lock.Unlock();
- }
+ if (rank->need_addr)
+ rank->learned_addr(peer_addr_for_me);
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&rank->rank_addr;
// find entity
entity = rank->local[erank];
entity->get();
-
- // first message?
- /*
- if (entity->need_addr) {
- entity->_set_myaddr(m->get_dest_inst().addr);
- dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl;
- entity->need_addr = false;
- }
-
- if (rank->need_addr) {
- rank->rank_addr = m->get_dest_inst().addr;
- rank->rank_addr.erank = 0;
- dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
- rank->need_addr = false;
- }
- */
-
} else {
derr(0) << "reader got message " << *m << ", which isn't local" << dendl;
}
msgr->get();
local[erank] = msgr;
stopped[erank] = false;
- msgr->_myinst.addr = rank_addr;
- if (msgr->_myinst.addr.ipaddr == entity_addr_t().ipaddr)
- msgr->need_addr = true;
- msgr->_myinst.addr.erank = erank;
- dout(10) << "register_entity " << name << " at " << msgr->_myinst.addr
- << " need_addr=" << need_addr
- << dendl;
+ dout(10) << "register_entity " << name << " at " << msgr->get_myaddr() << dendl;
num_local++;
lock.Unlock();
}
+void SimpleMessenger::learned_addr(entity_addr_t peer_addr_for_me)
+{
+ lock.Lock();
+ entity_addr_t was = rank_addr;
+ rank_addr.ipaddr = peer_addr_for_me.ipaddr;
+ rank_addr.ipaddr.sin_port = was.ipaddr.sin_port;
+ dout(1) << "learned my addr " << rank_addr << dendl;
+ need_addr = false;
+ lock.Unlock();
+}
bool stop;
int qlen;
int my_rank;
- public:
- bool need_addr;
- private:
class DispatchThread : public Thread {
Endpoint *m;
public:
stop(false),
qlen(0),
my_rank(rn),
- need_addr(false),
dispatch_thread(this) { }
~Endpoint() { }
int get_dispatch_queue_len() { return qlen; }
- void _set_myaddr(entity_addr_t a);
- void reset_myname(entity_name_t m);
+ entity_addr_t get_myaddr();
+
int shutdown();
void suicide();
void prepare_dest(const entity_inst_t& inst);
void send_keepalive(const entity_inst_t& addr);
+ void learned_addr(entity_addr_t peer_addr_for_me);
+
// create a new messenger
Endpoint *new_entity(entity_name_t addr);