#define DOUT_SUBSYS ms
#undef dout_prefix
-#define dout_prefix _prefix(rank)
-static ostream& _prefix(SimpleMessenger *rank) {
- return *_dout << dbeginl << pthread_self() << " -- " << rank->rank_addr << " ";
+#define dout_prefix _prefix(messenger)
+static ostream& _prefix(SimpleMessenger *messenger) {
+ return *_dout << dbeginl << pthread_self() << " -- " << messenger->ms_addr << " ";
}
return -errno;
}
- rank->rank_addr = g_my_addr;
- if (rank->rank_addr != entity_addr_t())
- rank->need_addr = false;
+ messenger->ms_addr = g_my_addr;
+ if (messenger->ms_addr != entity_addr_t())
+ messenger->need_addr = false;
else
- rank->need_addr = true;
+ messenger->need_addr = true;
- if (rank->rank_addr.get_port() == 0) {
- rank->rank_addr.in4_addr() = listen_addr;
+ if (messenger->ms_addr.get_port() == 0) {
+ messenger->ms_addr.in4_addr() = listen_addr;
if (force_nonce >= 0)
- rank->rank_addr.nonce = force_nonce;
+ messenger->ms_addr.nonce = force_nonce;
else
- rank->rank_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
+ messenger->ms_addr.nonce = getpid(); // FIXME: pid might not be best choice here.
}
- rank->rank_addr.erank = 0;
+ messenger->ms_addr.erank = 0;
- dout(1) << "accepter.bind rank_addr is " << rank->rank_addr << " need_addr=" << rank->need_addr << dendl;
- rank->did_bind = true;
+ dout(1) << "accepter.bind ms_addr is " << messenger->ms_addr << " need_addr=" << messenger->need_addr << dendl;
+ messenger->did_bind = true;
return 0;
}
dout(0) << "accepter could't set TCP_NODELAY: " << strerror_r(errno, buf, sizeof(buf)) << dendl;
}
- rank->lock.Lock();
+ messenger->lock.Lock();
- if (!rank->endpoint_stopped) {
- Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING);
+ if (!messenger->endpoint_stopped) {
+ Pipe *p = new Pipe(messenger, Pipe::STATE_ACCEPTING);
p->sd = sd;
p->start_reader();
- rank->pipes.insert(p);
+ messenger->pipes.insert(p);
}
- rank->lock.Unlock();
+ messenger->lock.Unlock();
} else {
dout(0) << "accepter no incoming connection? sd = " << sd
<< " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
entity_addr_t SimpleMessenger::get_myaddr()
{
- entity_addr_t a = rank->rank_addr;
+ entity_addr_t a = messenger->ms_addr;
a.erank = 0;
return a;
}
#define dout_prefix _pipe_prefix()
ostream& SimpleMessenger::Pipe::_pipe_prefix() {
return *_dout << dbeginl << pthread_self()
- << " -- " << rank->rank_addr << " >> " << peer_addr << " pipe(" << this
+ << " -- " << messenger->ms_addr << " >> " << peer_addr << " pipe(" << this
<< " sd=" << sd
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
// and my addr
bufferlist addrs;
- ::encode(rank->rank_addr, addrs);
+ ::encode(messenger->ms_addr, addrs);
// and peer's socket addr (they might not know their ip)
entity_addr_t socket_addr;
<< " global_seq " << connect.global_seq
<< dendl;
- rank->lock.Lock();
+ messenger->lock.Lock();
// note peer's type, flags
set_peer_type(connect.host_type);
- policy = rank->get_policy(connect.host_type);
+ policy = messenger->get_policy(connect.host_type);
dout(10) << "accept of host_type " << connect.host_type
<< ", policy.lossy=" << policy.lossy
<< dendl;
memset(&reply, 0, sizeof(reply));
- reply.protocol_version = get_proto_version(rank->my_type, peer_type, false);
+ reply.protocol_version = get_proto_version(messenger->my_type, peer_type, false);
// mismatch?
dout(10) << "accept my proto " << reply.protocol_version
<< ", their proto " << connect.protocol_version << dendl;
if (connect.protocol_version != reply.protocol_version) {
reply.tag = CEPH_MSGR_TAG_BADPROTOVER;
- rank->lock.Unlock();
+ messenger->lock.Unlock();
goto reply;
}
if (feat_missing) {
dout(1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl;
reply.tag = CEPH_MSGR_TAG_FEATURES;
- rank->lock.Unlock();
+ messenger->lock.Unlock();
goto reply;
}
- rank->lock.Unlock();
- if (rank->verify_authorizer(connection_state, peer_type,
+ messenger->lock.Unlock();
+ if (messenger->verify_authorizer(connection_state, peer_type,
connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid) &&
!authorizer_valid) {
dout(0) << "accept bad authorizer" << dendl;
reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER;
goto reply;
}
- rank->lock.Lock();
+ messenger->lock.Lock();
// existing?
- if (rank->rank_pipe.count(peer_addr)) {
- existing = rank->rank_pipe[peer_addr];
+ if (messenger->rank_pipe.count(peer_addr)) {
+ existing = messenger->rank_pipe[peer_addr];
existing->pipe_lock.Lock();
if (connect.global_seq < existing->peer_global_seq) {
reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
reply.global_seq = existing->peer_global_seq; // so we can send it below..
existing->pipe_lock.Unlock();
- rank->lock.Unlock();
+ messenger->lock.Unlock();
goto reply;
} else {
dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
reply.connect_seq = existing->connect_seq; // so we can send it below..
existing->pipe_lock.Unlock();
- rank->lock.Unlock();
+ messenger->lock.Unlock();
goto reply;
}
}
if (connect.connect_seq == existing->connect_seq) {
// connection race?
- if (peer_addr < rank->rank_addr ||
+ if (peer_addr < messenger->ms_addr ||
existing->policy.server) {
// incoming wins
dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
// our existing outgoing wins
dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq
<< " == " << connect.connect_seq << ", sending WAIT" << dendl;
- assert(peer_addr > rank->rank_addr);
+ assert(peer_addr > messenger->ms_addr);
assert(existing->state == STATE_CONNECTING); // this will win
reply.tag = CEPH_MSGR_TAG_WAIT;
existing->pipe_lock.Unlock();
- rank->lock.Unlock();
+ messenger->lock.Unlock();
goto reply;
}
}
<< ", " << existing << ".cseq = " << existing->connect_seq
<< "), sending RESETSESSION" << dendl;
reply.tag = CEPH_MSGR_TAG_RESETSESSION;
- rank->lock.Unlock();
+ messenger->lock.Unlock();
existing->pipe_lock.Unlock();
goto reply;
}
else if (connect.connect_seq > 0) {
// we reset, and they are opening a new session
dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl;
- rank->lock.Unlock();
+ messenger->lock.Unlock();
reply.tag = CEPH_MSGR_TAG_RESETSESSION;
goto reply;
} else {
// send READY reply
reply.tag = CEPH_MSGR_TAG_READY;
- reply.global_seq = rank->get_global_seq();
+ reply.global_seq = messenger->get_global_seq();
reply.connect_seq = connect_seq;
reply.flags = 0;
reply.authorizer_len = authorizer_reply.length();
// ok!
register_pipe();
- rank->lock.Unlock();
+ messenger->lock.Unlock();
rc = tcp_write(sd, (char*)&reply, sizeof(reply));
if (rc < 0)
closed_socket();
}
__u32 cseq = connect_seq;
- __u32 gseq = rank->get_global_seq();
+ __u32 gseq = messenger->get_global_seq();
// stop reader thrad
join_reader();
dout(20) << "connect peer addr for me is " << peer_addr_for_me << dendl;
- if (rank->need_addr)
- rank->learned_addr(peer_addr_for_me);
+ if (messenger->need_addr)
+ messenger->learned_addr(peer_addr_for_me);
- ::encode(rank->rank_addr, myaddrbl);
+ ::encode(messenger->ms_addr, myaddrbl);
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = myaddrbl.c_str();
dout(2) << "connect couldn't write my addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
goto fail;
}
- dout(10) << "connect sent my addr " << rank->rank_addr << dendl;
+ dout(10) << "connect sent my addr " << messenger->ms_addr << dendl;
while (1) {
delete authorizer;
- authorizer = rank->get_authorizer(peer_type, false);
+ authorizer = messenger->get_authorizer(peer_type, false);
bufferlist authorizer_reply;
ceph_msg_connect connect;
connect.features = CEPH_FEATURE_SUPPORTED;
- connect.host_type = rank->my_type;
+ connect.host_type = messenger->my_type;
connect.global_seq = gseq;
connect.connect_seq = cseq;
- connect.protocol_version = get_proto_version(rank->my_type, peer_type, true);
+ connect.protocol_version = get_proto_version(messenger->my_type, peer_type, true);
connect.authorizer_protocol = authorizer ? authorizer->protocol : 0;
connect.authorizer_len = authorizer ? authorizer->bl.length() : 0;
if (authorizer)
goto stop_locked;
got_bad_auth = true;
pipe_lock.Unlock();
- authorizer = rank->get_authorizer(peer_type, true); // try harder
+ authorizer = messenger->get_authorizer(peer_type, true); // try harder
continue;
}
if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) {
continue;
}
if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
- gseq = rank->get_global_seq(reply.global_seq);
+ gseq = messenger->get_global_seq(reply.global_seq);
dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
<< " chose new " << gseq << dendl;
pipe_lock.Unlock();
backoff = utime_t();
dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
- if (!rank->endpoint_stopped) {
+ if (!messenger->endpoint_stopped) {
Connection * cstate = connection_state->get();
pipe_lock.Unlock();
- rank->dispatch_queue.queue_connect(cstate);
+ messenger->dispatch_queue.queue_connect(cstate);
pipe_lock.Lock();
}
void SimpleMessenger::Pipe::register_pipe()
{
dout(10) << "register_pipe" << dendl;
- assert(rank->lock.is_locked());
- assert(rank->rank_pipe.count(peer_addr) == 0);
- rank->rank_pipe[peer_addr] = this;
+ assert(messenger->lock.is_locked());
+ assert(messenger->rank_pipe.count(peer_addr) == 0);
+ messenger->rank_pipe[peer_addr] = this;
}
void SimpleMessenger::Pipe::unregister_pipe()
{
- assert(rank->lock.is_locked());
- if (rank->rank_pipe.count(peer_addr) &&
- rank->rank_pipe[peer_addr] == this) {
+ assert(messenger->lock.is_locked());
+ if (messenger->rank_pipe.count(peer_addr) &&
+ messenger->rank_pipe[peer_addr] == this) {
dout(10) << "unregister_pipe" << dendl;
- rank->rank_pipe.erase(peer_addr);
+ messenger->rank_pipe.erase(peer_addr);
} else {
dout(10) << "unregister_pipe - not registered" << dendl;
}
void SimpleMessenger::Pipe::discard_queue()
{
dout(10) << "discard_queue" << dendl;
- DispatchQueue& q = rank->dispatch_queue;
+ DispatchQueue& q = messenger->dispatch_queue;
pipe_lock.Unlock();
xlist<Pipe *>* list_on;
discard_queue();
- if (!rank->endpoint_stopped) {
+ if (!messenger->endpoint_stopped) {
Connection * cstate = connection_state->get();
pipe_lock.Unlock();
- rank->dispatch_queue.queue_reset(cstate);
+ messenger->dispatch_queue.queue_reset(cstate);
pipe_lock.Lock();
}
}
dout(10) << "was_session_reset" << dendl;
discard_queue();
- if (!rank->endpoint_stopped) {
+ if (!messenger->endpoint_stopped) {
Connection * cstate = connection_state->get();
pipe_lock.Unlock();
- rank->dispatch_queue.queue_remote_reset(cstate);
+ messenger->dispatch_queue.queue_remote_reset(cstate);
pipe_lock.Lock();
}
// queue for reap
dout(10) << "unlock_maybe_reap queueing for reap" << dendl;
- rank->lock.Lock();
+ messenger->lock.Lock();
{
- rank->pipe_reap_queue.push_back(this);
- rank->wait_cond.Signal();
+ messenger->pipe_reap_queue.push_back(this);
+ messenger->wait_cond.Signal();
}
- rank->lock.Unlock();
+ messenger->lock.Unlock();
} else {
pipe_lock.Unlock();
}
}
if (!did_bind)
- rank_addr.nonce = getpid();
+ ms_addr.nonce = getpid();
- dout(1) << "rank.start" << dendl;
+ dout(1) << "messenger.start" << dendl;
started = true;
lock.Unlock();
// daemonize?
if (g_conf.daemonize && !nodaemon) {
if (Thread::get_num_threads() > 0) {
- derr(0) << "rank.start BUG: there are " << Thread::get_num_threads()
- << " already started that will now die! call rank.start() sooner."
+ derr(0) << "messenger.start BUG: there are " << Thread::get_num_threads()
+ << " already started that will now die! call messenger.start() sooner."
<< dendl;
}
- dout(1) << "rank.start daemonizing" << dendl;
+ dout(1) << "messenger.start daemonizing" << dendl;
if (1) {
daemon(1, 0);
/* connect_rank
- * NOTE: assumes rank.lock held.
+ * NOTE: assumes messenger.lock held.
*/
SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, int type)
{
assert(lock.is_locked());
- assert(addr != rank_addr);
+ assert(addr != ms_addr);
dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl;
lock.Lock();
{
// local?
- if (rank_addr.is_local_to(dest_addr)) {
+ if (ms_addr.is_local_to(dest_addr)) {
if (dest_addr.get_erank() == 0 && !endpoint_stopped) {
// local
dout(20) << "submit_message " << *m << " local" << dendl;
lock.Lock();
{
// local?
- if (!rank_addr.is_local_to(dest_addr)) {
+ if (!ms_addr.is_local_to(dest_addr)) {
// remote.
Pipe *pipe = 0;
if (rank_pipe.count( dest_proc_addr )) {
void SimpleMessenger::learned_addr(entity_addr_t peer_addr_for_me)
{
lock.Lock();
- int port = rank_addr.get_port();
- rank_addr.addr = peer_addr_for_me.addr;
- rank_addr.set_port(port);
- dout(1) << "learned my addr " << rank_addr << dendl;
+ int port = ms_addr.get_port();
+ ms_addr.addr = peer_addr_for_me.addr;
+ ms_addr.set_port(port);
+ dout(1) << "learned my addr " << ms_addr << dendl;
need_addr = false;
lock.Unlock();
}
* the destructor will lead to badness.
*/
-/* Rank - per-process
- */
class SimpleMessenger : public Messenger {
public:
struct Policy {
// incoming
class Accepter : public Thread {
public:
- SimpleMessenger *rank;
+ SimpleMessenger *messenger;
bool done;
int listen_sd;
- Accepter(SimpleMessenger *r) : rank(r), done(false), listen_sd(-1) {}
+ Accepter(SimpleMessenger *r) : messenger(r), done(false), listen_sd(-1) {}
void *entry();
void stop();
// pipe
class Pipe {
public:
- SimpleMessenger *rank;
+ SimpleMessenger *messenger;
ostream& _pipe_prefix();
enum {
public:
Pipe(SimpleMessenger *r, int st) :
- rank(r),
+ messenger(r),
sd(-1), peer_type(-1),
pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
state(st),
if (!queue_items.count(priority))
queue_items[priority] = new xlist<Pipe *>::item(this);
pipe_lock.Unlock();
- rank->dispatch_queue.lock.Lock();
- if (rank->dispatch_queue.queued_pipes.empty())
- rank->dispatch_queue.cond.Signal();
- rank->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
- rank->dispatch_queue.lock.Unlock();
+ messenger->dispatch_queue.lock.Lock();
+ if (messenger->dispatch_queue.queued_pipes.empty())
+ messenger->dispatch_queue.cond.Signal();
+ messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]);
+ messenger->dispatch_queue.lock.Unlock();
pipe_lock.Lock();
}
//increment queue length counters
in_qlen++;
- rank->dispatch_queue.qlen_lock.lock();
- ++rank->dispatch_queue.qlen;
- rank->dispatch_queue.qlen_lock.unlock();
+ messenger->dispatch_queue.qlen_lock.lock();
+ ++messenger->dispatch_queue.qlen;
+ messenger->dispatch_queue.qlen_lock.unlock();
pipe_lock.Unlock();
}
// where i listen
bool need_addr;
- entity_addr_t rank_addr;
+ entity_addr_t ms_addr;
// local
bool endpoint_stopped;
Pipe *connect_rank(const entity_addr_t& addr, int type);
- const entity_addr_t &get_rank_addr() { return rank_addr; }
+ const entity_addr_t &get_ms_addr() { return ms_addr; }
void mark_down(entity_addr_t addr);
private:
class DispatchThread : public Thread {
- SimpleMessenger *rank;
+ SimpleMessenger *messenger;
public:
- DispatchThread(SimpleMessenger *_rank) : rank(_rank) {}
+ DispatchThread(SimpleMessenger *_messenger) : messenger(_messenger) {}
void *entry() {
- rank->get();
- rank->dispatch_entry();
- rank->put();
+ messenger->get();
+ messenger->dispatch_entry();
+ messenger->put();
return 0;
}
} dispatch_thread;
void dispatch_entry();
- SimpleMessenger *rank; //hack to make dout macro work, will fix
+ SimpleMessenger *messenger; //hack to make dout macro work, will fix
public:
SimpleMessenger() :
lock("SimpleMessenger::lock"), started(false), did_bind(false), need_addr(true),
endpoint_stopped(true), my_type(-1),
global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0),
- dispatch_thread(this), rank(this) {
+ dispatch_thread(this), messenger(this) {
// for local dmsg delivery
dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN);
}