};
Processor::Processor(AsyncMessenger *r, Worker *w, CephContext *c, uint64_t n)
- : msgr(r), net(c), worker(w), nonce(n),
+ : msgr(r), net(c), worker(w),
listen_handler(new C_processor_accept(this)) {}
-int Processor::bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports)
+int Processor::bind(const entity_addr_t &bind_addr,
+ const set<int>& avoid_ports,
+ entity_addr_t* bound_addr)
{
const md_config_t *conf = msgr->cct->_conf;
// bind to a socket
}
ldout(msgr->cct, 10) << __func__ << " bound to " << listen_addr << dendl;
-
- msgr->set_myaddr(bind_addr);
- if (bind_addr != entity_addr_t())
- msgr->learned_addr(bind_addr);
-
- if (msgr->get_myaddr().get_port() == 0) {
- msgr->set_myaddr(listen_addr);
- }
- entity_addr_t addr = msgr->get_myaddr();
- addr.nonce = nonce;
- msgr->set_myaddr(addr);
-
- msgr->init_local_connection();
-
- ldout(msgr->cct,1) << __func__ << " bind my_inst.addr is " << msgr->get_myaddr() << dendl;
+ *bound_addr = listen_addr;
return 0;
}
-int Processor::rebind(const set<int>& avoid_ports)
-{
- ldout(msgr->cct, 1) << __func__ << " rebind avoid " << avoid_ports << dendl;
-
- entity_addr_t addr = msgr->get_myaddr();
- set<int> new_avoid = avoid_ports;
- new_avoid.insert(addr.get_port());
- addr.set_port(0);
-
- // adjust the nonce; we want our entity_addr_t to be truly unique.
- nonce += 1000000;
- msgr->my_inst.addr.nonce = nonce;
- ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl;
-
- ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl;
- return bind(addr, new_avoid);
-}
-
void Processor::start()
{
ldout(msgr->cct, 1) << __func__ << dendl;
// bind to a socket
set<int> avoid_ports;
- int r = 0;
+ entity_addr_t bound_addr;
unsigned i = 0;
for (auto &&p : processors) {
- r = p->bind(bind_addr, avoid_ports);
- if (r < 0) {
+ int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+ if (r) {
// Note: this is related to local tcp listen table problem.
// Posix(default kernel implementation) backend shares listen table
// in the kernel, so all threads can use the same listen table naturally
// but the second worker failed, it's not expected and we need to assert
// here
assert(i == 0);
- break;
+ return r;
}
++i;
}
- if (r >= 0)
- did_bind = true;
- return r;
+ _finish_bind(bind_addr, bound_addr);
+ return 0;
}
int AsyncMessenger::rebind(const set<int>& avoid_ports)
for (auto &&p : processors)
p->stop();
mark_down_all();
+
+ // adjust the nonce; we want our entity_addr_t to be truly unique.
+ nonce += 1000000;
+ ldout(cct, 10) << __func__ << " new nonce " << nonce
+ << " and inst " << get_myinst() << dendl;
+
+ entity_addr_t bound_addr;
+ entity_addr_t bind_addr = get_myaddr();
+ bind_addr.set_port(0);
+ set<int> new_avoid(avoid_ports);
+ new_avoid.insert(bind_addr.get_port());
+ ldout(cct, 10) << __func__ << " will try " << bind_addr
+ << " and avoid ports " << new_avoid << dendl;
unsigned i = 0;
- int r = 0;
for (auto &&p : processors) {
- r = p->rebind(avoid_ports);
- if (r == 0) {
- p->start();
- } else {
+ int r = p->bind(bind_addr, avoid_ports, &bound_addr);
+ if (r) {
assert(i == 0);
- break;
+ return r;
}
- i++;
+ ++i;
+ }
+ _finish_bind(bind_addr, bound_addr);
+ for (auto &&p : processors) {
+ p->start();
+ }
+ return 0;
+}
+
+void AsyncMessenger::_finish_bind(const entity_addr_t& bind_addr,
+ const entity_addr_t& listen_addr)
+{
+ set_myaddr(bind_addr);
+ if (bind_addr != entity_addr_t())
+ learned_addr(bind_addr);
+
+ if (get_myaddr().get_port() == 0) {
+ set_myaddr(listen_addr);
}
- return r;
+ entity_addr_t addr = get_myaddr();
+ addr.set_nonce(nonce);
+ set_myaddr(addr);
+
+ init_local_connection();
+
+ ldout(cct,1) << __func__ << " bind my_inst.addr is " << get_myaddr() << dendl;
+ did_bind = true;
}
int AsyncMessenger::start()
NetHandler net;
Worker *worker;
ServerSocket listen_socket;
- uint64_t nonce;
EventCallbackRef listen_handler;
class C_processor_accept;
~Processor() { delete listen_handler; };
void stop();
- int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
- int rebind(const set<int>& avoid_port);
+ int bind(const entity_addr_t &bind_addr,
+ const set<int>& avoid_ports,
+ entity_addr_t* bound_addr);
void start();
void accept();
};
const entity_addr_t& dest_addr, int dest_type);
int _send_message(Message *m, const entity_inst_t& dest);
+ void _finish_bind(const entity_addr_t& bind_addr,
+ const entity_addr_t& listen_addr);
private:
static const uint64_t ReapDeadConnectionThreshold = 5;