SimpleMessenger rank;
rank.bind();
+ rank.set_addr(mc.get_my_addr());
+
cout << "starting osd" << whoami
<< " at " << rank.get_rank_addr()
<< " osd_data " << g_conf.osd_data
* whenever the wire protocol changes. try to keep this string length
* constant.
*/
-#define CEPH_BANNER "ceph 014\n"
+#define CEPH_BANNER "ceph 015\n"
#define CEPH_BANNER_MAX_LEN 30
__u8 osd_protocol, osdc_protocol; /* internal and public */
__u8 mds_protocol, mdsc_protocol;
- struct ceph_entity_inst src, orig_src, dst;
+ struct ceph_entity_inst src, orig_src;
+ __le32 dst_erank;
__le32 crc; /* header crc32c */
} __attribute__ ((packed));
#include "msg/Message.h"
struct MClientMountAck : public Message {
+ __s32 client;
__s32 result;
cstring result_msg;
bufferlist monmap_bl;
bufferlist signed_ticket;
- MClientMountAck(int r = 0, const char *msg = 0) :
+ MClientMountAck(int c = -1, int r = 0, const char *msg = 0) :
Message(CEPH_MSG_CLIENT_MOUNT_ACK),
- result(r),
+ client(c), result(r),
result_msg(msg) { }
const char *get_type_name() { return "client_mount_ack"; }
void print(ostream& o) {
- o << "client_mount_ack(" << result;
+ o << "client_mount_ack(client" << client << " " << result;
if (result_msg.length()) o << " " << result_msg;
if (monmap_bl.length()) o << " + monmap";
if (signed_ticket.length()) o << " + ticket";
void decode_payload() {
bufferlist::iterator p = payload.begin();
+ ::decode(client, p);
::decode(result, p);
::decode(result_msg, p);
::decode(monmap_bl, p);
::decode(signed_ticket, p);
}
void encode_payload() {
+ ::encode(client, payload);
::encode(result, payload);
::encode(result_msg, payload);
::encode(monmap_bl, payload);
}
const char *get_type_name() { return "creply"; }
void print(ostream& o) {
- o << "client_reply(" << header.dst.name << ":" << head.tid;
+ o << "client_reply(???:" << head.tid;
o << " = " << get_result();
if (get_result() <= 0)
o << " " << strerror(-get_result());
class MMonMap : public Message {
public:
+ entity_addr_t addr;
bufferlist monmapbl;
MMonMap() : Message(CEPH_MSG_MON_MAP) { }
- MMonMap(bufferlist &bl) : Message(CEPH_MSG_MON_MAP) {
+ MMonMap(entity_addr_t t, bufferlist &bl) : Message(CEPH_MSG_MON_MAP) {
+ addr = t;
monmapbl.claim(bl);
}
const char *get_type_name() { return "mon_map"; }
void encode_payload() {
- payload = monmapbl;
+ ::encode(addr, payload);
+ ::encode(monmapbl, payload);
}
void decode_payload() {
- monmapbl = payload;
+ bufferlist::iterator p = payload.begin();
+ ::decode(addr, p);
+ ::decode(monmapbl, p);
}
};
// osdmap
epoch_t get_map_epoch() { return head.osdmap_epoch; }
- osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(),
+ /*osd_reqid_t get_reqid() { return osd_reqid_t(get_dest(),
head.client_inc,
head.tid); }
+ */
public:
MOSDOpReply(MOSDOp *req, __s32 result, epoch_t e, int acktype) :
const char *get_type_name() { return "osd_op_reply"; }
void print(ostream& out) {
- out << "osd_op_reply(" << get_reqid()
+ out << "osd_op_reply(" << head.tid
<< " " << oid << " " << ops;
if (may_write()) {
if (is_ondisk())
string s;
getline(ss, s);
- mon->messenger->send_message(new MClientMountAck(-EPERM, s.c_str()),
+ mon->messenger->send_message(new MClientMountAck(-1, -EPERM, s.c_str()),
m->get_orig_source_inst());
return true;
}
{
dout(10) << "handle_monmap " << *m << dendl;
monc_lock.Lock();
+
+ my_addr = m->addr;
+ messenger->_set_myaddr(m->addr);
+ dout(10) << " i am " << m->addr << dendl;
+
bufferlist::iterator p = m->monmapbl.begin();
::decode(monmap, p);
map_cond.Signal();
p = signed_ticket.begin();
::decode(ticket, p);
- messenger->reset_myname(m->get_dest());
+ messenger->reset_myname(entity_name_t::CLIENT(m->client));
mount_cond.Signal();
private:
Messenger *messenger;
+ entity_addr_t my_addr;
+
ClientTicket ticket;
bufferlist signed_ticket;
}
void pick_new_mon();
+ entity_addr_t get_my_addr() { return my_addr; }
+
const ceph_fsid_t& get_fsid() {
return monmap.fsid;
}
ss << "client protocol v " << (int)m->get_header().monc_protocol << " != server v " << CEPH_MONC_PROTOCOL;
string s;
getline(ss, s);
- messenger->send_message(new MClientMountAck(-EINVAL, s.c_str()),
+ messenger->send_message(new MClientMountAck(-1, -EINVAL, s.c_str()),
m->get_orig_source_inst());
}
dout(10) << "handle_mon_get_map" << dendl;
bufferlist bl;
monmap->encode(bl);
- messenger->send_message(new MMonMap(bl), m->get_orig_source_inst());
+ messenger->send_message(new MMonMap(m->get_orig_source_addr(), bl), m->get_orig_source_inst());
delete m;
}
}
// how i deal with transmission failures.
- virtual void ms_handle_failure(Message *m, const entity_inst_t& inst) { }
+ virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) { }
/*
* on any connection reset.
* this indicates that the ordered+reliable delivery semantics have
* been violated. messages may have been lost.
*/
- virtual void ms_handle_reset(const entity_addr_t& peer, entity_name_t last) { }
+ virtual void ms_handle_reset(const entity_addr_t& peer) { }
// on deliberate reset of connection by remote
// implies incoming messages dropped; possibly/probably some of our previous outgoing too.
- virtual void ms_handle_remote_reset(const entity_addr_t& peer, entity_name_t last) { }
+ virtual void ms_handle_remote_reset(const entity_addr_t& peer) { }
};
#endif
void set_priority(__s16 p) { header.priority = p; }
// source/dest
- entity_inst_t get_dest_inst() { return entity_inst_t(header.dst); }
- entity_name_t get_dest() { return entity_name_t(header.dst.name); }
- void set_dest_inst(entity_inst_t& inst) { header.dst = inst; }
-
entity_inst_t get_source_inst() { return entity_inst_t(header.src); }
entity_name_t get_source() { return entity_name_t(header.src.name); }
entity_addr_t get_source_addr() { return entity_addr_t(header.src.addr); }
ls.pop_front();
if ((long)m == BAD_REMOTE_RESET) {
lock.Lock();
- entity_addr_t a = remote_reset_q.front().first;
- entity_name_t n = remote_reset_q.front().second;
+ entity_addr_t a = remote_reset_q.front();
remote_reset_q.pop_front();
lock.Unlock();
- get_dispatcher()->ms_handle_remote_reset(a, n);
+ get_dispatcher()->ms_handle_remote_reset(a);
} else if ((long)m == BAD_RESET) {
lock.Lock();
- entity_addr_t a = reset_q.front().first;
- entity_name_t n = reset_q.front().second;
+ entity_addr_t a = reset_q.front();
reset_q.pop_front();
lock.Unlock();
- get_dispatcher()->ms_handle_reset(a, n);
+ get_dispatcher()->ms_handle_reset(a);
} else if ((long)m == BAD_FAILED) {
lock.Lock();
m = failed_q.front().first;
- entity_inst_t i = failed_q.front().second;
+ entity_addr_t a = failed_q.front().second;
failed_q.pop_front();
lock.Unlock();
- get_dispatcher()->ms_handle_failure(m, i);
+ get_dispatcher()->ms_handle_failure(m, a);
m->put();
} else {
- dout(1) << m->get_dest()
- << " <== " << m->get_source_inst()
+ dout(1) << "<== " << m->get_source_inst()
<< " " << m->get_seq()
<< " ==== " << *m
<< " ==== " << m->get_payload().length() << "+" << m->get_middle().length()
// set envelope
m->set_source_inst(_myinst);
m->set_orig_source_inst(_myinst);
- m->set_dest_inst(dest);
if (!m->get_priority()) m->set_priority(get_default_send_priority());
dout(1) << m->get_source()
<< " " << m
<< dendl;
- rank->submit_message(m, dest.addr);
+ rank->submit_message(m, dest);
return 0;
}
{
// set envelope
m->set_source_inst(_myinst);
- m->set_dest_inst(dest);
if (!m->get_priority()) m->set_priority(get_default_send_priority());
dout(1) << m->get_source()
<< " " << m
<< dendl;
- rank->submit_message(m, dest.addr);
+ rank->submit_message(m, dest);
return 0;
}
// set envelope
m->set_source_inst(_myinst);
m->set_orig_source_inst(_myinst);
- m->set_dest_inst(dest);
if (!m->get_priority()) m->set_priority(get_default_send_priority());
dout(1) << "lazy " << m->get_source()
<< " " << m
<< dendl;
- rank->submit_message(m, dest.addr, true);
+ rank->submit_message(m, dest, true);
return 0;
}
for (unsigned i=0; i<rank->local.size(); i++)
if (rank->local[i] && rank->local[i]->get_dispatcher())
- rank->local[i]->queue_reset(peer_addr, last_dest_name);
+ rank->local[i]->queue_reset(peer_addr);
// unregister
lock.Unlock();
report_failures();
for (unsigned i=0; i<rank->local.size(); i++)
if (rank->local[i] && rank->local[i]->get_dispatcher())
- rank->local[i]->queue_remote_reset(peer_addr, last_dest_name);
+ rank->local[i]->queue_remote_reset(peer_addr);
out_seq = 0;
in_seq = 0;
dout(1) << "fail on " << *m << ", dispatcher stopping, ignoring." << dendl;
} else {
dout(10) << "fail on " << *m << dendl;
- rank->local[srcrank]->queue_failure(m, m->get_dest_inst());
+ rank->local[srcrank]->queue_failure(m, peer_addr);
}
}
m->put();
if (m->get_seq() <= in_seq) {
dout(-10) << "reader got old message "
<< m->get_seq() << " <= " << in_seq << " " << m << " " << *m
- << " for " << m->get_dest()
<< ", discarding" << dendl;
delete m;
continue;
dout(10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
- << " for " << m->get_dest() << dendl;
+ << dendl;
// deliver
Endpoint *entity = 0;
rank->lock.Lock();
{
- unsigned erank = m->get_dest_inst().addr.erank;
+ unsigned erank = m->get_header().dst_erank;
if (erank < rank->max_local && rank->local[erank]) {
// find entity
entity = rank->local[erank];
entity->get();
// first message?
+ /*
if (entity->need_addr) {
entity->_set_myaddr(m->get_dest_inst().addr);
dout(2) << "reader entity addr is " << entity->get_myaddr() << dendl;
dout(2) << "reader rank_addr is " << rank->rank_addr << dendl;
rank->need_addr = false;
}
+ */
} else {
- derr(0) << "reader got message " << *m << " for " << m->get_dest() << ", which isn't local" << dendl;
+ derr(0) << "reader got message " << *m << ", which isn't local" << dendl;
}
}
rank->lock.Unlock();
lock.Lock();
if (rc < 0) {
- derr(1) << "writer error sending " << m << " to " << m->get_header().dst << ", "
+ derr(1) << "writer error sending " << m << ", "
<< errno << ": " << strerror(errno) << dendl;
fault();
}
return 0;
dout(20) << "reader got envelope type=" << header.type
- << " src " << header.src << " dst " << header.dst
+ << " src " << header.src
<< " front=" << header.front_len
<< " data=" << header.data_len
<< " off " << header.data_off
blist.append(m->get_middle());
blist.append(m->get_data());
- dout(20) << "write_message " << m << " to " << header.dst << dendl;
+ dout(20) << "write_message " << m << dendl;
// set up msghdr and iovecs
struct msghdr msg;
}
-void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, bool lazy)
+void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool lazy)
{
- const entity_name_t dest = m->get_dest();
+ const entity_addr_t& dest_addr = dest.addr;
+ m->get_header().dst_erank = dest_addr.erank;
assert(m->nref.test() == 0);
if (rank_addr.is_local_to(dest_addr)) {
if (dest_addr.erank < max_local && local[dest_addr.erank]) {
// local
- dout(20) << "submit_message " << *m << " dest " << dest << " local" << dendl;
+ dout(20) << "submit_message " << *m << " local" << dendl;
local[dest_addr.erank]->queue_message(m);
} else {
- derr(0) << "submit_message " << *m << " dest " << dest << " " << dest_addr << " local but not in local map? dropping." << dendl;
+ derr(0) << "submit_message " << *m << " " << dest_addr << " local but not in local map? dropping." << dendl;
//assert(0); // hmpf, this is probably mds->mon beacon from newsyn.
delete m;
}
pipe = rank_pipe[ dest_proc_addr ];
pipe->lock.Lock();
if (pipe->state == Pipe::STATE_CLOSED) {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
+ dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
pipe->unregister_pipe();
pipe->lock.Unlock();
pipe = 0;
} else {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
-
- /*
- // if this pipe was created by an incoming connection, but we haven't received
- // a message yet, then it won't have the policy set.
- if (pipe->get_out_seq() == 0)
- pipe->policy = policy_map[m->get_dest().type()];
- */
+ dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
pipe->_send(m);
pipe->lock.Unlock();
}
if (!pipe) {
if (lazy) {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", lazy, dropping." << dendl;
+ dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl;
delete m;
} else {
- dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", new pipe." << dendl;
+ dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
// not connected.
- pipe = connect_rank(dest_proc_addr, policy_map[m->get_dest().type()]);
+ pipe = connect_rank(dest_proc_addr, policy_map[dest.name.type()]);
pipe->send(m);
}
}
int sd;
int new_sd;
entity_addr_t peer_addr;
- entity_name_t last_dest_name;
Policy policy;
bool lossy_rx;
void _send(Message *m) {
m->get();
q[m->get_priority()].push_back(m);
- last_dest_name = m->get_dest();
cond.Signal();
}
Message *_get_next_outgoing() {
}
enum { BAD_REMOTE_RESET, BAD_RESET, BAD_FAILED };
- list<pair<entity_addr_t,entity_name_t> > remote_reset_q;
- list<pair<entity_addr_t,entity_name_t> > reset_q;
- list<pair<Message*,entity_inst_t> > failed_q;
+ list<entity_addr_t> remote_reset_q;
+ list<entity_addr_t> reset_q;
+ list<pair<Message*,entity_addr_t> > failed_q;
- void queue_remote_reset(entity_addr_t a, entity_name_t n) {
+ void queue_remote_reset(entity_addr_t a) {
lock.Lock();
- remote_reset_q.push_back(pair<entity_addr_t,entity_name_t>(a,n));
+ remote_reset_q.push_back(a);
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET);
cond.Signal();
lock.Unlock();
}
- void queue_reset(entity_addr_t a, entity_name_t n) {
+ void queue_reset(entity_addr_t a) {
lock.Lock();
- reset_q.push_back(pair<entity_addr_t,entity_name_t>(a,n));
+ reset_q.push_back(a);
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET);
cond.Signal();
lock.Unlock();
}
- void queue_failure(Message *m, entity_inst_t i) {
+ void queue_failure(Message *m, entity_addr_t a) {
lock.Lock();
m->get();
- failed_q.push_back(pair<Message*,entity_inst_t>(m,i));
+ failed_q.push_back(pair<Message*,entity_addr_t>(m, a));
dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_FAILED);
cond.Signal();
lock.Unlock();
return ++global_seq;
}
+ void set_addr(entity_addr_t a) {
+ rank_addr = a;
+ need_addr = false;
+ }
+
Endpoint *register_entity(entity_name_t addr);
void rename_entity(Endpoint *ms, entity_name_t newaddr);
void unregister_entity(Endpoint *ms);
- void submit_message(Message *m, const entity_addr_t& addr, bool lazy=false);
+ void submit_message(Message *m, const entity_inst_t& addr, bool lazy=false);
void prepare_dest(const entity_inst_t& inst);
// create a new messenger