From c58f0e90e2d185e1c803e7a97c408f669f6985b2 Mon Sep 17 00:00:00 2001 From: Greg Farnum Date: Wed, 9 Dec 2009 15:12:00 -0800 Subject: [PATCH] msgr: rename locks for clarity; move some around to prevent blocking. --- src/msg/SimpleMessenger.cc | 148 ++++++++++++++++++------------------- src/msg/SimpleMessenger.h | 48 ++++++------ 2 files changed, 98 insertions(+), 98 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index e22e9cab8e497..17841c9ac8738 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -261,7 +261,7 @@ void SimpleMessenger::Endpoint::dispatch_entry() { dout(0) << "entered SimpleMessenger::Endpoint::dispatch_entry" << dendl; map >::reverse_iterator high_iter; - lock.Lock(); + endpoint_lock.Lock(); while (!stop) { dout(0) << "in outer !stop loop of SimpleMessenger::Endpoint::dispatch_entry" << dendl; while (!queued_pipes.empty()) { @@ -273,7 +273,7 @@ void SimpleMessenger::Endpoint::dispatch_entry() Pipe *pipe = pipe_list.front(); dout(0) << "high priority: " << priority << " taking pipe " << pipe << dendl; //move pipe to back of line -- or just take off if no more messages - pipe->lock.Lock(); + pipe->pipe_lock.Lock(); list& m_queue = pipe->in_q[priority]; pipe_list.pop_front(); if (m_queue.size() > 1) { @@ -283,32 +283,32 @@ void SimpleMessenger::Endpoint::dispatch_entry() if (pipe_list.empty()) queued_pipes.erase(priority); --qlen; - lock.Unlock(); //done with the pipe queue for a while + endpoint_lock.Unlock(); //done with the pipe queue for a while //get message from pipe Message *m = m_queue.front(); m_queue.pop_front(); - pipe->lock.Unlock(); // done with the pipe's message queue now + pipe->pipe_lock.Unlock(); // done with the pipe's message queue now { if ((long)m == D_BAD_REMOTE_RESET) { - lock.Lock(); + endpoint_lock.Lock(); Connection *con = remote_reset_q.front(); remote_reset_q.pop_front(); - lock.Unlock(); + endpoint_lock.Unlock(); ms_deliver_handle_remote_reset(con); con->put(); } else if ((long)m == D_CONNECT) { - lock.Lock(); + endpoint_lock.Lock(); Connection *con = connect_q.front(); connect_q.pop_front(); - lock.Unlock(); + endpoint_lock.Unlock(); ms_deliver_handle_connect(con); con->put(); } else if ((long)m == D_BAD_RESET) { - lock.Lock(); + endpoint_lock.Lock(); Connection *con = reset_q.front(); reset_q.pop_front(); - lock.Unlock(); + endpoint_lock.Unlock(); ms_deliver_handle_reset(con); con->put(); } else { @@ -325,11 +325,11 @@ void SimpleMessenger::Endpoint::dispatch_entry() dout(20) << "done calling dispatch on " << m << dendl; } } - lock.Lock(); + endpoint_lock.Lock(); } - cond.Wait(lock); //wait for something to get put on queue + cond.Wait(endpoint_lock); //wait for something to get put on queue } - lock.Unlock(); + endpoint_lock.Unlock(); dout(15) << "dispatch: ending loop " << dendl; // deregister @@ -356,10 +356,10 @@ int SimpleMessenger::Endpoint::shutdown() stop = true; } else { dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl; - lock.Lock(); + endpoint_lock.Lock(); stop = true; cond.Signal(); - lock.Unlock(); + endpoint_lock.Unlock(); } return 0; } @@ -657,14 +657,14 @@ int SimpleMessenger::Pipe::accept() // existing? if (rank->rank_pipe.count(peer_addr)) { existing = rank->rank_pipe[peer_addr]; - existing->lock.Lock(); + existing->pipe_lock.Lock(); if (connect.global_seq < existing->peer_global_seq) { dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; reply.global_seq = existing->peer_global_seq; // so we can send it below.. - existing->lock.Unlock(); + existing->pipe_lock.Unlock(); rank->lock.Unlock(); goto reply; } else { @@ -707,7 +707,7 @@ int SimpleMessenger::Pipe::accept() << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq; // so we can send it below.. - existing->lock.Unlock(); + existing->pipe_lock.Unlock(); rank->lock.Unlock(); goto reply; } @@ -731,7 +731,7 @@ int SimpleMessenger::Pipe::accept() assert(peer_addr > rank->rank_addr); assert(existing->state == STATE_CONNECTING); // this will win reply.tag = CEPH_MSGR_TAG_WAIT; - existing->lock.Unlock(); + existing->pipe_lock.Unlock(); rank->lock.Unlock(); goto reply; } @@ -745,7 +745,7 @@ int SimpleMessenger::Pipe::accept() << "), sending RESETSESSION" << dendl; reply.tag = CEPH_MSGR_TAG_RESETSESSION; rank->lock.Unlock(); - existing->lock.Unlock(); + existing->pipe_lock.Unlock(); goto reply; } @@ -796,7 +796,7 @@ int SimpleMessenger::Pipe::accept() p++) out_q[p->first].splice(out_q[p->first].begin(), p->second); - existing->lock.Unlock(); + existing->pipe_lock.Unlock(); open: // open @@ -827,21 +827,21 @@ int SimpleMessenger::Pipe::accept() goto fail_unlocked; } - lock.Lock(); + pipe_lock.Lock(); if (state != STATE_CLOSED) { dout(10) << "accept starting writer, " << "state=" << state << dendl; start_writer(); } dout(20) << "accept done" << dendl; - lock.Unlock(); + pipe_lock.Unlock(); return 0; // success. fail_unlocked: - lock.Lock(); + pipe_lock.Lock(); state = STATE_CLOSED; fault(); - lock.Unlock(); + pipe_lock.Unlock(); return -1; } @@ -850,7 +850,7 @@ int SimpleMessenger::Pipe::connect() bool got_bad_auth = false; dout(10) << "connect " << connect_seq << dendl; - assert(lock.is_locked()); + assert(pipe_lock.is_locked()); if (sd >= 0) { ::close(sd); @@ -863,7 +863,7 @@ int SimpleMessenger::Pipe::connect() // stop reader thrad join_reader(); - lock.Unlock(); + pipe_lock.Unlock(); char tag = -1; int rc; @@ -1049,7 +1049,7 @@ int SimpleMessenger::Pipe::connect() } } - lock.Lock(); + pipe_lock.Lock(); if (state != STATE_CONNECTING) { dout(0) << "connect got RESETSESSION but no longer connecting" << dendl; goto stop_locked; @@ -1066,7 +1066,7 @@ int SimpleMessenger::Pipe::connect() if (got_bad_auth) goto stop_locked; got_bad_auth = true; - lock.Unlock(); + pipe_lock.Unlock(); authorizer = rank->get_authorizer(peer_type, true); // try harder continue; } @@ -1074,14 +1074,14 @@ int SimpleMessenger::Pipe::connect() dout(0) << "connect got RESETSESSION" << dendl; was_session_reset(); cseq = 0; - lock.Unlock(); + pipe_lock.Unlock(); continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { gseq = rank->get_global_seq(reply.global_seq); dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq << " chose new " << gseq << dendl; - lock.Unlock(); + pipe_lock.Unlock(); continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { @@ -1089,7 +1089,7 @@ int SimpleMessenger::Pipe::connect() dout(10) << "connect got RETRY_SESSION " << connect_seq << " -> " << reply.connect_seq << dendl; cseq = connect_seq = reply.connect_seq; - lock.Unlock(); + pipe_lock.Unlock(); continue; } @@ -1125,7 +1125,7 @@ int SimpleMessenger::Pipe::connect() } fail: - lock.Lock(); + pipe_lock.Lock(); fail_locked: if (state == STATE_CONNECTING) fault(); @@ -1189,7 +1189,7 @@ void SimpleMessenger::Pipe::discard_queue() void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) { - assert(lock.is_locked()); + assert(pipe_lock.is_locked()); cond.Signal(); if (onread && state == STATE_CONNECTING) { @@ -1248,7 +1248,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) backoff.set_from_double(g_conf.ms_initial_backoff); } else { dout(10) << "fault waiting " << backoff << dendl; - cond.WaitInterval(lock, backoff); + cond.WaitInterval(pipe_lock, backoff); backoff += backoff; if (backoff > g_conf.ms_max_backoff) backoff.set_from_double(g_conf.ms_max_backoff); @@ -1259,7 +1259,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) void SimpleMessenger::Pipe::fail() { derr(10) << "fail" << dendl; - assert(lock.is_locked()); + assert(pipe_lock.is_locked()); stop(); @@ -1271,7 +1271,7 @@ void SimpleMessenger::Pipe::fail() void SimpleMessenger::Pipe::was_session_reset() { - assert(lock.is_locked()); + assert(pipe_lock.is_locked()); dout(10) << "was_session_reset" << dendl; discard_queue(); @@ -1287,7 +1287,7 @@ void SimpleMessenger::Pipe::was_session_reset() void SimpleMessenger::Pipe::stop() { dout(10) << "stop" << dendl; - assert(lock.is_locked()); + assert(pipe_lock.is_locked()); state = STATE_CLOSED; cond.Signal(); if (sd >= 0) { @@ -1309,28 +1309,28 @@ void SimpleMessenger::Pipe::reader() if (state == STATE_ACCEPTING) accept(); - lock.Lock(); + pipe_lock.Lock(); // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { - assert(lock.is_locked()); + assert(pipe_lock.is_locked()); // sleep if (re)connecting if (state == STATE_STANDBY) { dout(20) << "reader sleeping during reconnect|standby" << dendl; - cond.Wait(lock); + cond.Wait(pipe_lock); continue; } - lock.Unlock(); + pipe_lock.Unlock(); char buf[80]; char tag = -1; dout(20) << "reader reading tag..." << dendl; int rc = tcp_read(sd, (char*)&tag, 1); if (rc < 0) { - lock.Lock(); + pipe_lock.Lock(); dout(2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(false, true); continue; @@ -1338,7 +1338,7 @@ void SimpleMessenger::Pipe::reader() if (tag == CEPH_MSGR_TAG_KEEPALIVE) { dout(20) << "reader got KEEPALIVE" << dendl; - lock.Lock(); + pipe_lock.Lock(); continue; } @@ -1347,7 +1347,7 @@ void SimpleMessenger::Pipe::reader() dout(20) << "reader got ACK" << dendl; __le64 seq; int rc = tcp_read( sd, (char*)&seq, sizeof(seq)); - lock.Lock(); + pipe_lock.Lock(); if (rc < 0) { dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(false, true); @@ -1370,7 +1370,7 @@ void SimpleMessenger::Pipe::reader() dout(20) << "reader got MSG" << dendl; Message *m = read_message(); - lock.Lock(); + pipe_lock.Lock(); if (!m) { derr(2) << "reader read null message, " << strerror_r(errno, buf, sizeof(buf)) << dendl; @@ -1406,7 +1406,7 @@ void SimpleMessenger::Pipe::reader() } cond.Signal(); // wake up writer, to ack this - lock.Unlock(); + pipe_lock.Unlock(); dout(10) << "reader got message " << m->get_seq() << " " << m << " " << *m @@ -1418,12 +1418,12 @@ void SimpleMessenger::Pipe::reader() else derr(0) << "reader got message " << *m << "but there is no endpoint!" << dendl; - lock.Lock(); + pipe_lock.Lock(); } else if (tag == CEPH_MSGR_TAG_CLOSE) { dout(20) << "reader got CLOSE" << dendl; - lock.Lock(); + pipe_lock.Lock(); if (state == STATE_CLOSING) state = STATE_CLOSED; else @@ -1433,7 +1433,7 @@ void SimpleMessenger::Pipe::reader() } else { dout(0) << "reader bad tag " << (int)tag << dendl; - lock.Lock(); + pipe_lock.Lock(); fault(false, true); } } @@ -1452,7 +1452,7 @@ void SimpleMessenger::Pipe::writer() { char buf[80]; - lock.Lock(); + pipe_lock.Lock(); while (state != STATE_CLOSED) {// && state != STATE_WAIT) { dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl; @@ -1476,9 +1476,9 @@ void SimpleMessenger::Pipe::writer() dout(20) << "writer writing CLOSE tag" << dendl; char tag = CEPH_MSGR_TAG_CLOSE; state = STATE_CLOSED; - lock.Unlock(); + pipe_lock.Unlock(); if (sd) ::write(sd, &tag, 1); - lock.Lock(); + pipe_lock.Lock(); continue; } @@ -1487,9 +1487,9 @@ void SimpleMessenger::Pipe::writer() // keepalive? if (keepalive) { - lock.Unlock(); + pipe_lock.Unlock(); int rc = write_keepalive(); - lock.Lock(); + pipe_lock.Lock(); if (rc < 0) { dout(2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(); @@ -1501,9 +1501,9 @@ void SimpleMessenger::Pipe::writer() // send ack? if (in_seq > in_seq_acked) { int send_seq = in_seq; - lock.Unlock(); + pipe_lock.Unlock(); int rc = write_ack(send_seq); - lock.Lock(); + pipe_lock.Lock(); if (rc < 0) { dout(2) << "writer couldn't write ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(); @@ -1518,7 +1518,7 @@ void SimpleMessenger::Pipe::writer() m->set_seq(++out_seq); sent.push_back(m); // move to sent list m->get(); - lock.Unlock(); + pipe_lock.Unlock(); dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; @@ -1528,7 +1528,7 @@ void SimpleMessenger::Pipe::writer() dout(20) << "writer sending " << m->get_seq() << " " << m << dendl; int rc = write_message(m); - lock.Lock(); + pipe_lock.Lock(); if (rc < 0) { derr(1) << "writer error sending " << m << ", " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; @@ -1541,7 +1541,7 @@ void SimpleMessenger::Pipe::writer() // wait dout(20) << "writer sleeping" << dendl; - cond.Wait(lock); + cond.Wait(pipe_lock); } dout(20) << "writer finishing" << dendl; @@ -1562,7 +1562,7 @@ void SimpleMessenger::Pipe::unlock_maybe_reap() closed_socket(); } - lock.Unlock(); + pipe_lock.Unlock(); // queue for reap dout(10) << "unlock_maybe_reap queueing for reap" << dendl; @@ -1573,7 +1573,7 @@ void SimpleMessenger::Pipe::unlock_maybe_reap() } rank->lock.Unlock(); } else { - lock.Unlock(); + pipe_lock.Unlock(); } } @@ -1938,8 +1938,8 @@ void SimpleMessenger::reaper() Pipe *p = pipe_reap_queue.front(); pipe_reap_queue.pop_front(); dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; - p->lock.Lock(); - p->lock.Unlock(); + p->pipe_lock.Lock(); + p->pipe_lock.Unlock(); p->unregister_pipe(); assert(pipes.count(p)); pipes.erase(p); @@ -2215,17 +2215,17 @@ void SimpleMessenger::submit_message(Message *m, const entity_inst_t& dest, bool if (rank_pipe.count( dest_proc_addr )) { // connected? pipe = rank_pipe[ dest_proc_addr ]; - pipe->lock.Lock(); + pipe->pipe_lock.Lock(); if (pipe->state == Pipe::STATE_CLOSED) { dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring old closed pipe." << dendl; pipe->unregister_pipe(); - pipe->lock.Unlock(); + pipe->pipe_lock.Unlock(); pipe = 0; } else { dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl; pipe->_send(m); - pipe->lock.Unlock(); + pipe->pipe_lock.Unlock(); } } if (!pipe) { @@ -2259,16 +2259,16 @@ void SimpleMessenger::send_keepalive(const entity_inst_t& dest) if (rank_pipe.count( dest_proc_addr )) { // connected? pipe = rank_pipe[ dest_proc_addr ]; - pipe->lock.Lock(); + pipe->pipe_lock.Lock(); if (pipe->state == Pipe::STATE_CLOSED) { dout(20) << "send_keepalive remote, " << dest_addr << ", ignoring old closed pipe." << dendl; pipe->unregister_pipe(); - pipe->lock.Unlock(); + pipe->pipe_lock.Unlock(); pipe = 0; } else { dout(20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl; pipe->_send_keepalive(); - pipe->lock.Unlock(); + pipe->pipe_lock.Unlock(); } } if (!pipe) @@ -2314,9 +2314,9 @@ void SimpleMessenger::wait() while (!rank_pipe.empty()) { Pipe *p = rank_pipe.begin()->second; p->unregister_pipe(); - p->lock.Lock(); + p->pipe_lock.Lock(); p->stop(); - p->lock.Unlock(); + p->pipe_lock.Unlock(); } reaper(); @@ -2346,9 +2346,9 @@ void SimpleMessenger::mark_down(entity_addr_t addr) Pipe *p = rank_pipe[addr]; dout(1) << "mark_down " << addr << " -- " << p << dendl; p->unregister_pipe(); - p->lock.Lock(); + p->pipe_lock.Lock(); p->stop(); - p->lock.Unlock(); + p->pipe_lock.Unlock(); } else { dout(1) << "mark_down " << addr << " -- pipe dne" << dendl; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index c023984fde686..c41972cdaf999 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -98,7 +98,7 @@ private: entity_addr_t peer_addr; Policy policy; - Mutex lock; + Mutex pipe_lock; int state; protected: @@ -159,7 +159,7 @@ private: Pipe(SimpleMessenger *r, int st) : rank(r), sd(-1), peer_type(-1), - lock("SimpleMessenger::Pipe::lock"), + pipe_lock("SimpleMessenger::Pipe::pipe_lock"), state(st), connection_state(new Connection), reader_running(false), writer_running(false), @@ -191,9 +191,9 @@ private: return; cond.Signal(); reader_thread.kill(SIGUSR2); - lock.Unlock(); + pipe_lock.Unlock(); reader_thread.join(); - lock.Lock(); + pipe_lock.Lock(); } // public constructors @@ -206,7 +206,7 @@ private: dout(0) << "queuing received message " << m << "in msgr " << rank << dendl; list& queue = in_q[priority]; - lock.Lock(); + pipe_lock.Lock(); queue.push_back(m); if ( 1 == queue.size()) { //this pipe isn't on the endpoint queue if (!queue_items.count(priority)) { //create an item for that priority @@ -214,18 +214,20 @@ private: pair_item(priority, new xlist::item(this)); queue_items.insert(pair_item); } - rank->local_endpoint->queue_lock.Lock(); + pipe_lock.Unlock(); + rank->local_endpoint->endpoint_lock.Lock(); rank->local_endpoint-> queued_pipes[priority].push_back(queue_items[priority]); - rank->local_endpoint->queue_lock.Unlock(); + rank->local_endpoint->endpoint_lock.Unlock(); + pipe_lock.Lock(); } - lock.Unlock(); + pipe_lock.Unlock(); //increment queue length counter - rank->local_endpoint->lock.Lock(); + rank->local_endpoint->endpoint_lock.Lock(); ++rank->local_endpoint->qlen; rank->local_endpoint->cond.Signal(); - rank->local_endpoint->lock.Unlock(); + rank->local_endpoint->endpoint_lock.Unlock(); } void queue_received(Message *m) { @@ -259,9 +261,9 @@ private: void stop(); void send(Message *m) { - lock.Lock(); + pipe_lock.Lock(); _send(m); - lock.Unlock(); + pipe_lock.Unlock(); } void _send(Message *m) { m->get(); @@ -269,9 +271,9 @@ private: cond.Signal(); } void send_keepalive() { - lock.Lock(); + pipe_lock.Lock(); _send_keepalive(); - lock.Unlock(); + pipe_lock.Unlock(); } void _send_keepalive() { keepalive = true; @@ -304,9 +306,8 @@ private: class Endpoint : public Messenger { SimpleMessenger *rank; Pipe *local_pipe; - Mutex lock; + Mutex endpoint_lock; Cond cond; - Mutex queue_lock; map > queued_pipes; map::iterator> queued_pipe_iters; bool stop; @@ -341,33 +342,32 @@ private: } void queue_connect(Connection *con) { - lock.Lock(); + endpoint_lock.Lock(); connect_q.push_back(con); local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST); cond.Signal(); - lock.Unlock(); + endpoint_lock.Unlock(); } void queue_remote_reset(Connection *con) { - lock.Lock(); + endpoint_lock.Lock(); remote_reset_q.push_back(con); local_delivery((Message*)D_BAD_REMOTE_RESET, CEPH_MSG_PRIO_HIGHEST); cond.Signal(); - lock.Unlock(); + endpoint_lock.Unlock(); } void queue_reset(Connection *con) { - lock.Lock(); + endpoint_lock.Lock(); reset_q.push_back(con); local_delivery((Message*)D_BAD_RESET, CEPH_MSG_PRIO_HIGHEST); cond.Signal(); - lock.Unlock(); + endpoint_lock.Unlock(); } public: Endpoint(SimpleMessenger *r, entity_name_t name, int rn) : Messenger(name), rank(r), - lock("SimpleMessenger::Endpoint::lock"), - queue_lock("SimpleMessenger::Endpoint:queue_lock"), + endpoint_lock("SimpleMessenger::Endpoint::endpoint_lock"), stop(false), qlen(0), my_rank(rn), -- 2.39.5