#include <errno.h>
#include <iostream>
#include <fstream>
-#include <poll.h>
#ifdef HAVE_SCHED
#include <sched.h>
#endif
}
-class C_handle_accept : public EventCallback {
+class C_conn_accept : public EventCallback {
AsyncConnectionRef conn;
int fd;
public:
- C_handle_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {}
+ C_conn_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {}
void do_request(int id) {
conn->accept(fd);
}
};
-class C_handle_connect : public EventCallback {
- AsyncConnectionRef conn;
- const entity_addr_t addr;
- int type;
+class C_processor_accept : public EventCallback {
+ Processor *pro;
+
+ public:
+ C_processor_accept(Processor *p): pro(p) {}
+ void do_request(int id) {
+ pro->accept();
+ }
+};
+
+
+class C_processor_start : public EventCallback {
+ Processor *pro;
+ Worker *worker;
+ public:
+ C_processor_start(Processor *p, Worker *w): pro(p), worker(w) {}
+ void do_request(int id) {
+ pro->start(worker);
+ }
+};
+
+
+class C_processor_stop : public EventCallback {
+ Processor *pro;
public:
- C_handle_connect(AsyncConnectionRef c, const entity_addr_t &d, int t)
- :conn(c), addr(d), type(t) {}
+ C_processor_stop(Processor *p): pro(p) {}
void do_request(int id) {
- conn->connect(addr, type);
+ pro->stop_cb();
}
};
ldout(msgr->cct, 10) << __func__ << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl;
ldout(msgr->cct, 10) << __func__ << " will try " << addr << " and avoid ports " << new_avoid << dendl;
- int r = bind(addr, new_avoid);
- if (r == 0)
- start();
- return r;
+ return bind(addr, new_avoid);
}
-int Processor::start()
+int Processor::start(Worker *w)
{
- ldout(msgr->cct, 1) << __func__ << " start" << dendl;
+ ldout(msgr->cct, 1) << __func__ << " " << dendl;
// start thread
- if (listen_sd > 0)
- create();
+ if (listen_sd > 0) {
+ assert(w->center.get_owner() == pthread_self());
+ worker = w;
+ w->center.create_file_event(listen_sd, EVENT_READABLE,
+ EventCallbackRef(new C_processor_accept(this)));
+ }
return 0;
}
-void *Processor::entry()
+void Processor::accept()
{
- ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
+ ldout(msgr->cct, 10) << __func__ << " listen_sd=" << listen_sd << dendl;
int errors = 0;
-
- struct pollfd pfd;
- pfd.fd = listen_sd;
- pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
- while (!done) {
- ldout(msgr->cct, 20) << __func__ << " calling poll" << dendl;
- int r = poll(&pfd, 1, -1);
- if (r < 0)
- break;
- ldout(msgr->cct,20) << __func__ << " poll got " << r << dendl;
-
- if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
- break;
-
- ldout(msgr->cct,10) << __func__ << " pfd.revents=" << pfd.revents << dendl;
- if (done) break;
-
- // accept
+ while (errors < 4) {
entity_addr_t addr;
socklen_t slen = sizeof(addr.ss_addr());
int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
if (sd >= 0) {
errors = 0;
- ldout(msgr->cct,10) << __func__ << "accepted incoming on sd " << sd << dendl;
+ ldout(msgr->cct, 10) << __func__ << " accepted incoming on sd " << sd << dendl;
msgr->add_accept(sd);
+ break;
} else {
- ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd
- << " errno " << errno << " " << cpp_strerror(errno) << dendl;
- if (++errors > 4)
- break;
+ ldout(msgr->cct, 0) << __func__ << " no incoming connection? sd = " << sd
+ << " errno " << errno << " " << cpp_strerror(errno) << dendl;
}
}
+}
+
+void Processor::stop_cb()
+{
+ ldout(msgr->cct,10) << __func__ << dendl;
- ldout(msgr->cct,20) << __func__ << " closing" << dendl;
- // don't close socket, in case we start up again? blech.
+ Mutex::Locker l(stop_lock);
if (listen_sd >= 0) {
+ worker->center.delete_file_event(listen_sd, EVENT_READABLE);
+ ::shutdown(listen_sd, SHUT_RDWR);
::close(listen_sd);
listen_sd = -1;
}
- ldout(msgr->cct,10) << __func__ << " stopping" << dendl;
- return 0;
+ worker = NULL;
+ stop_cond.Signal();
}
void Processor::stop()
{
- done = true;
ldout(msgr->cct,10) << __func__ << dendl;
if (listen_sd >= 0) {
- ::shutdown(listen_sd, SHUT_RDWR);
- }
-
- // wait for thread to stop before closing the socket, to avoid
- // racing against fd re-use.
- if (is_started()) {
- join();
+ assert(worker && worker->center.get_owner() != pthread_self());
+ Mutex::Locker l(stop_lock);
+ worker->center.dispatch_event_external(EventCallbackRef(new C_processor_stop(this)));
+ stop_cond.Wait(stop_lock);
+ assert(listen_sd == -1);
+ assert(worker == NULL);
}
-
- if (listen_sd >= 0) {
- ::close(listen_sd);
- listen_sd = -1;
- }
- done = false;
}
void Worker::stop()
{
ldout(cct,10) << __func__ << " " << get_myaddr() << dendl;
- lock.Lock();
- processor.start();
- lock.Unlock();
+ Mutex::Locker l(lock);
+ Worker *w = pool->get_worker();
+ w->center.dispatch_event_external(
+ EventCallbackRef(new C_processor_start(&processor, w)));
}
int AsyncMessenger::shutdown()
processor.stop();
mark_down_all();
- return processor.rebind(avoid_ports);
+ int r = processor.rebind(avoid_ports);
+ if (r == 0) {
+ Worker *w = pool->get_worker();
+ w->center.dispatch_event_external(
+ EventCallbackRef(new C_processor_start(&processor, w)));
+ }
+ return r;
}
int AsyncMessenger::start()
lock.Lock();
Worker *w = pool->get_worker();
AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
- w->center.dispatch_event_external(EventCallbackRef(new C_handle_accept(conn, sd)));
+ w->center.dispatch_event_external(EventCallbackRef(new C_conn_accept(conn, sd)));
accepting_conns.insert(conn);
lock.Unlock();
return conn;
class AsyncMessenger;
-
-/**
- * If the Messenger binds to a specific address, the Processor runs
- * and listens for incoming connections.
- */
-class Processor : public Thread {
- AsyncMessenger *msgr;
- bool done;
- int listen_sd;
- uint64_t nonce;
-
- public:
- Processor(AsyncMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {}
-
- void *entry();
- void stop();
- int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
- int rebind(const set<int>& avoid_port);
- int start();
- void accept();
-};
-
class WorkerPool;
class Worker : public Thread {
void stop();
};
+/**
+ * If the Messenger binds to a specific address, the Processor runs
+ * and listens for incoming connections.
+ */
+class Processor {
+ AsyncMessenger *msgr;
+ Worker *worker;
+ int listen_sd;
+ uint64_t nonce;
+ Mutex stop_lock;
+ Cond stop_cond;
+
+ public:
+ Processor(AsyncMessenger *r, uint64_t n)
+ : msgr(r), worker(NULL), listen_sd(-1), nonce(n),
+ stop_lock("AsyncMessenger::Processor::stop_lock") {}
+
+ void stop_cb();
+ void stop();
+ int bind(const entity_addr_t &bind_addr, const set<int>& avoid_ports);
+ int rebind(const set<int>& avoid_port);
+ int start(Worker *w);
+ void accept();
+};
+
class WorkerPool: CephContext::AssociatedSingletonObject {
WorkerPool(const WorkerPool &);
WorkerPool& operator=(const WorkerPool &);