{
dout(0) << "entered SimpleMessenger::Endpoint::dispatch_entry" << dendl;
map<int, xlist<Pipe *> >::reverse_iterator high_iter;
- lock.Lock();
+ endpoint_lock.Lock();
while (!stop) {
dout(0) << "in outer !stop loop of SimpleMessenger::Endpoint::dispatch_entry" << dendl;
while (!queued_pipes.empty()) {
Pipe *pipe = pipe_list.front();
dout(0) << "high priority: " << priority << " taking pipe " << pipe << dendl;
//move pipe to back of line -- or just take off if no more messages
- pipe->lock.Lock();
+ pipe->pipe_lock.Lock();
list<Message *>& m_queue = pipe->in_q[priority];
pipe_list.pop_front();
if (m_queue.size() > 1) {
if (pipe_list.empty())
queued_pipes.erase(priority);
--qlen;
- lock.Unlock(); //done with the pipe queue for a while
+ endpoint_lock.Unlock(); //done with the pipe queue for a while
//get message from pipe
Message *m = m_queue.front();
m_queue.pop_front();
- pipe->lock.Unlock(); // done with the pipe's message queue now
+ pipe->pipe_lock.Unlock(); // done with the pipe's message queue now
{
if ((long)m == D_BAD_REMOTE_RESET) {
- lock.Lock();
+ endpoint_lock.Lock();
Connection *con = remote_reset_q.front();
remote_reset_q.pop_front();
- lock.Unlock();
+ endpoint_lock.Unlock();
ms_deliver_handle_remote_reset(con);
con->put();
} else if ((long)m == D_CONNECT) {
- lock.Lock();
+ endpoint_lock.Lock();
Connection *con = connect_q.front();
connect_q.pop_front();
- lock.Unlock();
+ endpoint_lock.Unlock();
ms_deliver_handle_connect(con);
con->put();
} else if ((long)m == D_BAD_RESET) {
- lock.Lock();
+ endpoint_lock.Lock();
Connection *con = reset_q.front();
reset_q.pop_front();
- lock.Unlock();
+ endpoint_lock.Unlock();
ms_deliver_handle_reset(con);
con->put();
} else {
dout(20) << "done calling dispatch on " << m << dendl;
}
}
- lock.Lock();
+ endpoint_lock.Lock();
}
- cond.Wait(lock); //wait for something to get put on queue
+ cond.Wait(endpoint_lock); //wait for something to get put on queue
}
- lock.Unlock();
+ endpoint_lock.Unlock();
dout(15) << "dispatch: ending loop " << dendl;
// deregister
stop = true;
} else {
dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl;
- lock.Lock();
+ endpoint_lock.Lock();
stop = true;
cond.Signal();
- lock.Unlock();
+ endpoint_lock.Unlock();
}
return 0;
}
// existing?
if (rank->rank_pipe.count(peer_addr)) {
existing = rank->rank_pipe[peer_addr];
- existing->lock.Lock();
+ existing->pipe_lock.Lock();
if (connect.global_seq < existing->peer_global_seq) {
dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq
<< " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl;
reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL;
reply.global_seq = existing->peer_global_seq; // so we can send it below..
- existing->lock.Unlock();
+ existing->pipe_lock.Unlock();
rank->lock.Unlock();
goto reply;
} else {
<< " > " << connect.connect_seq << ", RETRY_SESSION" << dendl;
reply.tag = CEPH_MSGR_TAG_RETRY_SESSION;
reply.connect_seq = existing->connect_seq; // so we can send it below..
- existing->lock.Unlock();
+ existing->pipe_lock.Unlock();
rank->lock.Unlock();
goto reply;
}
assert(peer_addr > rank->rank_addr);
assert(existing->state == STATE_CONNECTING); // this will win
reply.tag = CEPH_MSGR_TAG_WAIT;
- existing->lock.Unlock();
+ existing->pipe_lock.Unlock();
rank->lock.Unlock();
goto reply;
}
<< "), sending RESETSESSION" << dendl;
reply.tag = CEPH_MSGR_TAG_RESETSESSION;
rank->lock.Unlock();
- existing->lock.Unlock();
+ existing->pipe_lock.Unlock();
goto reply;
}
p++)
out_q[p->first].splice(out_q[p->first].begin(), p->second);
- existing->lock.Unlock();
+ existing->pipe_lock.Unlock();
open:
// open
goto fail_unlocked;
}
- lock.Lock();
+ pipe_lock.Lock();
if (state != STATE_CLOSED) {
dout(10) << "accept starting writer, " << "state=" << state << dendl;
start_writer();
}
dout(20) << "accept done" << dendl;
- lock.Unlock();
+ pipe_lock.Unlock();
return 0; // success.
fail_unlocked:
- lock.Lock();
+ pipe_lock.Lock();
state = STATE_CLOSED;
fault();
- lock.Unlock();
+ pipe_lock.Unlock();
return -1;
}
bool got_bad_auth = false;
dout(10) << "connect " << connect_seq << dendl;
- assert(lock.is_locked());
+ assert(pipe_lock.is_locked());
if (sd >= 0) {
::close(sd);
// stop reader thrad
join_reader();
- lock.Unlock();
+ pipe_lock.Unlock();
char tag = -1;
int rc;
}
}
- lock.Lock();
+ pipe_lock.Lock();
if (state != STATE_CONNECTING) {
dout(0) << "connect got RESETSESSION but no longer connecting" << dendl;
goto stop_locked;
if (got_bad_auth)
goto stop_locked;
got_bad_auth = true;
- lock.Unlock();
+ pipe_lock.Unlock();
authorizer = rank->get_authorizer(peer_type, true); // try harder
continue;
}
dout(0) << "connect got RESETSESSION" << dendl;
was_session_reset();
cseq = 0;
- lock.Unlock();
+ pipe_lock.Unlock();
continue;
}
if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) {
gseq = rank->get_global_seq(reply.global_seq);
dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq
<< " chose new " << gseq << dendl;
- lock.Unlock();
+ pipe_lock.Unlock();
continue;
}
if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) {
dout(10) << "connect got RETRY_SESSION " << connect_seq
<< " -> " << reply.connect_seq << dendl;
cseq = connect_seq = reply.connect_seq;
- lock.Unlock();
+ pipe_lock.Unlock();
continue;
}
}
fail:
- lock.Lock();
+ pipe_lock.Lock();
fail_locked:
if (state == STATE_CONNECTING)
fault();
void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
{
- assert(lock.is_locked());
+ assert(pipe_lock.is_locked());
cond.Signal();
if (onread && state == STATE_CONNECTING) {
backoff.set_from_double(g_conf.ms_initial_backoff);
} else {
dout(10) << "fault waiting " << backoff << dendl;
- cond.WaitInterval(lock, backoff);
+ cond.WaitInterval(pipe_lock, backoff);
backoff += backoff;
if (backoff > g_conf.ms_max_backoff)
backoff.set_from_double(g_conf.ms_max_backoff);
void SimpleMessenger::Pipe::fail()
{
derr(10) << "fail" << dendl;
- assert(lock.is_locked());
+ assert(pipe_lock.is_locked());
stop();
void SimpleMessenger::Pipe::was_session_reset()
{
- assert(lock.is_locked());
+ assert(pipe_lock.is_locked());
dout(10) << "was_session_reset" << dendl;
discard_queue();
void SimpleMessenger::Pipe::stop()
{
dout(10) << "stop" << dendl;
- assert(lock.is_locked());
+ assert(pipe_lock.is_locked());
state = STATE_CLOSED;
cond.Signal();
if (sd >= 0) {
if (state == STATE_ACCEPTING)
accept();
- lock.Lock();
+ pipe_lock.Lock();
// loop.
while (state != STATE_CLOSED &&
state != STATE_CONNECTING) {
- assert(lock.is_locked());
+ assert(pipe_lock.is_locked());
// sleep if (re)connecting
if (state == STATE_STANDBY) {
dout(20) << "reader sleeping during reconnect|standby" << dendl;
- cond.Wait(lock);
+ cond.Wait(pipe_lock);
continue;
}
- lock.Unlock();
+ pipe_lock.Unlock();
char buf[80];
char tag = -1;
dout(20) << "reader reading tag..." << dendl;
int rc = tcp_read(sd, (char*)&tag, 1);
if (rc < 0) {
- lock.Lock();
+ pipe_lock.Lock();
dout(2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault(false, true);
continue;
if (tag == CEPH_MSGR_TAG_KEEPALIVE) {
dout(20) << "reader got KEEPALIVE" << dendl;
- lock.Lock();
+ pipe_lock.Lock();
continue;
}
dout(20) << "reader got ACK" << dendl;
__le64 seq;
int rc = tcp_read( sd, (char*)&seq, sizeof(seq));
- lock.Lock();
+ pipe_lock.Lock();
if (rc < 0) {
dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault(false, true);
dout(20) << "reader got MSG" << dendl;
Message *m = read_message();
- lock.Lock();
+ pipe_lock.Lock();
if (!m) {
derr(2) << "reader read null message, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
}
cond.Signal(); // wake up writer, to ack this
- lock.Unlock();
+ pipe_lock.Unlock();
dout(10) << "reader got message "
<< m->get_seq() << " " << m << " " << *m
else derr(0) << "reader got message " << *m
<< "but there is no endpoint!" << dendl;
- lock.Lock();
+ pipe_lock.Lock();
}
else if (tag == CEPH_MSGR_TAG_CLOSE) {
dout(20) << "reader got CLOSE" << dendl;
- lock.Lock();
+ pipe_lock.Lock();
if (state == STATE_CLOSING)
state = STATE_CLOSED;
else
}
else {
dout(0) << "reader bad tag " << (int)tag << dendl;
- lock.Lock();
+ pipe_lock.Lock();
fault(false, true);
}
}
{
char buf[80];
- lock.Lock();
+ pipe_lock.Lock();
while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl;
dout(20) << "writer writing CLOSE tag" << dendl;
char tag = CEPH_MSGR_TAG_CLOSE;
state = STATE_CLOSED;
- lock.Unlock();
+ pipe_lock.Unlock();
if (sd) ::write(sd, &tag, 1);
- lock.Lock();
+ pipe_lock.Lock();
continue;
}
// keepalive?
if (keepalive) {
- lock.Unlock();
+ pipe_lock.Unlock();
int rc = write_keepalive();
- lock.Lock();
+ pipe_lock.Lock();
if (rc < 0) {
dout(2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault();
// send ack?
if (in_seq > in_seq_acked) {
int send_seq = in_seq;
- lock.Unlock();
+ pipe_lock.Unlock();
int rc = write_ack(send_seq);
- lock.Lock();
+ pipe_lock.Lock();
if (rc < 0) {
dout(2) << "writer couldn't write ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl;
fault();
m->set_seq(++out_seq);
sent.push_back(m); // move to sent list
m->get();
- lock.Unlock();
+ pipe_lock.Unlock();
dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl;
dout(20) << "writer sending " << m->get_seq() << " " << m << dendl;
int rc = write_message(m);
- lock.Lock();
+ pipe_lock.Lock();
if (rc < 0) {
derr(1) << "writer error sending " << m << ", "
<< errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl;
// wait
dout(20) << "writer sleeping" << dendl;
- cond.Wait(lock);
+ cond.Wait(pipe_lock);
}
dout(20) << "writer finishing" << dendl;
closed_socket();
}
- lock.Unlock();
+ pipe_lock.Unlock();
// queue for reap
dout(10) << "unlock_maybe_reap queueing for reap" << dendl;
}
rank->lock.Unlock();
} else {
- lock.Unlock();
+ pipe_lock.Unlock();
}
}
Pipe *p = pipe_reap_queue.front();
pipe_reap_queue.pop_front();
dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl;
- p->lock.Lock();
- p->lock.Unlock();
+ p->pipe_lock.Lock();
+ p->pipe_lock.Unlock();
p->unregister_pipe();
assert(pipes.count(p));
pipes.erase(p);
if (rank_pipe.count( dest_proc_addr )) {
// connected?
pipe = rank_pipe[ dest_proc_addr ];
- pipe->lock.Lock();
+ pipe->pipe_lock.Lock();
if (pipe->state == Pipe::STATE_CLOSED) {
dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
pipe->unregister_pipe();
- pipe->lock.Unlock();
+ pipe->pipe_lock.Unlock();
pipe = 0;
} else {
dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl;
pipe->_send(m);
- pipe->lock.Unlock();
+ pipe->pipe_lock.Unlock();
}
}
if (!pipe) {
if (rank_pipe.count( dest_proc_addr )) {
// connected?
pipe = rank_pipe[ dest_proc_addr ];
- pipe->lock.Lock();
+ pipe->pipe_lock.Lock();
if (pipe->state == Pipe::STATE_CLOSED) {
dout(20) << "send_keepalive remote, " << dest_addr << ", ignoring old closed pipe." << dendl;
pipe->unregister_pipe();
- pipe->lock.Unlock();
+ pipe->pipe_lock.Unlock();
pipe = 0;
} else {
dout(20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl;
pipe->_send_keepalive();
- pipe->lock.Unlock();
+ pipe->pipe_lock.Unlock();
}
}
if (!pipe)
while (!rank_pipe.empty()) {
Pipe *p = rank_pipe.begin()->second;
p->unregister_pipe();
- p->lock.Lock();
+ p->pipe_lock.Lock();
p->stop();
- p->lock.Unlock();
+ p->pipe_lock.Unlock();
}
reaper();
Pipe *p = rank_pipe[addr];
dout(1) << "mark_down " << addr << " -- " << p << dendl;
p->unregister_pipe();
- p->lock.Lock();
+ p->pipe_lock.Lock();
p->stop();
- p->lock.Unlock();
+ p->pipe_lock.Unlock();
} else {
dout(1) << "mark_down " << addr << " -- pipe dne" << dendl;
}
entity_addr_t peer_addr;
Policy policy;
- Mutex lock;
+ Mutex pipe_lock;
int state;
protected:
Pipe(SimpleMessenger *r, int st) :
rank(r),
sd(-1), peer_type(-1),
- lock("SimpleMessenger::Pipe::lock"),
+ pipe_lock("SimpleMessenger::Pipe::pipe_lock"),
state(st),
connection_state(new Connection),
reader_running(false), writer_running(false),
return;
cond.Signal();
reader_thread.kill(SIGUSR2);
- lock.Unlock();
+ pipe_lock.Unlock();
reader_thread.join();
- lock.Lock();
+ pipe_lock.Lock();
}
// public constructors
dout(0) << "queuing received message " << m << "in msgr " << rank << dendl;
list<Message *>& queue = in_q[priority];
- lock.Lock();
+ pipe_lock.Lock();
queue.push_back(m);
if ( 1 == queue.size()) { //this pipe isn't on the endpoint queue
if (!queue_items.count(priority)) { //create an item for that priority
pair_item(priority, new xlist<Pipe *>::item(this));
queue_items.insert(pair_item);
}
- rank->local_endpoint->queue_lock.Lock();
+ pipe_lock.Unlock();
+ rank->local_endpoint->endpoint_lock.Lock();
rank->local_endpoint->
queued_pipes[priority].push_back(queue_items[priority]);
- rank->local_endpoint->queue_lock.Unlock();
+ rank->local_endpoint->endpoint_lock.Unlock();
+ pipe_lock.Lock();
}
- lock.Unlock();
+ pipe_lock.Unlock();
//increment queue length counter
- rank->local_endpoint->lock.Lock();
+ rank->local_endpoint->endpoint_lock.Lock();
++rank->local_endpoint->qlen;
rank->local_endpoint->cond.Signal();
- rank->local_endpoint->lock.Unlock();
+ rank->local_endpoint->endpoint_lock.Unlock();
}
void queue_received(Message *m) {
void stop();
void send(Message *m) {
- lock.Lock();
+ pipe_lock.Lock();
_send(m);
- lock.Unlock();
+ pipe_lock.Unlock();
}
void _send(Message *m) {
m->get();
cond.Signal();
}
void send_keepalive() {
- lock.Lock();
+ pipe_lock.Lock();
_send_keepalive();
- lock.Unlock();
+ pipe_lock.Unlock();
}
void _send_keepalive() {
keepalive = true;
class Endpoint : public Messenger {
SimpleMessenger *rank;
Pipe *local_pipe;
- Mutex lock;
+ Mutex endpoint_lock;
Cond cond;
- Mutex queue_lock;
map<int, xlist<Pipe *> > queued_pipes;
map<int, xlist<Pipe *>::iterator> queued_pipe_iters;
bool stop;
}
void queue_connect(Connection *con) {
- lock.Lock();
+ endpoint_lock.Lock();
connect_q.push_back(con);
local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
cond.Signal();
- lock.Unlock();
+ endpoint_lock.Unlock();
}
void queue_remote_reset(Connection *con) {
- lock.Lock();
+ endpoint_lock.Lock();
remote_reset_q.push_back(con);
local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST);
cond.Signal();
- lock.Unlock();
+ endpoint_lock.Unlock();
}
void queue_reset(Connection *con) {
- lock.Lock();
+ endpoint_lock.Lock();
reset_q.push_back(con);
local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST);
cond.Signal();
- lock.Unlock();
+ endpoint_lock.Unlock();
}
public:
Endpoint(SimpleMessenger *r, entity_name_t name, int rn) :
Messenger(name),
rank(r),
- lock("SimpleMessenger::Endpoint::lock"),
- queue_lock("SimpleMessenger::Endpoint:queue_lock"),
+ endpoint_lock("SimpleMessenger::Endpoint::endpoint_lock"),
stop(false),
qlen(0),
my_rank(rn),