From aac1a3edd4085e5ab53b93fdc8ecdcaa5ba8ccc9 Mon Sep 17 00:00:00 2001 From: Kefu Chai Date: Thu, 8 Dec 2016 18:40:24 +0800 Subject: [PATCH] msg/async: set nonce before starting the workers otherwise workers will respond with different nonces to peers. Also remove nonce from Processor, as there is only one nonce for each Messenger at a given time. Signed-off-by: Kefu Chai --- src/msg/async/AsyncMessenger.cc | 103 ++++++++++++++++---------------- src/msg/async/AsyncMessenger.h | 8 ++- 2 files changed, 58 insertions(+), 53 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 360da05419767..f1cb9f776c4df 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -52,10 +52,12 @@ class Processor::C_processor_accept : public EventCallback { }; Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n) - : msgr(r), net(c), worker(w), nonce(n), + : msgr(r), net(c), worker(w), listen_handler(new C_processor_accept(this)) {} -int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) +int Processor::bind(const entity_addr_t &bind_addr, + const set& avoid_ports, + entity_addr_t* bound_addr) { const md_config_t *conf = msgr->cct->_conf; // bind to a socket @@ -137,42 +139,10 @@ int Processor::bind(const entity_addr_t &bind_addr, const set& avoid_ports) } ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl; - - msgr->set_myaddr(bind_addr); - if (bind_addr != entity_addr_t()) - msgr->learned_addr(bind_addr); - - if (msgr->get_myaddr().get_port() == 0) { - msgr->set_myaddr(listen_addr); - } - entity_addr_t addr = msgr->get_myaddr(); - addr.nonce = nonce; - msgr->set_myaddr(addr); - - msgr->init_local_connection(); - - ldout(msgr->cct,1) << __func__ << " bind my_inst.addr is " << msgr->get_myaddr() << dendl; + *bound_addr = listen_addr; return 0; } -int Processor::rebind(const set& avoid_ports) -{ - ldout(msgr->cct, 1) << __func__ << " rebind avoid " 
<< avoid_ports << dendl; - - entity_addr_t addr = msgr->get_myaddr(); - set new_avoid = avoid_ports; - new_avoid.insert(addr.get_port()); - addr.set_port(0); - - // adjust the nonce; we want our entity_addr_t to be truly unique. - nonce += 1000000; - msgr->my_inst.addr.nonce = nonce; - ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl; - - ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl; - return bind(addr, new_avoid); -} - void Processor::start() { ldout(msgr->cct, 1) << __func__ << dendl; @@ -346,11 +316,11 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr) // bind to a socket set avoid_ports; - int r = 0; + entity_addr_t bound_addr; unsigned i = 0; for (auto &&p : processors) { - r = p->bind(bind_addr, avoid_ports); - if (r < 0) { + int r = p->bind(bind_addr, avoid_ports, &bound_addr); + if (r) { // Note: this is related to local tcp listen table problem. // Posix(default kernel implementation) backend shares listen table // in the kernel, so all threads can use the same listen table naturally @@ -361,13 +331,12 @@ int AsyncMessenger::bind(const entity_addr_t &bind_addr) // but the second worker failed, it's not expected and we need to assert // here assert(i == 0); - break; + return r; } ++i; } - if (r >= 0) - did_bind = true; - return r; + _finish_bind(bind_addr, bound_addr); + return 0; } int AsyncMessenger::rebind(const set& avoid_ports) @@ -378,19 +347,53 @@ int AsyncMessenger::rebind(const set& avoid_ports) for (auto &&p : processors) p->stop(); mark_down_all(); + + // adjust the nonce; we want our entity_addr_t to be truly unique. 
+ nonce += 1000000; + ldout(cct, 10) << __func__ << " new nonce " << nonce + << " and inst " << get_myinst() << dendl; + + entity_addr_t bound_addr; + entity_addr_t bind_addr = get_myaddr(); + set new_avoid(avoid_ports); + new_avoid.insert(bind_addr.get_port()); // record the old port before it is zeroed + bind_addr.set_port(0); + ldout(cct, 10) << __func__ << " will try " << bind_addr + << " and avoid ports " << new_avoid << dendl; unsigned i = 0; - int r = 0; for (auto &&p : processors) { - r = p->rebind(avoid_ports); - if (r == 0) { - p->start(); - } else { + int r = p->bind(bind_addr, new_avoid, &bound_addr); + if (r) { assert(i == 0); - break; + return r; } - i++; + ++i; + } + _finish_bind(bind_addr, bound_addr); + for (auto &&p : processors) { + p->start(); + } + return 0; +} + +void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr, + const entity_addr_t& listen_addr) +{ + set_myaddr(bind_addr); + if (bind_addr != entity_addr_t()) + learned_addr(bind_addr); + + if (get_myaddr().get_port() == 0) { + set_myaddr(listen_addr); } - return r; + entity_addr_t addr = get_myaddr(); + addr.set_nonce(nonce); + set_myaddr(addr); + + init_local_connection(); + + ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl; + did_bind = true; } int AsyncMessenger::start() diff --git a/src/msg/async/AsyncMessenger.h b/src/msg/async/AsyncMessenger.h index 308d3966b4d76..d3bb2090f74df 100644 --- a/src/msg/async/AsyncMessenger.h +++ b/src/msg/async/AsyncMessenger.h @@ -48,7 +48,6 @@ class Processor { NetHandler net; Worker *worker; ServerSocket listen_socket; - uint64_t nonce; EventCallbackRef listen_handler; class C_processor_accept; @@ -58,8 +57,9 @@ class Processor { ~Processor() { delete listen_handler; }; void stop(); - int bind(const entity_addr_t &bind_addr, const set& avoid_ports); - int rebind(const set& avoid_port); + int bind(const entity_addr_t &bind_addr, + const set& avoid_ports, + entity_addr_t* bound_addr); void start(); void accept(); }; @@ -210,6 +210,8 @@ private: const 
entity_addr_t& dest_addr, int dest_type); int _send_message(Message *m, const entity_inst_t& dest); + void _finish_bind(const entity_addr_t& bind_addr, + const entity_addr_t& listen_addr); private: static const uint64_t ReapDeadConnectionThreshold = 5; -- 2.39.5