From 8a0d4864cf880efba1f8b0dcaaad40e0b601a6c5 Mon Sep 17 00:00:00 2001 From: Colin Patrick McCabe Date: Wed, 15 Jun 2011 15:24:29 -0700 Subject: [PATCH] SimpleMessenger: de-globalize Signed-off-by: Colin McCabe --- src/msg/SimpleMessenger.cc | 692 +++++++++++++++++++------------------ src/msg/SimpleMessenger.h | 54 +-- 2 files changed, 375 insertions(+), 371 deletions(-) diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 8a768be4336e4..083d0992cd6d2 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -41,9 +41,9 @@ #define DOUT_SUBSYS ms #undef dout_prefix -#define dout_prefix _prefix(_dout, messenger) -static ostream& _prefix(std::ostream *_dout, SimpleMessenger *messenger) { - return *_dout << "-- " << messenger->ms_addr << " "; +#define dout_prefix _prefix(_dout, msgr) +static ostream& _prefix(std::ostream *_dout, SimpleMessenger *msgr) { + return *_dout << "-- " << msgr->ms_addr << " "; } @@ -55,10 +55,11 @@ static ostream& _prefix(std::ostream *_dout, SimpleMessenger *messenger) { int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, int avoid_port1, int avoid_port2) { + const md_config_t *conf = msgr->cct->_conf; // bind to a socket - dout(10) << "accepter.bind" << dendl; + ldout(msgr->cct,10) << "accepter.bind" << dendl; - int family = g_conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; + int family = conf->ms_bind_ipv6 ? AF_INET6 : AF_INET; switch (bind_addr.get_family()) { case AF_INET: case AF_INET6: @@ -70,7 +71,7 @@ int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, in listen_sd = ::socket(family, SOCK_STREAM, 0); if (listen_sd < 0) { char buf[80]; - dout(0) << "accepter.bind unable to create socket: " + ldout(msgr->cct,0) << "accepter.bind unable to create socket: " << strerror_r(errno, buf, sizeof(buf)) << dendl; cerr << "accepter.bind unable to create socket: " << strerror_r(errno, buf, sizeof(buf)) << std::endl; @@ -92,7 +93,7 @@ int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, in rc = ::bind(listen_sd, (struct sockaddr *) &listen_addr.ss_addr(), sizeof(listen_addr.ss_addr())); if (rc < 0) { char buf[80]; - dout(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr() + ldout(msgr->cct,0) << "accepter.bind unable to bind to " << bind_addr.ss_addr() << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; cerr << "accepter.bind unable to bind to " << bind_addr.ss_addr() << ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl; @@ -110,7 +111,7 @@ int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, in } if (rc < 0) { char buf[80]; - dout(0) << "accepter.bind unable to bind to " << bind_addr.ss_addr() + ldout(msgr->cct,0) << "accepter.bind unable to bind to " << bind_addr.ss_addr() << " on any port in range " << CEPH_PORT_START << "-" << CEPH_PORT_LAST << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; cerr << "accepter.bind unable to bind to " << bind_addr.ss_addr() @@ -118,55 +119,55 @@ int SimpleMessenger::Accepter::bind(uint64_t nonce, entity_addr_t &bind_addr, in << ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl; return -errno; } - dout(10) << "accepter.bind bound on random port " << listen_addr << dendl; + ldout(msgr->cct,10) << "accepter.bind bound on random port " << listen_addr << dendl; } // what port did we get? socklen_t llen = sizeof(listen_addr.ss_addr()); getsockname(listen_sd, (sockaddr*)&listen_addr.ss_addr(), &llen); - dout(10) << "accepter.bind bound to " << listen_addr << dendl; + ldout(msgr->cct,10) << "accepter.bind bound to " << listen_addr << dendl; // listen! rc = ::listen(listen_sd, 128); if (rc < 0) { char buf[80]; - dout(0) << "accepter.bind unable to listen on " << bind_addr.ss_addr() + ldout(msgr->cct,0) << "accepter.bind unable to listen on " << bind_addr.ss_addr() << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; cerr << "accepter.bind unable to listen on " << bind_addr.ss_addr() << ": " << strerror_r(errno, buf, sizeof(buf)) << std::endl; return -errno; } - messenger->ms_addr = bind_addr; - if (messenger->ms_addr != entity_addr_t()) - messenger->need_addr = false; + msgr->ms_addr = bind_addr; + if (msgr->ms_addr != entity_addr_t()) + msgr->need_addr = false; else - messenger->need_addr = true; + msgr->need_addr = true; - if (messenger->ms_addr.get_port() == 0) { - messenger->ms_addr = listen_addr; - messenger->ms_addr.nonce = nonce; + if (msgr->ms_addr.get_port() == 0) { + msgr->ms_addr = listen_addr; + msgr->ms_addr.nonce = nonce; } - messenger->init_local_pipe(); + msgr->init_local_pipe(); - dout(1) << "accepter.bind ms_addr is " << messenger->ms_addr << " need_addr=" << messenger->need_addr << dendl; - messenger->did_bind = true; + ldout(msgr->cct,1) << "accepter.bind ms_addr is " << msgr->ms_addr << " need_addr=" << msgr->need_addr << dendl; + msgr->did_bind = true; return 0; } int SimpleMessenger::Accepter::rebind(int avoid_port) { - dout(1) << "accepter.rebind avoid " << avoid_port << dendl; + ldout(msgr->cct,1) << "accepter.rebind avoid " << avoid_port << dendl; stop(); - entity_addr_t addr = messenger->ms_addr; + entity_addr_t addr = msgr->ms_addr; int old_port = addr.get_port(); addr.set_port(0); - dout(10) << " will try " << addr << dendl; + ldout(msgr->cct,10) << " will try " << addr << dendl; int r = bind(addr.get_nonce(), addr, old_port, avoid_port); if (r == 0) start(); @@ -175,7 +176,7 @@ int SimpleMessenger::Accepter::rebind(int avoid_port) int SimpleMessenger::Accepter::start() { - dout(1) << "accepter.start" << dendl; + ldout(msgr->cct,1) << "accepter.start" << dendl; // start thread create(); @@ -185,7 +186,8 @@ int SimpleMessenger::Accepter::start() void *SimpleMessenger::Accepter::entry() { - dout(10) << "accepter starting" << dendl; + const md_config_t *conf = msgr->cct->_conf; + ldout(msgr->cct,10) << "accepter starting" << dendl; int errors = 0; @@ -195,16 +197,16 @@ void *SimpleMessenger::Accepter::entry() pfd.fd = listen_sd; pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP; while (!done) { - dout(20) << "accepter calling poll" << dendl; + ldout(msgr->cct,20) << "accepter calling poll" << dendl; int r = poll(&pfd, 1, -1); if (r < 0) break; - dout(20) << "accepter poll got " << r << dendl; + ldout(msgr->cct,20) << "accepter poll got " << r << dendl; if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP)) break; - dout(10) << "pfd.revents=" << pfd.revents << dendl; + ldout(msgr->cct,10) << "pfd.revents=" << pfd.revents << dendl; if (done) break; // accept @@ -213,49 +215,49 @@ void *SimpleMessenger::Accepter::entry() int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen); if (sd >= 0) { errors = 0; - dout(10) << "accepted incoming on sd " << sd << dendl; + ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl; // disable Nagle algorithm? - if (g_conf->ms_tcp_nodelay) { + if (conf->ms_tcp_nodelay) { int flag = 1; int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); if (r < 0) - dout(0) << "accepter could't set TCP_NODELAY: " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,0) << "accepter could't set TCP_NODELAY: " << strerror_r(errno, buf, sizeof(buf)) << dendl; } - messenger->lock.Lock(); + msgr->lock.Lock(); - if (!messenger->destination_stopped) { - Pipe *p = new Pipe(messenger, Pipe::STATE_ACCEPTING); + if (!msgr->destination_stopped) { + Pipe *p = new Pipe(msgr, Pipe::STATE_ACCEPTING); p->sd = sd; p->pipe_lock.Lock(); p->start_reader(); p->pipe_lock.Unlock(); - messenger->pipes.insert(p); + msgr->pipes.insert(p); } - messenger->lock.Unlock(); + msgr->lock.Unlock(); } else { - dout(0) << "accepter no incoming connection? sd = " << sd + ldout(msgr->cct,0) << "accepter no incoming connection? sd = " << sd << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; if (++errors > 4) break; } } - dout(20) << "accepter closing" << dendl; + ldout(msgr->cct,20) << "accepter closing" << dendl; // don't close socket, in case we start up again? blech. if (listen_sd >= 0) { ::close(listen_sd); listen_sd = -1; } - dout(10) << "accepter stopping" << dendl; + ldout(msgr->cct,10) << "accepter stopping" << dendl; return 0; } void SimpleMessenger::Accepter::stop() { done = true; - dout(10) << "stop accepter" << dendl; + ldout(msgr->cct,10) << "stop accepter" << dendl; if (listen_sd >= 0) { ::shutdown(listen_sd, SHUT_RDWR); ::close(listen_sd); @@ -306,7 +308,7 @@ void SimpleMessenger::dispatch_entry() } else { pipe_list.push_back(pipe->queue_items[priority]); // move to end of list } - dout(20) << "dispatch_entry pipe " << pipe << " dequeued " << m << dendl; + ldout(cct,20) << "dispatch_entry pipe " << pipe << " dequeued " << m << dendl; dispatch_queue.lock.Unlock(); //done with the pipe queue for a while pipe->in_qlen--; @@ -339,7 +341,7 @@ void SimpleMessenger::dispatch_entry() uint64_t msize = m->get_dispatch_throttle_size(); m->set_dispatch_throttle_size(0); // clear it out, in case we requeue this message. - dout(1) << "<== " << m->get_source_inst() + ldout(cct,1) << "<== " << m->get_source_inst() << " " << m->get_seq() << " ==== " << *m << " ==== " << m->get_payload().length() << "+" << m->get_middle().length() @@ -352,7 +354,7 @@ void SimpleMessenger::dispatch_entry() dispatch_throttle_release(msize); - dout(20) << "done calling dispatch on " << m << dendl; + ldout(cct,20) << "done calling dispatch on " << m << dendl; } } dispatch_queue.lock.Lock(); @@ -371,7 +373,7 @@ void SimpleMessenger::dispatch_entry() void SimpleMessenger::ready() { - dout(10) << "ready " << get_myaddr() << dendl; + ldout(cct,10) << "ready " << get_myaddr() << dendl; assert(!dispatch_thread.is_started()); dispatch_thread.create(); } @@ -379,14 +381,14 @@ void SimpleMessenger::ready() int SimpleMessenger::shutdown() { - dout(10) << "shutdown " << get_myaddr() << dendl; + ldout(cct,10) << "shutdown " << get_myaddr() << dendl; // stop my dispatch thread if (dispatch_thread.am_self()) { - dout(10) << "shutdown i am dispatch, setting stop flag" << dendl; + ldout(cct,10) << "shutdown i am dispatch, setting stop flag" << dendl; dispatch_queue.stop = true; } else { - dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl; + ldout(cct,10) << "shutdown i am not dispatch, setting stop flag and joining thread." << dendl; dispatch_queue.lock.Lock(); dispatch_queue.stop = true; dispatch_queue.cond.Signal(); @@ -397,7 +399,7 @@ int SimpleMessenger::shutdown() void SimpleMessenger::suicide() { - dout(10) << "suicide " << get_myaddr() << dendl; + ldout(cct,10) << "suicide " << get_myaddr() << dendl; shutdown(); // hmm, or exit(0)? } @@ -419,7 +421,7 @@ int SimpleMessenger::send_message(Message *m, const entity_inst_t& dest) if (!m->get_priority()) m->set_priority(get_default_send_priority()); - dout(1) << "--> " << dest.name << " " << dest.addr + ldout(cct,1) << "--> " << dest.name << " " << dest.addr << " -- " << *m << " -- ?+" << m->get_data().length() << " " << m @@ -438,7 +440,7 @@ int SimpleMessenger::send_message(Message *m, Connection *con) SimpleMessenger::Pipe *pipe = (SimpleMessenger::Pipe *)con->get_pipe(); if (pipe) { - dout(1) << "--> " << con->get_peer_addr() << " -- " << *m + ldout(cct,1) << "--> " << con->get_peer_addr() << " -- " << *m << " -- ?+" << m->get_data().length() << " " << m << " con " << con << dendl; @@ -446,7 +448,7 @@ int SimpleMessenger::send_message(Message *m, Connection *con) submit_message(m, pipe); pipe->put(); } else { - dout(0) << "send_message dropped message " << *m << " because of no pipe on con " << con + ldout(cct,0) << "send_message dropped message " << *m << " because of no pipe on con " << con << dendl; // else we raced with reaper() m->put(); @@ -461,7 +463,7 @@ int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest) if (!m->get_priority()) m->set_priority(get_default_send_priority()); - dout(1) << "lazy " + ldout(cct,1) << "lazy " << " --> " << dest.name << " " << dest.addr << " -- " << *m << " -- ?+" << m->get_data().length() @@ -474,7 +476,7 @@ int SimpleMessenger::lazy_send_message(Message *m, const entity_inst_t& dest) entity_addr_t SimpleMessenger::get_myaddr() { - entity_addr_t a = messenger->ms_addr; + entity_addr_t a = msgr->ms_addr; return a; } @@ -488,7 +490,7 @@ entity_addr_t SimpleMessenger::get_myaddr() #undef dout_prefix #define dout_prefix _pipe_prefix(_dout) ostream& SimpleMessenger::Pipe::_pipe_prefix(std::ostream *_dout) { - return *_dout << "-- " << messenger->ms_addr << " >> " << peer_addr << " pipe(" << this + return *_dout << "-- " << msgr->ms_addr << " >> " << peer_addr << " pipe(" << this << " sd=" << sd << " pgs=" << peer_global_seq << " cs=" << connect_seq @@ -537,26 +539,26 @@ void SimpleMessenger::Pipe::queue_received(Message *m, int priority) if (queue.empty()) { // queue pipe AND message under pipe AND dispatch_queue locks. pipe_lock.Unlock(); - messenger->dispatch_queue.lock.Lock(); + msgr->dispatch_queue.lock.Lock(); pipe_lock.Lock(); if (halt_delivery) { - messenger->dispatch_queue.lock.Unlock(); + msgr->dispatch_queue.lock.Unlock(); goto halt; } if (queue.empty()) { - dout(20) << "queue_received queuing pipe" << dendl; + ldout(msgr->cct,20) << "queue_received queuing pipe" << dendl; if (!queue_items.count(priority)) queue_items[priority] = new xlist::item(this); - if (messenger->dispatch_queue.queued_pipes.empty()) - messenger->dispatch_queue.cond.Signal(); - messenger->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]); + if (msgr->dispatch_queue.queued_pipes.empty()) + msgr->dispatch_queue.cond.Signal(); + msgr->dispatch_queue.queued_pipes[priority].push_back(queue_items[priority]); } queue.push_back(m); - messenger->dispatch_queue.lock.Unlock(); + msgr->dispatch_queue.lock.Unlock(); } else { // just queue message under pipe lock. queue.push_back(m); @@ -564,7 +566,7 @@ void SimpleMessenger::Pipe::queue_received(Message *m, int priority) // increment queue length counters in_qlen++; - messenger->dispatch_queue.qlen.inc(); + msgr->dispatch_queue.qlen.inc(); return; @@ -573,7 +575,7 @@ void SimpleMessenger::Pipe::queue_received(Message *m, int priority) // this magic number should be larger than // the size of the D_CONNECT et al enum if (m>(void *)5) { - messenger->dispatch_throttle_release(m->get_dispatch_throttle_size()); + msgr->dispatch_throttle_release(m->get_dispatch_throttle_size()); m->put(); } } @@ -582,7 +584,7 @@ void SimpleMessenger::Pipe::queue_received(Message *m, int priority) int SimpleMessenger::Pipe::accept() { - dout(10) << "accept" << dendl; + ldout(msgr->cct,10) << "accept" << dendl; // my creater gave me sd via accept() assert(state == STATE_ACCEPTING); @@ -590,14 +592,14 @@ int SimpleMessenger::Pipe::accept() // announce myself. int rc = tcp_write(sd, CEPH_BANNER, strlen(CEPH_BANNER)); if (rc < 0) { - dout(10) << "accept couldn't write banner" << dendl; + ldout(msgr->cct,10) << "accept couldn't write banner" << dendl; state = STATE_CLOSED; return -1; } // and my addr bufferlist addrs; - ::encode(messenger->ms_addr, addrs); + ::encode(msgr->ms_addr, addrs); // and peer's socket addr (they might not know their ip) entity_addr_t socket_addr; @@ -605,7 +607,7 @@ int SimpleMessenger::Pipe::accept() int r = ::getpeername(sd, (sockaddr*)&socket_addr.ss_addr(), &len); if (r < 0) { char buf[80]; - dout(0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,0) << "accept failed to getpeername " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl; state = STATE_CLOSED; return -1; } @@ -613,24 +615,24 @@ int SimpleMessenger::Pipe::accept() rc = tcp_write(sd, addrs.c_str(), addrs.length()); if (rc < 0) { - dout(10) << "accept couldn't write my+peer addr" << dendl; + ldout(msgr->cct,10) << "accept couldn't write my+peer addr" << dendl; state = STATE_CLOSED; return -1; } - dout(10) << "accept sd=" << sd << dendl; + ldout(msgr->cct,10) << "accept sd=" << sd << dendl; // identify peer char banner[strlen(CEPH_BANNER)+1]; - rc = tcp_read(sd, banner, strlen(CEPH_BANNER), messenger->timeout); + rc = tcp_read(sd, banner, strlen(CEPH_BANNER), msgr->timeout); if (rc < 0) { - dout(10) << "accept couldn't read banner" << dendl; + ldout(msgr->cct,10) << "accept couldn't read banner" << dendl; state = STATE_CLOSED; return -1; } if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { banner[strlen(CEPH_BANNER)] = 0; - dout(1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl; + ldout(msgr->cct,1) << "accept peer sent bad banner '" << banner << "' (should be '" << CEPH_BANNER << "')" << dendl; state = STATE_CLOSED; return -1; } @@ -639,9 +641,9 @@ int SimpleMessenger::Pipe::accept() bufferptr tp(sizeof(peer_addr)); addrbl.push_back(tp); } - rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout); + rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), msgr->timeout); if (rc < 0) { - dout(10) << "accept couldn't read peer_addr" << dendl; + ldout(msgr->cct,10) << "accept couldn't read peer_addr" << dendl; state = STATE_CLOSED; return -1; } @@ -650,13 +652,13 @@ int SimpleMessenger::Pipe::accept() ::decode(peer_addr, ti); } - dout(10) << "accept peer addr is " << peer_addr << dendl; + ldout(msgr->cct,10) << "accept peer addr is " << peer_addr << dendl; if (peer_addr.is_blank_addr()) { // peer apparently doesn't know what ip they have; figure it out for them. int port = peer_addr.get_port(); peer_addr.addr = socket_addr.addr; peer_addr.set_port(port); - dout(0) << "accept peer addr is really " << peer_addr + ldout(msgr->cct,0) << "accept peer addr is really " << peer_addr << " (socket is " << socket_addr << ")" << dendl; } set_peer_addr(peer_addr); // so that connection_state gets set up @@ -675,9 +677,9 @@ int SimpleMessenger::Pipe::accept() bool replace = false; uint64_t existing_seq = -1; while (1) { - rc = tcp_read(sd, (char*)&connect, sizeof(connect), messenger->timeout); + rc = tcp_read(sd, (char*)&connect, sizeof(connect), msgr->timeout); if (rc < 0) { - dout(10) << "accept couldn't read connect" << dendl; + ldout(msgr->cct,10) << "accept couldn't read connect" << dendl; goto fail_unlocked; } @@ -685,122 +687,122 @@ int SimpleMessenger::Pipe::accept() authorizer.clear(); if (connect.authorizer_len) { bp = buffer::create(connect.authorizer_len); - if (tcp_read(sd, bp.c_str(), connect.authorizer_len, messenger->timeout) < 0) { - dout(10) << "accept couldn't read connect authorizer" << dendl; + if (tcp_read(sd, bp.c_str(), connect.authorizer_len, msgr->timeout) < 0) { + ldout(msgr->cct,10) << "accept couldn't read connect authorizer" << dendl; goto fail_unlocked; } authorizer.push_back(bp); authorizer_reply.clear(); } - dout(20) << "accept got peer connect_seq " << connect.connect_seq + ldout(msgr->cct,20) << "accept got peer connect_seq " << connect.connect_seq << " global_seq " << connect.global_seq << dendl; - messenger->lock.Lock(); + msgr->lock.Lock(); // note peer's type, flags set_peer_type(connect.host_type); - policy = messenger->get_policy(connect.host_type); - dout(10) << "accept of host_type " << connect.host_type + policy = msgr->get_policy(connect.host_type); + ldout(msgr->cct,10) << "accept of host_type " << connect.host_type << ", policy.lossy=" << policy.lossy << dendl; memset(&reply, 0, sizeof(reply)); - reply.protocol_version = get_proto_version(messenger->my_type, peer_type, false); + reply.protocol_version = get_proto_version(msgr->my_type, peer_type, false); // mismatch? - dout(10) << "accept my proto " << reply.protocol_version + ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version << ", their proto " << connect.protocol_version << dendl; if (connect.protocol_version != reply.protocol_version) { reply.tag = CEPH_MSGR_TAG_BADPROTOVER; - messenger->lock.Unlock(); + msgr->lock.Unlock(); goto reply; } feat_missing = policy.features_required & ~(uint64_t)connect.features; if (feat_missing) { - dout(1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; + ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; reply.tag = CEPH_MSGR_TAG_FEATURES; - messenger->lock.Unlock(); + msgr->lock.Unlock(); goto reply; } - messenger->lock.Unlock(); - if (messenger->verify_authorizer(connection_state, peer_type, + msgr->lock.Unlock(); + if (msgr->verify_authorizer(connection_state, peer_type, connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid) && !authorizer_valid) { - dout(0) << "accept bad authorizer" << dendl; + ldout(msgr->cct,0) << "accept bad authorizer" << dendl; reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER; goto reply; } - messenger->lock.Lock(); + msgr->lock.Lock(); // existing? - if (messenger->rank_pipe.count(peer_addr)) { - existing = messenger->rank_pipe[peer_addr]; + if (msgr->rank_pipe.count(peer_addr)) { + existing = msgr->rank_pipe[peer_addr]; existing->pipe_lock.Lock(); if (connect.global_seq < existing->peer_global_seq) { - dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq + ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq << " > " << connect.global_seq << ", RETRY_GLOBAL" << dendl; reply.tag = CEPH_MSGR_TAG_RETRY_GLOBAL; reply.global_seq = existing->peer_global_seq; // so we can send it below.. existing->pipe_lock.Unlock(); - messenger->lock.Unlock(); + msgr->lock.Unlock(); goto reply; } else { - dout(10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq + ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq << " <= " << connect.global_seq << ", looks ok" << dendl; } if (existing->policy.lossy) { - dout(0) << "accept replacing existing (lossy) channel (new one lossy=" + ldout(msgr->cct,0) << "accept replacing existing (lossy) channel (new one lossy=" << policy.lossy << ")" << dendl; existing->was_session_reset(); goto replace; } /*if (lossy_rx) { if (existing->state == STATE_STANDBY) { - dout(0) << "accept incoming lossy connection, kicking outgoing lossless " + ldout(msgr->cct,0) << "accept incoming lossy connection, kicking outgoing lossless " << existing << dendl; existing->state = STATE_CONNECTING; existing->cond.Signal(); } else { - dout(0) << "accept incoming lossy connection, our lossless " << existing + ldout(msgr->cct,0) << "accept incoming lossy connection, our lossless " << existing << " has state " << existing->state << ", doing nothing" << dendl; } existing->lock.Unlock(); goto fail; }*/ - dout(0) << "accept connect_seq " << connect.connect_seq + ldout(msgr->cct,0) << "accept connect_seq " << connect.connect_seq << " vs existing " << existing->connect_seq << " state " << existing->state << dendl; if (connect.connect_seq < existing->connect_seq) { if (connect.connect_seq == 0) { - dout(0) << "accept peer reset, then tried to connect to us, replacing" << dendl; + ldout(msgr->cct,0) << "accept peer reset, then tried to connect to us, replacing" << dendl; existing->was_session_reset(); // this resets out_queue, msg_ and connect_seq #'s goto replace; } else { // old attempt, or we sent READY but they didn't get it. - dout(10) << "accept existing " << existing << ".cseq " << existing->connect_seq + ldout(msgr->cct,10) << "accept existing " << existing << ".cseq " << existing->connect_seq << " > " << connect.connect_seq << ", RETRY_SESSION" << dendl; reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq; // so we can send it below.. existing->pipe_lock.Unlock(); - messenger->lock.Unlock(); + msgr->lock.Unlock(); goto reply; } } if (connect.connect_seq == existing->connect_seq) { // connection race? - if (peer_addr < messenger->ms_addr || + if (peer_addr < msgr->ms_addr || existing->policy.server) { // incoming wins - dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq + ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", or we are server, replacing my attempt" << dendl; assert(existing->state == STATE_CONNECTING || existing->state == STATE_STANDBY || @@ -808,14 +810,14 @@ int SimpleMessenger::Pipe::accept() goto replace; } else { // our existing outgoing wins - dout(10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq + ldout(msgr->cct,10) << "accept connection race, existing " << existing << ".cseq " << existing->connect_seq << " == " << connect.connect_seq << ", sending WAIT" << dendl; - assert(peer_addr > messenger->ms_addr); + assert(peer_addr > msgr->ms_addr); assert(existing->state == STATE_CONNECTING || existing->state == STATE_OPEN); // this will win reply.tag = CEPH_MSGR_TAG_WAIT; existing->pipe_lock.Unlock(); - messenger->lock.Unlock(); + msgr->lock.Unlock(); goto reply; } } @@ -823,29 +825,29 @@ int SimpleMessenger::Pipe::accept() assert(connect.connect_seq > existing->connect_seq); assert(connect.global_seq >= existing->peer_global_seq); if (existing->connect_seq == 0) { - dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq + ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << ", " << existing << ".cseq = " << existing->connect_seq << "), sending RESETSESSION" << dendl; reply.tag = CEPH_MSGR_TAG_RESETSESSION; - messenger->lock.Unlock(); + msgr->lock.Unlock(); existing->pipe_lock.Unlock(); goto reply; } // reconnect - dout(10) << "accept peer sent cseq " << connect.connect_seq + ldout(msgr->cct,10) << "accept peer sent cseq " << connect.connect_seq << " > " << existing->connect_seq << dendl; goto replace; } // existing else if (connect.connect_seq > 0) { // we reset, and they are opening a new session - dout(0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; - messenger->lock.Unlock(); + ldout(msgr->cct,0) << "accept we reset (peer sent cseq " << connect.connect_seq << "), sending RESETSESSION" << dendl; + msgr->lock.Unlock(); reply.tag = CEPH_MSGR_TAG_RESETSESSION; goto reply; } else { // new session - dout(10) << "accept new session" << dendl; + ldout(msgr->cct,10) << "accept new session" << dendl; existing = NULL; goto open; } @@ -870,7 +872,7 @@ int SimpleMessenger::Pipe::accept() reply_tag = CEPH_MSGR_TAG_SEQ; existing_seq = existing->in_seq; } - dout(10) << "accept replacing " << existing << dendl; + ldout(msgr->cct,10) << "accept replacing " << existing << dendl; existing->stop(); existing->unregister_pipe(); @@ -887,7 +889,7 @@ int SimpleMessenger::Pipe::accept() out_seq = existing->out_seq; in_seq = existing->in_seq; in_seq_acked = in_seq; - dout(10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl; + ldout(msgr->cct,10) << "accept re-queuing on out_seq " << out_seq << " in_seq " << in_seq << dendl; for (map >::iterator p = existing->out_q.begin(); p != existing->out_q.end(); p++) @@ -900,12 +902,12 @@ int SimpleMessenger::Pipe::accept() connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; state = STATE_OPEN; - dout(10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; + ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; // send READY reply reply.tag = (reply_tag ? reply_tag : CEPH_MSGR_TAG_READY); reply.features = policy.features_supported; - reply.global_seq = messenger->get_global_seq(); + reply.global_seq = msgr->get_global_seq(); reply.connect_seq = connect_seq; reply.flags = 0; reply.authorizer_len = authorizer_reply.length(); @@ -913,11 +915,11 @@ int SimpleMessenger::Pipe::accept() reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY; connection_state->set_features((int)reply.features & (int)connect.features); - dout(10) << "accept features " << connection_state->get_features() << dendl; + ldout(msgr->cct,10) << "accept features " << connection_state->get_features() << dendl; // ok! register_pipe(); - messenger->lock.Unlock(); + msgr->lock.Unlock(); rc = tcp_write(sd, (char*)&reply, sizeof(reply)); if (rc < 0) { @@ -934,11 +936,11 @@ int SimpleMessenger::Pipe::accept() if (reply_tag == CEPH_MSGR_TAG_SEQ) { uint64_t newly_acked_seq = 0; if(tcp_write(sd, (char*)&existing_seq, sizeof(existing_seq)) < 0) { - dout(2) << "accept write error on in_seq" << dendl; + ldout(msgr->cct,2) << "accept write error on in_seq" << dendl; goto fail_unlocked; } if(tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { - dout(2) << "accept read error on newly_acked_seq" << dendl; + ldout(msgr->cct,2) << "accept read error on newly_acked_seq" << dendl; goto fail_unlocked; } requeue_sent(newly_acked_seq); @@ -946,10 +948,10 @@ int SimpleMessenger::Pipe::accept() pipe_lock.Lock(); if (state != STATE_CLOSED) { - dout(10) << "accept starting writer, " << "state=" << state << dendl; + ldout(msgr->cct,10) << "accept starting writer, " << "state=" << state << dendl; start_writer(); } - dout(20) << "accept done" << dendl; + ldout(msgr->cct,20) << "accept done" << dendl; pipe_lock.Unlock(); return 0; // success. @@ -971,11 +973,11 @@ int SimpleMessenger::Pipe::connect() { bool got_bad_auth = false; - dout(10) << "connect " << connect_seq << dendl; + ldout(msgr->cct,10) << "connect " << connect_seq << dendl; assert(pipe_lock.is_locked()); __u32 cseq = connect_seq; - __u32 gseq = messenger->get_global_seq(); + __u32 gseq = msgr->get_global_seq(); // stop reader thrad join_reader(); @@ -992,6 +994,7 @@ int SimpleMessenger::Pipe::connect() entity_addr_t peer_addr_for_me, socket_addr; AuthAuthorizer *authorizer = NULL; bufferlist addrbl, myaddrbl; + const md_config_t *conf = msgr->cct->_conf; // close old socket. this is safe because we stopped the reader thread above. if (sd >= 0) @@ -1001,7 +1004,7 @@ int SimpleMessenger::Pipe::connect() sd = ::socket(peer_addr.get_family(), SOCK_STREAM, 0); if (sd < 0) { char buf[80]; - derr << "connect couldn't created socket " << strerror_r(errno, buf, sizeof(buf)) << dendl; + lderr(msgr->cct) << "connect couldn't created socket " << strerror_r(errno, buf, sizeof(buf)) << dendl; assert(0); goto fail; } @@ -1009,31 +1012,31 @@ int SimpleMessenger::Pipe::connect() char buf[80]; // connect! - dout(10) << "connecting to " << peer_addr << dendl; + ldout(msgr->cct,10) << "connecting to " << peer_addr << dendl; rc = ::connect(sd, (sockaddr*)&peer_addr.addr, sizeof(peer_addr.addr)); if (rc < 0) { - dout(2) << "connect error " << peer_addr + ldout(msgr->cct,2) << "connect error " << peer_addr << ", " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } // disable Nagle algorithm? - if (g_conf->ms_tcp_nodelay) { + if (conf->ms_tcp_nodelay) { int flag = 1; int r = ::setsockopt(sd, IPPROTO_TCP, TCP_NODELAY, (char*)&flag, sizeof(flag)); if (r < 0) - dout(0) << "connect couldn't set TCP_NODELAY: " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,0) << "connect couldn't set TCP_NODELAY: " << strerror_r(errno, buf, sizeof(buf)) << dendl; } // verify banner // FIXME: this should be non-blocking, or in some other way verify the banner as we get it. - rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER), messenger->timeout); + rc = tcp_read(sd, (char*)&banner, strlen(CEPH_BANNER), msgr->timeout); if (rc < 0) { - dout(2) << "connect couldn't read banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "connect couldn't read banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } if (memcmp(banner, CEPH_BANNER, strlen(CEPH_BANNER))) { - dout(0) << "connect protocol error (bad banner) on peer " << paddr << dendl; + ldout(msgr->cct,0) << "connect protocol error (bad banner) on peer " << paddr << dendl; goto fail; } @@ -1044,7 +1047,7 @@ int SimpleMessenger::Pipe::connect() msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; if (do_sendmsg(sd, &msg, msglen)) { - dout(2) << "connect couldn't write my banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "connect couldn't write my banner, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } @@ -1053,9 +1056,9 @@ int SimpleMessenger::Pipe::connect() bufferptr p(sizeof(paddr) * 2); addrbl.push_back(p); } - rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), messenger->timeout); + rc = tcp_read(sd, addrbl.c_str(), addrbl.length(), msgr->timeout); if (rc < 0) { - dout(2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "connect couldn't read peer addrs, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } { @@ -1064,26 +1067,26 @@ int SimpleMessenger::Pipe::connect() ::decode(peer_addr_for_me, p); } - dout(20) << "connect read peer addr " << paddr << " on socket " << sd << dendl; + ldout(msgr->cct,20) << "connect read peer addr " << paddr << " on socket " << sd << dendl; if (peer_addr != paddr) { if (paddr.is_blank_addr() && peer_addr.get_port() == paddr.get_port() && peer_addr.get_nonce() == paddr.get_nonce()) { - dout(0) << "connect claims to be " + ldout(msgr->cct,0) << "connect claims to be " << paddr << " not " << peer_addr << " - presumably this is the same node!" << dendl; } else { - dout(0) << "connect claims to be " + ldout(msgr->cct,0) << "connect claims to be " << paddr << " not " << peer_addr << " - wrong node!" << dendl; goto fail; } } - dout(20) << "connect peer addr for me is " << peer_addr_for_me << dendl; + ldout(msgr->cct,20) << "connect peer addr for me is " << peer_addr_for_me << dendl; - if (messenger->need_addr) - messenger->learned_addr(peer_addr_for_me); + if (msgr->need_addr) + msgr->learned_addr(peer_addr_for_me); - ::encode(messenger->ms_addr, myaddrbl); + ::encode(msgr->ms_addr, myaddrbl); memset(&msg, 0, sizeof(msg)); msgvec[0].iov_base = myaddrbl.c_str(); @@ -1092,27 +1095,27 @@ int SimpleMessenger::Pipe::connect() msg.msg_iovlen = 1; msglen = msgvec[0].iov_len; if (do_sendmsg(sd, &msg, msglen)) { - dout(2) << "connect couldn't write my addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "connect couldn't write my addr, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } - dout(10) << "connect sent my addr " << messenger->ms_addr << dendl; + ldout(msgr->cct,10) << "connect sent my addr " << msgr->ms_addr << dendl; while (1) { delete authorizer; - authorizer = messenger->get_authorizer(peer_type, false); + authorizer = msgr->get_authorizer(peer_type, false); bufferlist authorizer_reply; ceph_msg_connect connect; connect.features = policy.features_supported; - connect.host_type = messenger->my_type; + connect.host_type = msgr->my_type; connect.global_seq = gseq; connect.connect_seq = cseq; - connect.protocol_version = get_proto_version(messenger->my_type, peer_type, true); + connect.protocol_version = get_proto_version(msgr->my_type, peer_type, true); connect.authorizer_protocol = authorizer ? authorizer->protocol : 0; connect.authorizer_len = authorizer ? authorizer->bl.length() : 0; if (authorizer) - dout(10) << "connect.authorizer_len=" << connect.authorizer_len + ldout(msgr->cct,10) << "connect.authorizer_len=" << connect.authorizer_len << " protocol=" << connect.authorizer_protocol << dendl; connect.flags = 0; if (policy.lossy) @@ -1130,20 +1133,20 @@ int SimpleMessenger::Pipe::connect() msglen += msgvec[1].iov_len; } - dout(10) << "connect sending gseq=" << gseq << " cseq=" << cseq + ldout(msgr->cct,10) << "connect sending gseq=" << gseq << " cseq=" << cseq << " proto=" << connect.protocol_version << dendl; if (do_sendmsg(sd, &msg, msglen)) { - dout(2) << "connect couldn't write gseq, cseq, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "connect couldn't write gseq, cseq, " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } - dout(20) << "connect wrote (self +) cseq, waiting for reply" << dendl; + ldout(msgr->cct,20) << "connect wrote (self +) cseq, waiting for reply" << dendl; ceph_msg_connect_reply reply; - if (tcp_read(sd, (char*)&reply, sizeof(reply), messenger->timeout) < 0) { - dout(2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl; + if (tcp_read(sd, (char*)&reply, sizeof(reply), msgr->timeout) < 0) { + ldout(msgr->cct,2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } - dout(20) << "connect got reply tag " << (int)reply.tag + ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag << " connect_seq " << reply.connect_seq << " global_seq " << reply.global_seq << " proto " << reply.protocol_version @@ -1153,10 +1156,10 @@ int SimpleMessenger::Pipe::connect() authorizer_reply.clear(); if (reply.authorizer_len) { - dout(10) << "reply.authorizer_len=" << reply.authorizer_len << dendl; + ldout(msgr->cct,10) << "reply.authorizer_len=" << reply.authorizer_len << dendl; bufferptr bp = buffer::create(reply.authorizer_len); - if (tcp_read(sd, bp.c_str(), reply.authorizer_len, messenger->timeout) < 0) { - dout(10) << "connect couldn't read connect authorizer_reply" << dendl; + if (tcp_read(sd, bp.c_str(), reply.authorizer_len, msgr->timeout) < 0) { + ldout(msgr->cct,10) << "connect couldn't read connect authorizer_reply" << dendl; goto fail; } authorizer_reply.push_back(bp); @@ -1165,19 +1168,19 @@ int SimpleMessenger::Pipe::connect() if (authorizer) { bufferlist::iterator iter = authorizer_reply.begin(); if (!authorizer->verify_reply(iter)) { - dout(0) << "failed verifying authorize reply" << dendl; + ldout(msgr->cct,0) << "failed verifying authorize reply" << dendl; goto fail; } } pipe_lock.Lock(); if (state != STATE_CONNECTING) { - dout(0) << "connect got RESETSESSION but no longer connecting" << dendl; + ldout(msgr->cct,0) << "connect got RESETSESSION but no longer connecting" << dendl; goto stop_locked; } if (reply.tag == CEPH_MSGR_TAG_FEATURES) { - dout(0) << "connect protocol feature mismatch, my " << std::hex + ldout(msgr->cct,0) << "connect protocol feature mismatch, my " << std::hex << connect.features << " < peer " << reply.features << " missing " << (reply.features & ~policy.features_supported) << std::dec << dendl; @@ -1185,22 +1188,22 @@ int SimpleMessenger::Pipe::connect() } if (reply.tag == CEPH_MSGR_TAG_BADPROTOVER) { - dout(0) << "connect protocol version mismatch, my " << connect.protocol_version + ldout(msgr->cct,0) << "connect protocol version mismatch, my " << connect.protocol_version << " != " << reply.protocol_version << dendl; goto fail_locked; } if (reply.tag == CEPH_MSGR_TAG_BADAUTHORIZER) { - dout(0) << "connect got BADAUTHORIZER" << dendl; + ldout(msgr->cct,0) << "connect got BADAUTHORIZER" << dendl; if (got_bad_auth) goto stop_locked; got_bad_auth = true; pipe_lock.Unlock(); - authorizer = messenger->get_authorizer(peer_type, true); // try harder + authorizer = msgr->get_authorizer(peer_type, true); // try harder continue; } if (reply.tag == CEPH_MSGR_TAG_RESETSESSION) { - dout(0) << "connect got RESETSESSION" << dendl; + ldout(msgr->cct,0) << "connect got RESETSESSION" << dendl; was_session_reset(); halt_delivery = false; cseq = 0; @@ -1208,15 +1211,15 @@ int SimpleMessenger::Pipe::connect() continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_GLOBAL) { - gseq = messenger->get_global_seq(reply.global_seq); - dout(10) << "connect got RETRY_GLOBAL " << reply.global_seq + gseq = msgr->get_global_seq(reply.global_seq); + ldout(msgr->cct,10) << "connect got RETRY_GLOBAL " << reply.global_seq << " chose new " << gseq << dendl; pipe_lock.Unlock(); continue; } if (reply.tag == CEPH_MSGR_TAG_RETRY_SESSION) { assert(reply.connect_seq > connect_seq); - dout(10) << "connect got RETRY_SESSION " << connect_seq + ldout(msgr->cct,10) << "connect got RETRY_SESSION " << connect_seq << " -> " << reply.connect_seq << dendl; cseq = connect_seq = reply.connect_seq; pipe_lock.Unlock(); @@ -1224,7 +1227,7 @@ int SimpleMessenger::Pipe::connect() } if (reply.tag == CEPH_MSGR_TAG_WAIT) { - dout(3) << "connect got WAIT (connection race)" << dendl; + ldout(msgr->cct,3) << "connect got WAIT (connection race)" << dendl; state = STATE_WAIT; goto stop_locked; } @@ -1233,20 +1236,20 @@ int SimpleMessenger::Pipe::connect() reply.tag == CEPH_MSGR_TAG_SEQ) { uint64_t feat_missing = policy.features_required & ~(uint64_t)reply.features; if (feat_missing) { - dout(1) << "missing required features " << std::hex << feat_missing << std::dec << dendl; + ldout(msgr->cct,1) << "missing required features " << std::hex << feat_missing << std::dec << dendl; goto fail_locked; } if (reply.tag == CEPH_MSGR_TAG_SEQ) { - dout(10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; + ldout(msgr->cct,10) << "got CEPH_MSGR_TAG_SEQ, reading acked_seq and writing in_seq" << dendl; uint64_t newly_acked_seq = 0; if (tcp_read(sd, (char*)&newly_acked_seq, sizeof(newly_acked_seq)) < 0) { - dout(2) << "connect read error on newly_acked_seq" << dendl; + ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << dendl; goto fail_locked; } handle_ack(newly_acked_seq); if (tcp_write(sd, (char*)&in_seq, sizeof(in_seq)) < 0) { - dout(2) << "connect write error on in_seq" << dendl; + ldout(msgr->cct,2) << "connect write error on in_seq" << dendl; goto fail_locked; } } @@ -1261,18 +1264,18 @@ int SimpleMessenger::Pipe::connect() assert(connect_seq == reply.connect_seq); backoff = utime_t(); connection_state->set_features((unsigned)reply.features & (unsigned)connect.features); - dout(10) << "connect success " << connect_seq << ", lossy = " << policy.lossy + ldout(msgr->cct,10) << "connect success " << connect_seq << ", lossy = " << policy.lossy << ", features " << connection_state->get_features() << dendl; - if (!messenger->destination_stopped) { + if (!msgr->destination_stopped) { Connection * cstate = connection_state->get(); pipe_lock.Unlock(); - messenger->dispatch_queue.queue_connect(cstate); + msgr->dispatch_queue.queue_connect(cstate); pipe_lock.Lock(); } if (!reader_running) { - dout(20) << "connect starting reader" << dendl; + ldout(msgr->cct,20) << "connect starting reader" << dendl; start_reader(); } delete authorizer; @@ -1280,7 +1283,7 @@ int SimpleMessenger::Pipe::connect() } // protocol error - dout(0) << "connect got bad tag " << (int)tag << dendl; + ldout(msgr->cct,0) << "connect got bad tag " << (int)tag << dendl; goto fail_locked; } @@ -1290,7 +1293,7 @@ int SimpleMessenger::Pipe::connect() if (state == STATE_CONNECTING) fault(); else - dout(3) << "connect fault, but state != connecting, stopping" << dendl; + ldout(msgr->cct,3) << "connect fault, but state != connecting, stopping" << dendl; stop_locked: delete authorizer; @@ -1299,21 +1302,21 @@ int SimpleMessenger::Pipe::connect() void SimpleMessenger::Pipe::register_pipe() { - dout(10) << "register_pipe" << dendl; - assert(messenger->lock.is_locked()); - assert(messenger->rank_pipe.count(peer_addr) == 0); - messenger->rank_pipe[peer_addr] = this; + ldout(msgr->cct,10) << "register_pipe" << dendl; + assert(msgr->lock.is_locked()); + assert(msgr->rank_pipe.count(peer_addr) == 0); + msgr->rank_pipe[peer_addr] = this; } void SimpleMessenger::Pipe::unregister_pipe() { - assert(messenger->lock.is_locked()); - if (messenger->rank_pipe.count(peer_addr) && - messenger->rank_pipe[peer_addr] == this) { - dout(10) << "unregister_pipe" << dendl; - messenger->rank_pipe.erase(peer_addr); + assert(msgr->lock.is_locked()); + if (msgr->rank_pipe.count(peer_addr) && + msgr->rank_pipe[peer_addr] == this) { + ldout(msgr->cct,10) << "unregister_pipe" << dendl; + msgr->rank_pipe.erase(peer_addr); } else { - dout(10) << "unregister_pipe - not registered" << dendl; + ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl; } } @@ -1328,7 +1331,7 @@ void SimpleMessenger::Pipe::requeue_sent(uint64_t max_acked) Message *m = sent.back(); if (m->get_seq() > max_acked) { sent.pop_back(); - dout(10) << "requeue_sent " << *m << " for resend seq " << out_seq + ldout(msgr->cct,10) << "requeue_sent " << *m << " for resend seq " << out_seq << " (" << m->get_seq() << ")" << dendl; rq.push_front(m); out_seq--; @@ -1343,12 +1346,12 @@ void SimpleMessenger::Pipe::requeue_sent(uint64_t max_acked) */ void SimpleMessenger::Pipe::discard_queue() { - dout(10) << "discard_queue" << dendl; + ldout(msgr->cct,10) << "discard_queue" << dendl; halt_delivery = true; // dequeue pipe - DispatchQueue& q = messenger->dispatch_queue; + DispatchQueue& q = msgr->dispatch_queue; pipe_lock.Unlock(); q.lock.Lock(); pipe_lock.Lock(); @@ -1371,26 +1374,26 @@ void SimpleMessenger::Pipe::discard_queue() q.lock.Unlock(); - dout(20) << " dequeued pipe " << dendl; + ldout(msgr->cct,20) << " dequeued pipe " << dendl; // adjust qlen q.qlen.sub(in_qlen); for (list::iterator p = sent.begin(); p != sent.end(); p++) { - dout(20) << " discard " << *p << dendl; + ldout(msgr->cct,20) << " discard " << *p << dendl; (*p)->put(); } sent.clear(); for (map >::iterator p = out_q.begin(); p != out_q.end(); p++) for (list::iterator r = p->second.begin(); r != p->second.end(); r++) { - dout(20) << " discard " << *r << dendl; + ldout(msgr->cct,20) << " discard " << *r << dendl; (*r)->put(); } out_q.clear(); for (map >::iterator p = in_q.begin(); p != in_q.end(); p++) for (list::iterator r = p->second.begin(); r != p->second.end(); r++) { - messenger->dispatch_throttle_release((*r)->get_dispatch_throttle_size()); - dout(20) << " discard " << *r << dendl; + msgr->dispatch_throttle_release((*r)->get_dispatch_throttle_size()); + ldout(msgr->cct,20) << " discard " << *r << dendl; (*r)->put(); } in_q.clear(); @@ -1400,20 +1403,21 @@ void SimpleMessenger::Pipe::discard_queue() void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) { + const md_config_t *conf = msgr->cct->_conf; assert(pipe_lock.is_locked()); cond.Signal(); if (onread && state == STATE_CONNECTING) { - dout(10) << "fault already connecting, reader shutting down" << dendl; + ldout(msgr->cct,10) << "fault already connecting, reader shutting down" << dendl; return; } char buf[80]; - if (!onconnect) dout(2) << "fault " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; + if (!onconnect) ldout(msgr->cct,2) << "fault " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; if (state == STATE_CLOSED || state == STATE_CLOSING) { - dout(10) << "fault already closed|closing" << dendl; + ldout(msgr->cct,10) << "fault already closed|closing" << dendl; return; } @@ -1421,7 +1425,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) // lossy channel? if (policy.lossy) { - dout(10) << "fault on lossy channel, failing" << dendl; + ldout(msgr->cct,10) << "fault on lossy channel, failing" << dendl; fail(); return; } @@ -1431,10 +1435,10 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) if (!is_queued()) { if (state == STATE_CLOSING || onconnect) { - dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl; + ldout(msgr->cct,10) << "fault on connect, or already closing, and q empty: setting closed." << dendl; state = STATE_CLOSED; } else { - dout(0) << "fault with nothing to send, going to standby" << dendl; + ldout(msgr->cct,0) << "fault with nothing to send, going to standby" << dendl; state = STATE_STANDBY; } return; @@ -1443,37 +1447,37 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread) if (state != STATE_CONNECTING) { if (!onconnect) - dout(0) << "fault initiating reconnect" << dendl; + ldout(msgr->cct,0) << "fault initiating reconnect" << dendl; connect_seq++; state = STATE_CONNECTING; backoff = utime_t(); } else if (backoff == utime_t()) { if (!onconnect) - dout(0) << "fault first fault" << dendl; - backoff.set_from_double(g_conf->ms_initial_backoff); + ldout(msgr->cct,0) << "fault first fault" << dendl; + backoff.set_from_double(conf->ms_initial_backoff); } else { - dout(10) << "fault waiting " << backoff << dendl; + ldout(msgr->cct,10) << "fault waiting " << backoff << dendl; cond.WaitInterval(&g_ceph_context, pipe_lock, backoff); backoff += backoff; - if (backoff > g_conf->ms_max_backoff) - backoff.set_from_double(g_conf->ms_max_backoff); - dout(10) << "fault done waiting or woke up" << dendl; + if (backoff > conf->ms_max_backoff) + backoff.set_from_double(conf->ms_max_backoff); + ldout(msgr->cct,10) << "fault done waiting or woke up" << dendl; } } void SimpleMessenger::Pipe::fail() { - dout(10) << "fail" << dendl; + ldout(msgr->cct,10) << "fail" << dendl; assert(pipe_lock.is_locked()); stop(); discard_queue(); - if (!messenger->destination_stopped) { + if (!msgr->destination_stopped) { Connection * cstate = connection_state->get(); pipe_lock.Unlock(); - messenger->dispatch_queue.queue_reset(cstate); + msgr->dispatch_queue.queue_reset(cstate); pipe_lock.Lock(); } } @@ -1482,13 +1486,13 @@ void SimpleMessenger::Pipe::was_session_reset() { assert(pipe_lock.is_locked()); - dout(10) << "was_session_reset" << dendl; + ldout(msgr->cct,10) << "was_session_reset" << dendl; discard_queue(); - if (!messenger->destination_stopped) { + if (!msgr->destination_stopped) { Connection * cstate = connection_state->get(); pipe_lock.Unlock(); - messenger->dispatch_queue.queue_remote_reset(cstate); + msgr->dispatch_queue.queue_remote_reset(cstate); pipe_lock.Lock(); } @@ -1499,7 +1503,7 @@ void SimpleMessenger::Pipe::was_session_reset() void SimpleMessenger::Pipe::stop() { - dout(10) << "stop" << dendl; + ldout(msgr->cct,10) << "stop" << dendl; assert(pipe_lock.is_locked()); state = STATE_CLOSED; cond.Signal(); @@ -1524,7 +1528,7 @@ void SimpleMessenger::Pipe::reader() // sleep if (re)connecting if (state == STATE_STANDBY) { - dout(20) << "reader sleeping during reconnect|standby" << dendl; + ldout(msgr->cct,20) << "reader sleeping during reconnect|standby" << dendl; cond.Wait(pipe_lock); continue; } @@ -1533,29 +1537,29 @@ void SimpleMessenger::Pipe::reader() char buf[80]; char tag = -1; - dout(20) << "reader reading tag..." << dendl; - int rc = tcp_read(sd, (char*)&tag, 1, messenger->timeout); + ldout(msgr->cct,20) << "reader reading tag..." << dendl; + int rc = tcp_read(sd, (char*)&tag, 1, msgr->timeout); if (rc < 0) { pipe_lock.Lock(); - dout(2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "reader couldn't read tag, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(false, true); continue; } if (tag == CEPH_MSGR_TAG_KEEPALIVE) { - dout(20) << "reader got KEEPALIVE" << dendl; + ldout(msgr->cct,20) << "reader got KEEPALIVE" << dendl; pipe_lock.Lock(); continue; } // open ... if (tag == CEPH_MSGR_TAG_ACK) { - dout(20) << "reader got ACK" << dendl; + ldout(msgr->cct,20) << "reader got ACK" << dendl; ceph_le64 seq; - int rc = tcp_read( sd, (char*)&seq, sizeof(seq), messenger->timeout); + int rc = tcp_read( sd, (char*)&seq, sizeof(seq), msgr->timeout); pipe_lock.Lock(); if (rc < 0) { - dout(2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "reader couldn't read ack seq, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(false, true); } else if (state != STATE_CLOSED) { handle_ack(seq); @@ -1564,7 +1568,7 @@ void SimpleMessenger::Pipe::reader() } else if (tag == CEPH_MSGR_TAG_MSG) { - dout(20) << "reader got MSG" << dendl; + ldout(msgr->cct,20) << "reader got MSG" << dendl; Message *m = 0; int r = read_message(&m); @@ -1578,7 +1582,7 @@ void SimpleMessenger::Pipe::reader() if (state == STATE_CLOSED || state == STATE_CONNECTING) { - messenger->dispatch_throttle_release(m->get_dispatch_throttle_size()); + msgr->dispatch_throttle_release(m->get_dispatch_throttle_size()); m->put(); continue; } @@ -1589,10 +1593,10 @@ void SimpleMessenger::Pipe::reader() // occasionally pull a message out of the sent queue to send elsewhere. in that case // it doesn't matter if we "got" it or not. if (m->get_seq() <= in_seq) { - dout(0) << "reader got old message " + ldout(msgr->cct,0) << "reader got old message " << m->get_seq() << " <= " << in_seq << " " << m << " " << *m << ", discarding" << dendl; - messenger->dispatch_throttle_release(m->get_dispatch_throttle_size()); + msgr->dispatch_throttle_release(m->get_dispatch_throttle_size()); m->put(); continue; } @@ -1604,14 +1608,14 @@ void SimpleMessenger::Pipe::reader() cond.Signal(); // wake up writer, to ack this - dout(10) << "reader got message " + ldout(msgr->cct,10) << "reader got message " << m->get_seq() << " " << m << " " << *m << dendl; queue_received(m); } else if (tag == CEPH_MSGR_TAG_CLOSE) { - dout(20) << "reader got CLOSE" << dendl; + ldout(msgr->cct,20) << "reader got CLOSE" << dendl; pipe_lock.Lock(); if (state == STATE_CLOSING) state = STATE_CLOSED; @@ -1621,7 +1625,7 @@ void SimpleMessenger::Pipe::reader() break; } else { - dout(0) << "reader bad tag " << (int)tag << dendl; + ldout(msgr->cct,0) << "reader bad tag " << (int)tag << dendl; pipe_lock.Lock(); fault(false, true); } @@ -1631,7 +1635,7 @@ void SimpleMessenger::Pipe::reader() // reap? reader_running = false; unlock_maybe_reap(); - dout(10) << "reader done" << dendl; + ldout(msgr->cct,10) << "reader done" << dendl; } /* write msgs to socket. @@ -1643,7 +1647,7 @@ void SimpleMessenger::Pipe::writer() pipe_lock.Lock(); while (state != STATE_CLOSED) {// && state != STATE_WAIT) { - dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl; + ldout(msgr->cct,10) << "writer: state = " << state << " policy.server=" << policy.server << dendl; // standby? if (is_queued() && state == STATE_STANDBY && !policy.server) { @@ -1663,7 +1667,7 @@ void SimpleMessenger::Pipe::writer() if (state == STATE_CLOSING) { // write close tag - dout(20) << "writer writing CLOSE tag" << dendl; + ldout(msgr->cct,20) << "writer writing CLOSE tag" << dendl; char tag = CEPH_MSGR_TAG_CLOSE; state = STATE_CLOSED; pipe_lock.Unlock(); @@ -1685,7 +1689,7 @@ void SimpleMessenger::Pipe::writer() int rc = write_keepalive(); pipe_lock.Lock(); if (rc < 0) { - dout(2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "writer couldn't write keepalive, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(); continue; } @@ -1699,7 +1703,7 @@ void SimpleMessenger::Pipe::writer() int rc = write_ack(send_seq); pipe_lock.Lock(); if (rc < 0) { - dout(2) << "writer couldn't write ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,2) << "writer couldn't write ack, " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(); continue; } @@ -1717,7 +1721,7 @@ void SimpleMessenger::Pipe::writer() } pipe_lock.Unlock(); - dout(20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; + ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; // associate message with Connection (for benefit of encode_payload) m->set_connection(connection_state->get()); @@ -1725,12 +1729,12 @@ void SimpleMessenger::Pipe::writer() // encode and copy out of *m m->encode(); - dout(20) << "writer sending " << m->get_seq() << " " << m << dendl; + ldout(msgr->cct,20) << "writer sending " << m->get_seq() << " " << m << dendl; int rc = write_message(m); pipe_lock.Lock(); if (rc < 0) { - dout(1) << "writer error sending " << m << ", " + ldout(msgr->cct,1) << "writer error sending " << m << ", " << errno << ": " << strerror_r(errno, buf, sizeof(buf)) << dendl; fault(); } @@ -1741,23 +1745,23 @@ void SimpleMessenger::Pipe::writer() if (sent.empty() && close_on_empty) { // this is slightly hacky - dout(10) << "writer out and sent queues empty, closing" << dendl; + ldout(msgr->cct,10) << "writer out and sent queues empty, closing" << dendl; policy.lossy = true; fault(); continue; } // wait - dout(20) << "writer sleeping" << dendl; + ldout(msgr->cct,20) << "writer sleeping" << dendl; cond.Wait(pipe_lock); } - dout(20) << "writer finishing" << dendl; + ldout(msgr->cct,20) << "writer finishing" << dendl; // reap? writer_running = false; unlock_maybe_reap(); - dout(10) << "writer done" << dendl; + ldout(msgr->cct,10) << "writer done" << dendl; } void SimpleMessenger::Pipe::unlock_maybe_reap() @@ -1765,7 +1769,7 @@ void SimpleMessenger::Pipe::unlock_maybe_reap() if (!reader_running && !writer_running) { shutdown_socket(); pipe_lock.Unlock(); - messenger->queue_reap(this); + msgr->queue_reap(this); } else { pipe_lock.Unlock(); } @@ -1799,19 +1803,19 @@ int SimpleMessenger::Pipe::read_message(Message **pm) { int ret = -1; // envelope - //dout(10) << "receiver.read_message from sd " << sd << dendl; + //ldout(msgr->cct,10) << "receiver.read_message from sd " << sd << dendl; ceph_msg_header header; ceph_msg_footer footer; __u32 header_crc; if (connection_state->has_feature(CEPH_FEATURE_NOSRCADDR)) { - if (tcp_read( sd, (char*)&header, sizeof(header), messenger->timeout ) < 0) + if (tcp_read( sd, (char*)&header, sizeof(header), msgr->timeout ) < 0) return -1; header_crc = ceph_crc32c_le(0, (unsigned char *)&header, sizeof(header) - sizeof(header.crc)); } else { ceph_msg_header_old oldheader; - if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader), messenger->timeout ) < 0) + if (tcp_read( sd, (char*)&oldheader, sizeof(oldheader), msgr->timeout ) < 0) return -1; // this is fugly memcpy(&header, &oldheader, sizeof(header)); @@ -1821,7 +1825,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) header_crc = ceph_crc32c_le(0, (unsigned char *)&oldheader, sizeof(oldheader) - sizeof(oldheader.crc)); } - dout(20) << "reader got envelope type=" << header.type + ldout(msgr->cct,20) << "reader got envelope type=" << header.type << " src " << entity_name_t(header.src) << " front=" << header.front_len << " data=" << header.data_len @@ -1830,7 +1834,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) // verify header crc if (header_crc != header.crc) { - dout(0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; + ldout(msgr->cct,0) << "reader got bad header crc " << header_crc << " != " << header.crc << dendl; return -1; } @@ -1843,7 +1847,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) uint64_t message_size = header.front_len + header.middle_len + header.data_len; if (message_size) { if (policy.throttler) { - dout(10) << "reader wants " << message_size << " from policy throttler " + ldout(msgr->cct,10) << "reader wants " << message_size << " from policy throttler " << policy.throttler->get_current() << "/" << policy.throttler->get_max() << dendl; policy.throttler->get(message_size); @@ -1853,30 +1857,30 @@ int SimpleMessenger::Pipe::read_message(Message **pm) // policy throttle, as this one does not deadlock (unless dispatch // blocks indefinitely, which it shouldn't). in contrast, the // policy throttle carries for the lifetime of the message. - dout(10) << "reader wants " << message_size << " from dispatch throttler " - << messenger->dispatch_throttler.get_current() << "/" - << messenger->dispatch_throttler.get_max() << dendl; - messenger->dispatch_throttler.get(message_size); + ldout(msgr->cct,10) << "reader wants " << message_size << " from dispatch throttler " + << msgr->dispatch_throttler.get_current() << "/" + << msgr->dispatch_throttler.get_max() << dendl; + msgr->dispatch_throttler.get(message_size); } // read front front_len = header.front_len; if (front_len) { bufferptr bp = buffer::create(front_len); - if (tcp_read( sd, bp.c_str(), front_len, messenger->timeout ) < 0) + if (tcp_read( sd, bp.c_str(), front_len, msgr->timeout ) < 0) goto out_dethrottle; front.push_back(bp); - dout(20) << "reader got front " << front.length() << dendl; + ldout(msgr->cct,20) << "reader got front " << front.length() << dendl; } // read middle middle_len = header.middle_len; if (middle_len) { bufferptr bp = buffer::create(middle_len); - if (tcp_read( sd, bp.c_str(), middle_len, messenger->timeout ) < 0) + if (tcp_read( sd, bp.c_str(), middle_len, msgr->timeout ) < 0) goto out_dethrottle; middle.push_back(bp); - dout(20) << "reader got middle " << middle.length() << dendl; + ldout(msgr->cct,20) << "reader got middle " << middle.length() << dendl; } @@ -1893,7 +1897,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) while (left > 0) { // wait for data - if (tcp_read_wait(sd, messenger->timeout) < 0) + if (tcp_read_wait(sd, msgr->timeout) < 0) goto out_dethrottle; // get a buffer @@ -1901,7 +1905,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) map >::iterator p = connection_state->rx_buffers.find(header.tid); if (p != connection_state->rx_buffers.end()) { if (rxbuf.length() == 0 || p->second.second != rxbuf_version) { - dout(10) << "reader seleting rx buffer v " << p->second.second + ldout(msgr->cct,10) << "reader seleting rx buffer v " << p->second.second << " at offset " << offset << " len " << p->second.first.length() << dendl; rxbuf = p->second.first; @@ -1914,7 +1918,7 @@ int SimpleMessenger::Pipe::read_message(Message **pm) } } else { if (!newbuf.length()) { - dout(20) << "reader allocating new rx buffer at offset " << offset << dendl; + ldout(msgr->cct,20) << "reader allocating new rx buffer at offset " << offset << dendl; alloc_aligned_buffer(newbuf, data_len, data_off); blp = newbuf.begin(); blp.advance(offset); @@ -1922,9 +1926,9 @@ int SimpleMessenger::Pipe::read_message(Message **pm) } bufferptr bp = blp.get_current_ptr(); int read = MIN(bp.length(), left); - dout(20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl; + ldout(msgr->cct,20) << "reader reading nonblocking into " << (void*)bp.c_str() << " len " << bp.length() << dendl; int got = tcp_read_nonblocking(sd, bp.c_str(), read); - dout(30) << "reader read " << got << " of " << read << dendl; + ldout(msgr->cct,30) << "reader read " << got << " of " << read << dendl; connection_state->lock.Unlock(); if (got < 0) goto out_dethrottle; @@ -1938,19 +1942,19 @@ int SimpleMessenger::Pipe::read_message(Message **pm) } // footer - if (tcp_read(sd, (char*)&footer, sizeof(footer), messenger->timeout) < 0) + if (tcp_read(sd, (char*)&footer, sizeof(footer), msgr->timeout) < 0) goto out_dethrottle; aborted = (footer.flags & CEPH_MSG_FOOTER_COMPLETE) == 0; - dout(10) << "aborted = " << aborted << dendl; + ldout(msgr->cct,10) << "aborted = " << aborted << dendl; if (aborted) { - dout(0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() + ldout(msgr->cct,0) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() << " byte message.. ABORTED" << dendl; ret = 0; goto out_dethrottle; } - dout(20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() + ldout(msgr->cct,20) << "reader got " << front.length() << " + " << middle.length() << " + " << data.length() << " byte message" << dendl; message = decode_message(header, footer, front, middle, data); if (!message) { @@ -1971,13 +1975,13 @@ int SimpleMessenger::Pipe::read_message(Message **pm) // release bytes reserved from the throttlers on failure if (message_size) { if (policy.throttler) { - dout(10) << "reader releasing " << message_size << " to policy throttler " + ldout(msgr->cct,10) << "reader releasing " << message_size << " to policy throttler " << policy.throttler->get_current() << "/" << policy.throttler->get_max() << dendl; policy.throttler->put(message_size); } - messenger->dispatch_throttle_release(message_size); + msgr->dispatch_throttle_release(message_size); } return ret; } @@ -1996,13 +2000,13 @@ int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len, bool int r = ::sendmsg(sd, msg, MSG_NOSIGNAL | (more ? MSG_MORE : 0)); if (r == 0) - dout(10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl; + ldout(msgr->cct,10) << "do_sendmsg hmm do_sendmsg got r==0!" << dendl; if (r < 0) { - dout(1) << "do_sendmsg error " << strerror_r(errno, buf, sizeof(buf)) << dendl; + ldout(msgr->cct,1) << "do_sendmsg error " << strerror_r(errno, buf, sizeof(buf)) << dendl; return -1; } if (state == STATE_CLOSED) { - dout(10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl; + ldout(msgr->cct,10) << "do_sendmsg oh look, state == CLOSED, giving up" << dendl; errno = EINTR; return -1; // close enough } @@ -2012,7 +2016,7 @@ int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len, bool struct iovec *v = msg->msg_iov; size_t left = r; size_t vpos = 0; - dout(0) << "do_sendmsg wrote " << r << " bytes, hexdump:\n"; + ldout(msgr->cct,0) << "do_sendmsg wrote " << r << " bytes, hexdump:\n"; int pos = 0; int col = 0; char buf[20]; @@ -2045,17 +2049,17 @@ int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len, bool if (len == 0) break; // hrmph. trim r bytes off the front of our message. - dout(20) << "do_sendmsg short write did " << r << ", still have " << len << dendl; + ldout(msgr->cct,20) << "do_sendmsg short write did " << r << ", still have " << len << dendl; while (r > 0) { if (msg->msg_iov[0].iov_len <= (size_t)r) { // lose this whole item - //dout(30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl; + //ldout(msgr->cct,30) << "skipping " << msg->msg_iov[0].iov_len << ", " << (msg->msg_iovlen-1) << " v, " << r << " left" << dendl; r -= msg->msg_iov[0].iov_len; msg->msg_iov++; msg->msg_iovlen--; } else { // partial! - //dout(30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl; + //ldout(msgr->cct,30) << "adjusting " << msg->msg_iov[0].iov_len << ", " << msg->msg_iovlen << " v, " << r << " left" << dendl; msg->msg_iov[0].iov_base = (char *)msg->msg_iov[0].iov_base + r; msg->msg_iov[0].iov_len -= r; break; @@ -2068,7 +2072,7 @@ int SimpleMessenger::Pipe::do_sendmsg(int sd, struct msghdr *msg, int len, bool int SimpleMessenger::Pipe::write_ack(uint64_t seq) { - dout(10) << "write_ack " << seq << dendl; + ldout(msgr->cct,10) << "write_ack " << seq << dendl; char c = CEPH_MSGR_TAG_ACK; ceph_le64 s; @@ -2091,7 +2095,7 @@ int SimpleMessenger::Pipe::write_ack(uint64_t seq) int SimpleMessenger::Pipe::write_keepalive() { - dout(10) << "write_keepalive" << dendl; + ldout(msgr->cct,10) << "write_keepalive" << dendl; char c = CEPH_MSGR_TAG_KEEPALIVE; @@ -2126,7 +2130,7 @@ int SimpleMessenger::Pipe::write_message(Message *m) blist.append(m->get_middle()); blist.append(m->get_data()); - dout(20) << "write_message " << m << dendl; + ldout(msgr->cct,20) << "write_message " << m << dendl; // set up msghdr and iovecs struct msghdr msg; @@ -2172,11 +2176,11 @@ int SimpleMessenger::Pipe::write_message(Message *m) while (left > 0) { int donow = MIN(left, (int)pb->length()-b_off); if (donow == 0) { - dout(0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() + ldout(msgr->cct,0) << "donow = " << donow << " left " << left << " pb->length " << pb->length() << " b_off " << b_off << dendl; } assert(donow > 0); - dout(30) << " bl_pos " << bl_pos << " b_off " << b_off + ldout(msgr->cct,30) << " bl_pos " << bl_pos << " b_off " << b_off << " leftinchunk " << left << " buffer len " << pb->length() << " writing " << donow @@ -2241,23 +2245,23 @@ int SimpleMessenger::Pipe::write_message(Message *m) void SimpleMessenger::dispatch_throttle_release(uint64_t msize) { if (msize) { - dout(10) << "dispatch_throttle_release " << msize << " to dispatch throttler " - << messenger->dispatch_throttler.get_current() << "/" - << messenger->dispatch_throttler.get_max() << dendl; + ldout(cct,10) << "dispatch_throttle_release " << msize << " to dispatch throttler " + << msgr->dispatch_throttler.get_current() << "/" + << msgr->dispatch_throttler.get_max() << dendl; dispatch_throttler.put(msize); } } void SimpleMessenger::reaper_entry() { - dout(10) << "reaper_entry start" << dendl; + ldout(cct,10) << "reaper_entry start" << dendl; lock.Lock(); while (!reaper_stop) { reaper(); reaper_cond.Wait(lock); } lock.Unlock(); - dout(10) << "reaper_entry done" << dendl; + ldout(cct,10) << "reaper_entry done" << dendl; } /* @@ -2265,13 +2269,13 @@ void SimpleMessenger::reaper_entry() */ void SimpleMessenger::reaper() { - dout(10) << "reaper" << dendl; + ldout(cct,10) << "reaper" << dendl; assert(lock.is_locked()); while (!pipe_reap_queue.empty()) { Pipe *p = pipe_reap_queue.front(); pipe_reap_queue.pop_front(); - dout(10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; + ldout(cct,10) << "reaper reaping pipe " << p << " " << p->get_peer_addr() << dendl; p->pipe_lock.Lock(); p->discard_queue(); p->pipe_lock.Unlock(); @@ -2281,18 +2285,18 @@ void SimpleMessenger::reaper() p->join(); if (p->sd >= 0) ::close(p->sd); - dout(10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; + ldout(cct,10) << "reaper reaped pipe " << p << " " << p->get_peer_addr() << dendl; if (p->connection_state) p->connection_state->clear_pipe(); p->put(); - dout(10) << "reaper deleted pipe " << p << dendl; + ldout(cct,10) << "reaper deleted pipe " << p << dendl; } - dout(10) << "reaper done" << dendl; + ldout(cct,10) << "reaper done" << dendl; } void SimpleMessenger::queue_reap(Pipe *pipe) { - dout(10) << "queue_reap " << pipe << dendl; + ldout(cct,10) << "queue_reap " << pipe << dendl; lock.Lock(); pipe_reap_queue.push_back(pipe); reaper_cond.Signal(); @@ -2305,11 +2309,11 @@ int SimpleMessenger::bind(entity_addr_t bind_addr, int64_t nonce) { lock.Lock(); if (started) { - dout(10) << "rank.bind already started" << dendl; + ldout(cct,10) << "rank.bind already started" << dendl; lock.Unlock(); return -1; } - dout(10) << "rank.bind" << dendl; + ldout(cct,10) << "rank.bind" << dendl; lock.Unlock(); // bind to a socket @@ -2318,7 +2322,7 @@ int SimpleMessenger::bind(entity_addr_t bind_addr, int64_t nonce) int SimpleMessenger::rebind(int avoid_port) { - dout(1) << "rebind avoid " << avoid_port << dendl; + ldout(cct,1) << "rebind avoid " << avoid_port << dendl; mark_down_all(); return accepter.rebind(avoid_port); } @@ -2326,7 +2330,7 @@ int SimpleMessenger::rebind(int avoid_port) int SimpleMessenger::start_with_nonce(uint64_t nonce) { lock.Lock(); - dout(1) << "messenger.start" << dendl; + ldout(cct,1) << "messenger.start" << dendl; // register at least one entity, first! assert(my_type >= 0); @@ -2356,7 +2360,7 @@ SimpleMessenger::Pipe *SimpleMessenger::connect_rank(const entity_addr_t& addr, assert(lock.is_locked()); assert(addr != ms_addr); - dout(10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; + ldout(cct,10) << "connect_rank to " << addr << ", creating pipe and registering" << dendl; // create pipe Pipe *pipe = new Pipe(this, Pipe::STATE_CONNECTING); @@ -2395,7 +2399,7 @@ bool SimpleMessenger::verify_authorizer(Connection *con, int peer_type, */ bool SimpleMessenger::register_entity(entity_name_t name) { - dout(10) << "register_entity " << name << dendl; + ldout(cct,10) << "register_entity " << name << dendl; lock.Lock(); if (!destination_stopped) { //already have a working entity set @@ -2413,9 +2417,9 @@ bool SimpleMessenger::register_entity(entity_name_t name) destination_stopped = false; - dout(10) << "register_entity " << name << " at " << get_myaddr() << dendl; + ldout(cct,10) << "register_entity " << name << " at " << get_myaddr() << dendl; - messenger->init_local_pipe(); + msgr->init_local_pipe(); lock.Unlock(); return true; @@ -2425,17 +2429,17 @@ void SimpleMessenger::submit_message(Message *m, Pipe *pipe) { lock.Lock(); if (pipe == dispatch_queue.local_pipe) { - dout(20) << "submit_message " << *m << " local" << dendl; + ldout(cct,20) << "submit_message " << *m << " local" << dendl; dispatch_queue.local_delivery(m, m->get_priority()); } else { pipe->pipe_lock.Lock(); if (pipe->state == Pipe::STATE_CLOSED) { - dout(20) << "submit_message " << *m << " ignoring closed pipe " << pipe->peer_addr << dendl; + ldout(cct,20) << "submit_message " << *m << " ignoring closed pipe " << pipe->peer_addr << dendl; pipe->unregister_pipe(); pipe->pipe_lock.Unlock(); m->put(); } else { - dout(20) << "submit_message " << *m << " remote " << pipe->peer_addr << dendl; + ldout(cct,20) << "submit_message " << *m << " remote " << pipe->peer_addr << dendl; pipe->_send(m); pipe->pipe_lock.Unlock(); } @@ -2486,7 +2490,7 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, assert(m->nref.read() == 1); if (dest_addr == entity_addr_t()) { - dout(0) << "submit_message message " << *m << " with empty dest " << dest_addr << dendl; + ldout(cct,0) << "submit_message message " << *m << " with empty dest " << dest_addr << dendl; m->put(); return; } @@ -2497,10 +2501,10 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, if (ms_addr == dest_addr) { if (!destination_stopped) { // local - dout(20) << "submit_message " << *m << " local" << dendl; + ldout(cct,20) << "submit_message " << *m << " local" << dendl; dispatch_queue.local_delivery(m, m->get_priority()); } else { - dout(0) << "submit_message " << *m << " " << dest_addr << " local but no local endpoint, dropping." << dendl; + ldout(cct,0) << "submit_message " << *m << " " << dest_addr << " local but no local endpoint, dropping." << dendl; assert(0); // hmpf, this is probably mds->mon beacon from newsyn. m->put(); } @@ -2511,12 +2515,12 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, pipe = rank_pipe[ dest_addr ]; pipe->pipe_lock.Lock(); if (pipe->state == Pipe::STATE_CLOSED) { - dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring closed pipe." << dendl; + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", ignoring closed pipe." << dendl; pipe->unregister_pipe(); pipe->pipe_lock.Unlock(); pipe = 0; } else { - dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl; + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", have pipe." << dendl; pipe->_send(m); pipe->pipe_lock.Unlock(); @@ -2525,14 +2529,14 @@ void SimpleMessenger::submit_message(Message *m, const entity_addr_t& dest_addr, if (!pipe) { Policy& policy = get_policy(dest_type); if (policy.lossy && policy.server) { - dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type " + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lossy server for target type " << ceph_entity_type_name(dest_type) << ", no session, dropping." << dendl; m->put(); } else if (lazy) { - dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl; + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", lazy, dropping." << dendl; m->put(); } else { - dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl; + ldout(cct,20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl; // not connected. pipe = connect_rank(dest_addr, dest_type); pipe->send(m); @@ -2560,18 +2564,18 @@ int SimpleMessenger::send_keepalive(const entity_inst_t& dest) pipe = rank_pipe[ dest_proc_addr ]; pipe->pipe_lock.Lock(); if (pipe->state == Pipe::STATE_CLOSED) { - dout(20) << "send_keepalive remote, " << dest_addr << ", ignoring old closed pipe." << dendl; + ldout(cct,20) << "send_keepalive remote, " << dest_addr << ", ignoring old closed pipe." << dendl; pipe->unregister_pipe(); pipe->pipe_lock.Unlock(); pipe = 0; } else { - dout(20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl; + ldout(cct,20) << "send_keepalive remote, " << dest_addr << ", have pipe." << dendl; pipe->_send_keepalive(); pipe->pipe_lock.Unlock(); } } if (!pipe) - dout(20) << "send_keepalive no pipe for " << dest_addr << ", doing nothing." << dendl; + ldout(cct,20) << "send_keepalive no pipe for " << dest_addr << ", doing nothing." << dendl; } } lock.Unlock(); @@ -2588,35 +2592,35 @@ void SimpleMessenger::wait() return; } while (!destination_stopped) { - dout(10) << "wait: still active" << dendl; + ldout(cct,10) << "wait: still active" << dendl; wait_cond.Wait(lock); - dout(10) << "wait: woke up" << dendl; + ldout(cct,10) << "wait: woke up" << dendl; } - dout(10) << "wait: everything stopped" << dendl; + ldout(cct,10) << "wait: everything stopped" << dendl; lock.Unlock(); // done! clean up. if (did_bind) { - dout(20) << "wait: stopping accepter thread" << dendl; + ldout(cct,20) << "wait: stopping accepter thread" << dendl; accepter.stop(); - dout(20) << "wait: stopped accepter thread" << dendl; + ldout(cct,20) << "wait: stopped accepter thread" << dendl; } if (reaper_started) { - dout(20) << "wait: stopping reaper thread" << dendl; + ldout(cct,20) << "wait: stopping reaper thread" << dendl; lock.Lock(); reaper_cond.Signal(); reaper_stop = true; lock.Unlock(); reaper_thread.join(); reaper_started = false; - dout(20) << "wait: stopped reaper thread" << dendl; + ldout(cct,20) << "wait: stopped reaper thread" << dendl; } // close+reap all pipes lock.Lock(); { - dout(10) << "wait: closing pipes" << dendl; + ldout(cct,10) << "wait: closing pipes" << dendl; while (!rank_pipe.empty()) { Pipe *p = rank_pipe.begin()->second; @@ -2627,7 +2631,7 @@ void SimpleMessenger::wait() } reaper(); - dout(10) << "wait: waiting for pipes " << pipes << " to close" << dendl; + ldout(cct,10) << "wait: waiting for pipes " << pipes << " to close" << dendl; while (!pipes.empty()) { reaper_cond.Wait(lock); reaper(); @@ -2635,8 +2639,8 @@ void SimpleMessenger::wait() } lock.Unlock(); - dout(10) << "wait: done." << dendl; - dout(1) << "shutdown complete." << dendl; + ldout(cct,10) << "wait: done." << dendl; + ldout(cct,1) << "shutdown complete." << dendl; started = false; did_bind = false; my_type = -1; @@ -2645,12 +2649,12 @@ void SimpleMessenger::wait() void SimpleMessenger::mark_down_all() { - dout(1) << "mark_down_all" << dendl; + ldout(cct,1) << "mark_down_all" << dendl; lock.Lock(); while (!rank_pipe.empty()) { hash_map::iterator it = rank_pipe.begin(); Pipe *p = it->second; - dout(5) << "mark_down_all " << it->first << " " << p << dendl; + ldout(cct,5) << "mark_down_all " << it->first << " " << p << dendl; rank_pipe.erase(it); p->unregister_pipe(); p->pipe_lock.Lock(); @@ -2665,13 +2669,13 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr) lock.Lock(); if (rank_pipe.count(addr)) { Pipe *p = rank_pipe[addr]; - dout(1) << "mark_down " << addr << " -- " << p << dendl; + ldout(cct,1) << "mark_down " << addr << " -- " << p << dendl; p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); p->pipe_lock.Unlock(); } else { - dout(1) << "mark_down " << addr << " -- pipe dne" << dendl; + ldout(cct,1) << "mark_down " << addr << " -- pipe dne" << dendl; } lock.Unlock(); } @@ -2681,14 +2685,14 @@ void SimpleMessenger::mark_down(Connection *con) lock.Lock(); Pipe *p = (Pipe *)con->get_pipe(); if (p) { - dout(1) << "mark_down " << con << " -- " << p << dendl; + ldout(cct,1) << "mark_down " << con << " -- " << p << dendl; p->unregister_pipe(); p->pipe_lock.Lock(); p->stop(); p->pipe_lock.Unlock(); p->put(); } else { - dout(1) << "mark_down " << con << " -- pipe dne" << dendl; + ldout(cct,1) << "mark_down " << con << " -- pipe dne" << dendl; } lock.Unlock(); } @@ -2701,16 +2705,16 @@ void SimpleMessenger::mark_down_on_empty(Connection *con) p->pipe_lock.Lock(); p->unregister_pipe(); if (p->out_q.empty()) { - dout(1) << "mark_down_on_empty " << con << " -- " << p << " closing (queue is empty)" << dendl; + ldout(cct,1) << "mark_down_on_empty " << con << " -- " << p << " closing (queue is empty)" << dendl; p->stop(); } else { - dout(1) << "mark_down_on_empty " << con << " -- " << p << " marking (queue is not empty)" << dendl; + ldout(cct,1) << "mark_down_on_empty " << con << " -- " << p << " marking (queue is not empty)" << dendl; p->close_on_empty = true; } p->pipe_lock.Unlock(); p->put(); } else { - dout(1) << "mark_down_on_empty " << con << " -- pipe dne" << dendl; + ldout(cct,1) << "mark_down_on_empty " << con << " -- pipe dne" << dendl; } lock.Unlock(); } @@ -2720,14 +2724,14 @@ void SimpleMessenger::mark_disposable(Connection *con) lock.Lock(); Pipe *p = (Pipe *)con->get_pipe(); if (p) { - dout(1) << "mark_disposable " << con << " -- " << p << dendl; + ldout(cct,1) << "mark_disposable " << con << " -- " << p << dendl; p->pipe_lock.Lock(); p->policy.lossy = true; p->disposable = true; p->pipe_lock.Unlock(); p->put(); } else { - dout(1) << "mark_disposable " << con << " -- pipe dne" << dendl; + ldout(cct,1) << "mark_disposable " << con << " -- pipe dne" << dendl; } lock.Unlock(); } @@ -2738,7 +2742,7 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) int port = ms_addr.get_port(); ms_addr.addr = peer_addr_for_me.addr; ms_addr.set_port(port); - dout(1) << "learned my addr " << ms_addr << dendl; + ldout(cct,1) << "learned my addr " << ms_addr << dendl; need_addr = false; init_local_pipe(); lock.Unlock(); @@ -2746,6 +2750,6 @@ void SimpleMessenger::learned_addr(const entity_addr_t &peer_addr_for_me) void SimpleMessenger::init_local_pipe() { - dispatch_queue.local_pipe->connection_state->peer_addr = messenger->ms_addr; - dispatch_queue.local_pipe->connection_state->peer_type = messenger->my_type; + dispatch_queue.local_pipe->connection_state->peer_addr = msgr->ms_addr; + dispatch_queue.local_pipe->connection_state->peer_type = msgr->my_type; } diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index b2ac38d9e104c..34d2d2185b472 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -96,11 +96,11 @@ private: // incoming class Accepter : public Thread { public: - SimpleMessenger *messenger; + SimpleMessenger *msgr; bool done; int listen_sd; - Accepter(SimpleMessenger *r) : messenger(r), done(false), listen_sd(-1) {} + Accepter(SimpleMessenger *r) : msgr(r), done(false), listen_sd(-1) {} void *entry(); void stop(); @@ -114,7 +114,7 @@ private: // pipe class Pipe : public RefCountedObject { public: - SimpleMessenger *messenger; + SimpleMessenger *msgr; ostream& _pipe_prefix(std::ostream *_dout); enum { @@ -178,20 +178,20 @@ private: /* Clean up sent list */ void handle_ack(uint64_t seq) { - dout(15) << "reader got ack seq " << seq << dendl; + ldout(msgr->cct, 15) << "reader got ack seq " << seq << dendl; // trim sent list while (!sent.empty() && sent.front()->get_seq() <= seq) { Message *m = sent.front(); sent.pop_front(); - dout(10) << "reader got ack seq " + ldout(msgr->cct, 10) << "reader got ack seq " << seq << " >= " << m->get_seq() << " on " << m << " " << *m << dendl; m->put(); } if (sent.empty() && close_on_empty) { // this is slightly hacky - dout(10) << "reader got last ack, queue empty, closing" << dendl; + ldout(msgr->cct, 10) << "reader got last ack, queue empty, closing" << dendl; policy.lossy = true; fault(); } @@ -216,7 +216,7 @@ private: public: Pipe(SimpleMessenger *r, int st) : - messenger(r), + msgr(r), sd(-1), peer_type(-1), pipe_lock("SimpleMessenger::Pipe::pipe_lock"), @@ -229,9 +229,9 @@ private: out_seq(0), in_seq(0), in_seq_acked(0), reader_thread(this), writer_thread(this) { connection_state->pipe = get(); - messenger->timeout = g_conf->ms_tcp_read_timeout * 1000; //convert to ms - if (messenger->timeout == 0) - messenger->timeout = -1; + msgr->timeout = msgr->cct->_conf->ms_tcp_read_timeout * 1000; //convert to ms + if (msgr->timeout == 0) + msgr->timeout = -1; } ~Pipe() { for (map::item* >::iterator i = queue_items.begin(); @@ -251,13 +251,13 @@ private: assert(pipe_lock.is_locked()); assert(!reader_running); reader_running = true; - reader_thread.create(g_conf->ms_rwthread_stack_bytes); + reader_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes); } void start_writer() { assert(pipe_lock.is_locked()); assert(!writer_running); writer_running = true; - writer_thread.create(g_conf->ms_rwthread_stack_bytes); + writer_thread.create(msgr->cct->_conf->ms_rwthread_stack_bytes); } void join_reader() { if (!reader_running) @@ -281,7 +281,7 @@ private: void queue_received(Message *m, int priority); void queue_received(Message *m) { - m->set_recv_stamp(ceph_clock_now(&g_ceph_context)); + m->set_recv_stamp(ceph_clock_now(msgr->cct)); // this is just to make sure that a changeset is working // properly; if you start using the refcounting more and have @@ -485,13 +485,13 @@ private: // reaper class ReaperThread : public Thread { - SimpleMessenger *messenger; + SimpleMessenger *msgr; public: - ReaperThread(SimpleMessenger *m) : messenger(m) {} + ReaperThread(SimpleMessenger *m) : msgr(m) {} void *entry() { - messenger->get(); - messenger->reaper_entry(); - messenger->put(); + msgr->get(); + msgr->reaper_entry(); + msgr->put(); return 0; } } reaper_thread; @@ -533,20 +533,20 @@ private: private: class DispatchThread : public Thread { - SimpleMessenger *messenger; + SimpleMessenger *msgr; public: - DispatchThread(SimpleMessenger *_messenger) : messenger(_messenger) {} + DispatchThread(SimpleMessenger *_messenger) : msgr(_messenger) {} void *entry() { - messenger->get(); - messenger->dispatch_entry(); - messenger->put(); + msgr->get(); + msgr->dispatch_entry(); + msgr->put(); return 0; } } dispatch_thread; void dispatch_entry(); - SimpleMessenger *messenger; //hack to make dout macro work, will fix + SimpleMessenger *msgr; //hack to make dout macro work, will fix int timeout; public: @@ -554,11 +554,11 @@ public: Messenger(cct, entity_name_t()), accepter(this), lock("SimpleMessenger::lock"), started(false), did_bind(false), - dispatch_throttler(g_conf->ms_dispatch_throttle_bytes), need_addr(true), + dispatch_throttler(cct->_conf->ms_dispatch_throttle_bytes), need_addr(true), destination_stopped(true), my_type(-1), global_seq_lock("SimpleMessenger::global_seq_lock"), global_seq(0), reaper_thread(this), reaper_started(false), reaper_stop(false), - dispatch_thread(this), messenger(this) { + dispatch_thread(this), msgr(this) { // for local dmsg delivery dispatch_queue.local_pipe = new Pipe(this, Pipe::STATE_OPEN); } @@ -570,7 +570,7 @@ public: int bind(entity_addr_t bind_addr, int64_t nonce); int bind(uint64_t nonce) { - return bind(g_conf->public_addr, nonce); + return bind(cct->_conf->public_addr, nonce); } int start_with_nonce(uint64_t nonce); // if we didn't bind int start() { // if we did -- 2.39.5