}
}
+// Ctor now receives the EventCenter it will run on (one center per Worker
+// thread, see class Worker) instead of borrowing the messenger's single
+// global center.
-AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m)
+AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c)
  : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0),
    state(STATE_NONE), state_after_send(0), sd(-1),
    lock("AsyncConnection::lock"), open_write(false),
    got_bad_auth(false), authorizer(NULL),
-    state_buffer(4096), state_offset(0), net(cct), center(&m->center)
+    state_buffer(4096), state_offset(0), net(cct), center(c)
{
  read_handler.reset(new C_handle_read(this));
  write_handler.reset(new C_handle_write(this));
  state = STATE_ACCEPTING;
+  // NOTE(review): sd is still -1 here (set in the init list above), so
+  // registering a read event on it looks suspect -- confirm against the
+  // complete patch whether this line is superseded by a later hunk.
  center->create_file_event(sd, EVENT_READABLE, read_handler);
  // rescheduler connection in order to avoid lock dep
-  center->create_time_event(0, read_handler);
+  process();
}
int AsyncConnection::send_message(Message *m)
}
public:
- AsyncConnection(CephContext *cct, AsyncMessenger *m);
+ AsyncConnection(CephContext *cct, AsyncMessenger *m, EventCenter *c);
~AsyncConnection();
ostream& _conn_prefix(std::ostream *_dout);
#include <errno.h>
#include <iostream>
#include <fstream>
-
+#include <poll.h>
#include "AsyncMessenger.h"
}
+// Log-line prefix for Processor-context messages.  Keep a trailing space:
+// log statements begin with __func__ directly (e.g. "entry starting"), so
+// without it the prefix and the function name run together.
static ostream& _prefix(std::ostream *_dout, Processor *p) {
-  return *_dout << "-- ";
+  return *_dout << "-- Processor ";
}
-/*******************
- * EventCallBack
- */
+// Log-line prefix for Worker-context messages.  The trailing space keeps
+// the dashes separated from the __func__ token that follows, matching the
+// Processor overload above.
+static ostream& _prefix(std::ostream *_dout, Worker *w) {
+  return *_dout << "-- ";
+}
+// EventCallback executed on the connection's owning EventCenter (worker)
+// thread: completes the server side of a new connection by running
+// AsyncConnection::accept(fd).  Queued via dispatch_event_external().
class C_handle_accept : public EventCallback {
-  Processor *p;
+  AsyncConnectionRef conn;
+  int fd;
+
+ public:
+  // Holds a ref to the connection so it stays alive until the event fires.
+  C_handle_accept(AsyncConnectionRef c, int s): conn(c), fd(s) {}
+  void do_request(int id) {
+    conn->accept(fd);
+  }
+};
+
+// EventCallback executed on the connection's owning EventCenter (worker)
+// thread: starts the client side via AsyncConnection::connect(addr, type).
+class C_handle_connect : public EventCallback {
+  AsyncConnectionRef conn;
+  const entity_addr_t addr;
+  int type;
 public:
-  C_handle_accept(Processor *p): p(p) {}
-  void do_request(int fd) {
-    p->accept();
+  C_handle_connect(AsyncConnectionRef c, const entity_addr_t &d, int t)
+    :conn(c), addr(d), type(t) {}
+  void do_request(int id) {
+    conn->connect(addr, type);
  }
};
// bind to a socket
ldout(msgr->cct, 10) << __func__ << dendl;
- int family, flags;
+ int family;
switch (bind_addr.get_family()) {
case AF_INET:
case AF_INET6:
return rc;
}
- if ((flags = fcntl(listen_sd, F_GETFL, 0)) < 0 ||
- fcntl(listen_sd, F_SETFL, flags | O_NONBLOCK) < 0) {
- rc = -errno;
- lderr(msgr->cct) << __func__ << " unable to setnonblock on " << listen_addr
- << ": " << cpp_strerror(rc) << dendl;
- return rc;
- }
-
msgr->set_myaddr(bind_addr);
if (bind_addr != entity_addr_t())
msgr->learned_addr(bind_addr);
// start thread
create();
- if (listen_sd >= 0)
- center->create_file_event(listen_sd, EVENT_READABLE, EventCallbackRef(new C_handle_accept(this)));
return 0;
}
-void Processor::accept()
-{
- ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
- // accept
- entity_addr_t addr;
- socklen_t slen = sizeof(addr.ss_addr());
- int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
- if (sd >= 0) {
- ldout(msgr->cct,10) << __func__ << " incoming on sd " << sd << dendl;
-
- msgr->add_accept(sd);
- } else {
- ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd
- << " errno " << errno << " " << cpp_strerror(errno) << dendl;
- }
-}
-
+// Listener thread body: blocks in poll() on listen_sd, accepts incoming
+// sockets and hands them to the messenger.  The loop exits on poll failure,
+// socket error/hangup, stop() (which shuts down listen_sd to wake the
+// blocking poll), or five consecutive accept() failures.
void *Processor::entry()
{
  ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
-  int r;
+  int errors = 0;
+  struct pollfd pfd;
+  pfd.fd = listen_sd;
+  pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
  while (!done) {
-    ldout(msgr->cct,20) << __func__ << " calling poll" << dendl;
-
-    r = center->process_events(30000);
-    if (r < 0) {
-      ldout(msgr->cct,20) << __func__ << " process events failed: "
-                          << cpp_strerror(errno) << dendl;
-      // TODO do something?
+    ldout(msgr->cct, 20) << __func__ << " calling poll" << dendl;
+    // -1: wait indefinitely; Processor::stop() wakes us via shutdown().
+    int r = poll(&pfd, 1, -1);
+    if (r < 0)
+      break;
+    ldout(msgr->cct,20) << __func__ << " poll got " << r << dendl;
+
+    if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
+      break;
+
+    ldout(msgr->cct,10) << __func__ << " pfd.revents=" << pfd.revents << dendl;
+    if (done) break;
+
+    // accept
+    entity_addr_t addr;
+    socklen_t slen = sizeof(addr.ss_addr());
+    int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
+    if (sd >= 0) {
+      errors = 0;
+      // (space before "accepted": __func__ is printed immediately before)
+      ldout(msgr->cct,10) << __func__ << " accepted incoming on sd " << sd << dendl;
+
+      msgr->add_accept(sd);
+    } else {
+      ldout(msgr->cct,0) << __func__ << " no incoming connection? sd = " << sd
+                         << " errno " << errno << " " << cpp_strerror(errno) << dendl;
+      // Give up after a burst of failed accepts rather than spinning.
+      if (++errors > 4)
+        break;
    }
  }
+// Ask the listener thread to exit: raise the flag, then shut down the
+// listening socket so the blocking poll()/accept() in entry() returns.
void Processor::stop()
{
  done = true;
-  ldout(msgr->cct, 10) << __func__ << " processor" << dendl;
+  ldout(msgr->cct,10) << __func__ << dendl;
-  if (listen_sd >= 0)
-    center->delete_file_event(listen_sd, EVENT_READABLE);
  if (listen_sd >= 0) {
    ::shutdown(listen_sd, SHUT_RDWR);
  }
-  center->stop();
  // wait for thread to stop before closing the socket, to avoid
  // racing against fd re-use.
+  // NOTE(review): done is cleared so a later rebind can restart the
+  // thread, but no join() is visible in this hunk -- confirm the caller
+  // joins before listen_sd is closed/reused.
  done = false;
}
+// Signal this worker's event loop to exit and wake it out of a blocking
+// process_events() so it notices `done` promptly.
+void Worker::stop()
+{
+  ldout(msgr->cct, 10) << __func__ << dendl;
+  done = true;
+  center.wakeup();
+}
+
+// Worker thread body: repeatedly dispatch file/time/external events on this
+// worker's EventCenter until stop() sets `done`.
+void *Worker::entry()
+{
+  ldout(msgr->cct, 10) << __func__ << " starting" << dendl;
+  int r;
+
+  while (!done) {
+    ldout(msgr->cct, 20) << __func__ << " calling event process" << dendl;
+
+    // 30s max wait per iteration; wakeup() interrupts it early (e.g. for
+    // external events queued by dispatch_event_external, or stop()).
+    r = center.process_events(30000);
+    if (r < 0) {
+      ldout(msgr->cct,20) << __func__ << " process events failed: "
+                          << cpp_strerror(errno) << dendl;
+      // TODO do something?
+    }
+  }
+
+  return 0;
+}
/*******************
* AsyncMessenger
AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name,
string mname, uint64_t _nonce)
: SimplePolicyMessenger(cct, name,mname, _nonce),
- processor(this, _nonce, ¢er),
+ conn_id(0),
+ processor(this, _nonce),
lock("AsyncMessenger::lock"),
nonce(_nonce), did_bind(false),
global_seq(0),
- cluster_protocol(0), stopped(true),
- local_connection(new AsyncConnection(cct, this)),
- center(cct)
+ cluster_protocol(0), stopped(true)
{
ceph_spin_init(&global_seq_lock);
+ for (int i = 0; i < cct->_conf->ms_event_op_threads; ++i) {
+ Worker *w = new Worker(this, cct);
+ workers.push_back(w);
+ }
+ local_connection = new AsyncConnection(cct, this, &workers[0]->center);
init_local_connection();
- center.init(5000);
}
/**
int AsyncMessenger::shutdown()
{
ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl;
- center.stop();
+ for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
+ (*it)->stop();
mark_down_all();
// break ref cycles on the loopback connection
+ processor.stop();
local_connection->set_priv(NULL);
stop_cond.Signal();
stopped = true;
{
ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl;
assert(did_bind);
- center.stop();
+ for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it) {
+ (*it)->stop();
+ (*it)->join();
+ }
+
processor.stop();
mark_down_all();
return processor.rebind(avoid_ports);
_init_local_connection();
}
- lock.Unlock();
+ for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
+ (*it)->create();
- center.start();
+ lock.Unlock();
return 0;
}
}
if (!stopped)
stop_cond.Wait(lock);
+
+ for (vector<Worker*>::iterator it = workers.begin(); it != workers.end(); ++it)
+ (*it)->join();
lock.Unlock();
// done! clean up.
AsyncConnectionRef AsyncMessenger::add_accept(int sd)
{
lock.Lock();
- AsyncConnectionRef conn = new AsyncConnection(cct, this);
- conn->accept(sd);
+ Worker *w = workers[conn_id % workers.size()];
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+ w->center.dispatch_event_external(EventCallbackRef(new C_handle_accept(conn, sd)));
accepting_conns.insert(conn);
+ conn_id++;
lock.Unlock();
return conn;
}
<< ", creating connection and registering" << dendl;
// create connection
- AsyncConnectionRef conn = new AsyncConnection(cct, this);
- conn->connect(addr, type);
+ Worker *w = workers[conn_id % workers.size()];
+ AsyncConnectionRef conn = new AsyncConnection(cct, this, &w->center);
+ w->center.dispatch_event_external(EventCallbackRef(new C_handle_connect(conn, addr, type)));
assert(!conns.count(addr));
conns[addr] = conn;
+ conn_id++;
return conn;
}
bool done;
int listen_sd;
uint64_t nonce;
- EventCenter *center;
public:
- Processor(AsyncMessenger *r, uint64_t n, EventCenter *c) : msgr(r), done(false), listen_sd(-1), nonce(n), center(c) {}
+ Processor(AsyncMessenger *r, uint64_t n) : msgr(r), done(false), listen_sd(-1), nonce(n) {}
void *entry();
void stop();
void accept();
};
+// One event-loop thread.  Each Worker owns an EventCenter; the messenger
+// shards AsyncConnections across workers (conn_id % workers.size()).
+class Worker : public Thread {
+  AsyncMessenger *msgr;
+  bool done;  // set by stop(); polled by entry()'s loop
+
+ public:
+  EventCenter center;
+  Worker(AsyncMessenger *m, CephContext *c): msgr(m), done(false), center(c) {
+    // NOTE(review): init()'s return value is ignored -- confirm it cannot
+    // fail here (it sizes the event table and sets up the notify pipe).
+    center.init(5000);
+  }
+  void *entry();
+  void stop();
+};
+
/*
* This class handles transmission and reception of messages. Generally
*/
Connection *create_anon_connection() {
- return new AsyncConnection(cct, this);
+ Mutex::Locker l(lock);
+ Worker *w = workers[conn_id % workers.size()];
+ conn_id++;
+ return new AsyncConnection(cct, this, &w->center);
}
/**
int _send_message(Message *m, const entity_inst_t& dest);
private:
+ vector<Worker*> workers;
+ int conn_id;
+
Processor processor;
friend class Processor;
*
* These are not yet in the conns map.
*/
+  // FIXME: clean these up -- connections are inserted here but no removal
+  // from accepting_conns is visible in this patch
set<AsyncConnectionRef> accepting_conns;
/// internal cluster protocol version, if any, for talking to entities of the same type.
ceph::unordered_map<entity_addr_t, AsyncConnectionRef>::iterator p = conns.find(k);
if (p == conns.end())
return NULL;
- if (!p->second->is_connected()) {
- // FIXME
- conns.erase(p);
- return NULL;
- }
return p->second;
}
/// con used for sending messages to ourselves
ConnectionRef local_connection;
- EventCenter center;
-
/**
* @defgroup AsyncMessenger internals
* @{
int EventCenter::create_file_event(int fd, int mask, EventCallbackRef ctxt)
{
int r;
- Mutex::Locker l(lock);
if (file_events.size() > nevent) {
int new_size = nevent << 2;
ldout(cct, 10) << __func__ << " event count exceed " << nevent << ", expand to " << new_size << dendl;
void EventCenter::delete_file_event(int fd, int mask)
{
- Mutex::Locker l(lock);
-
EventCenter::FileEvent *event = _get_file_event(fd);
if (!event)
return ;
uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallbackRef ctxt)
{
- Mutex::Locker l(lock);
uint64_t id = time_event_next_id++;
ldout(cct, 10) << __func__ << " id=" << id << " expire time=" << milliseconds << dendl;
- // Direct dispatch
- if (milliseconds == 0) {
- FiredEvent e;
- e.time_event.id = id;
- e.time_event.time_cb = ctxt;
- e.is_file = false;
- event_wq.queue(e);
- return id;
- }
-
EventCenter::TimeEvent event;
utime_t expire;
struct timeval tv;
time_events[id] = event;
if (expire < next_wake) {
- char buf[1];
- buf[0] = 'c';
- // wake up "event_wait"
- int n = write(notify_send_fd, buf, 1);
- // FIXME ?
- assert(n == 1);
+ wakeup();
}
return id;
}
void EventCenter::delete_time_event(uint64_t id)
{
- Mutex::Locker l(lock);
for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
it != time_to_ids.end(); it++) {
if (it->second == id) {
}
}
-void EventCenter::start()
-{
- ldout(cct, 1) << __func__ << dendl;
- Mutex::Locker l(lock);
- event_tp.start();
- tp_stop = false;
-}
-
-void EventCenter::stop()
+void EventCenter::wakeup()
{
ldout(cct, 1) << __func__ << dendl;
- if (!tp_stop) {
- event_tp.stop();
- tp_stop = true;
- }
char buf[1];
buf[0] = 'c';
// wake up "event_wait"
int EventCenter::process_time_events()
{
- Mutex::Locker l(lock);
int processed = 0;
time_t now = time(NULL);
utime_t cur = ceph_clock_now(cct);
it != time_to_ids.end(); ) {
prev = it;
if (cur >= it->first) {
- FiredEvent e;
- e.time_event.id = it->second;
- e.time_event.time_cb = time_events[it->second].time_cb;
- e.is_file = false;
- event_wq.queue(e);
ldout(cct, 10) << __func__ << " queue time event: id=" << it->second << " time is "
<< it->first << dendl;
+ time_events[it->second].time_cb->do_request(it->first);
processed++;
++it;
time_to_ids.erase(prev);
shortest.set_from_timeval(&tv);
{
- Mutex::Locker l(lock);
for (map<utime_t, uint64_t>::iterator it = time_to_ids.begin();
it != time_to_ids.end(); ++it) {
ldout(cct, 10) << __func__ << " time_to_ids " << it->first << " id=" << it->second << dendl;
vector<FiredFileEvent> fired_events;
numevents = driver->event_wait(fired_events, &tv);
for (int j = 0; j < numevents; j++) {
- FiredEvent e;
- e.file_event = fired_events[j];
- e.is_file = true;
- event_wq.queue(e);
- ldout(cct, 10) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
+ int rfired = 0;
+ FileEvent *event = _get_file_event(fired_events[j].fd);
+ if (!event)
+ continue;
+
+  /* Note the event->mask & mask & ... check: a callback already run in
+   * this pass may have removed an event that also fired but has not been
+   * processed yet, so verify the event is still valid before invoking. */
+ if (event->mask & fired_events[j].mask & EVENT_READABLE) {
+ rfired = 1;
+ event->read_cb->do_request(fired_events[j].fd);
+ }
+ event = _get_file_event(fired_events[j].fd);
+ if (!event)
+ continue;
+
+ if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
+ if (!rfired || event->read_cb != event->write_cb)
+ event->write_cb->do_request(fired_events[j].fd);
+ }
+
+ ldout(cct, 20) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
}
if (trigger_time)
numevents += process_time_events();
+ {
+ Mutex::Locker l(lock);
+ while (!external_events.empty()) {
+ EventCallbackRef e = external_events.front();
+ external_events.pop_front();
+ e->do_request(0);
+ }
+ }
return numevents;
}
+
+// Called from threads other than this center's owner: queue the callback
+// under `lock`, then write to the notify pipe (wakeup) so a blocking
+// process_events() returns and runs it on the owning worker thread.
+void EventCenter::dispatch_event_external(EventCallbackRef e)
+{
+  lock.Lock();
+  external_events.push_back(e);
+  lock.Unlock();
+  wakeup();
+}
int mask;
};
-struct FiredTimeEvent {
- uint64_t id;
- EventCallbackRef time_cb;
-};
-
-struct FiredEvent {
- FiredFileEvent file_event;
- FiredTimeEvent time_event;
- bool is_file;
-
- FiredEvent(): is_file(true) {}
-};
-
class EventDriver {
public:
virtual ~EventDriver() {} // we want a virtual destructor!!!
TimeEvent(): id(0) {}
};
+ CephContext *cct;
+ uint64_t nevent;
+ // Used only to external event
Mutex lock;
+ deque<EventCallbackRef> external_events;
unordered_map<int, FileEvent> file_events;
+ EventDriver *driver;
// The second element is id
map<utime_t, uint64_t> time_to_ids;
// The first element is id
unordered_map<uint64_t, TimeEvent> time_events;
- EventDriver *driver;
- CephContext *cct;
- uint64_t nevent;
uint64_t time_event_next_id;
- ThreadPool event_tp;
time_t last_time; // last time process time event
int notify_receive_fd;
int notify_send_fd;
utime_t next_wake;
- bool tp_stop;
int process_time_events();
FileEvent *_get_file_event(int fd) {
return NULL;
}
- struct EventWQ : public ThreadPool::WorkQueueVal<FiredEvent> {
- EventCenter *center;
- // In order to ensure the file descriptor is unique in conn_queue,
- // pending is introduced to check
- //
- deque<FiredEvent> conn_queue;
- // used only by file event <File Descriptor, Mask>
- unordered_map<int, int> pending;
-
- EventWQ(EventCenter *c, time_t timeout, time_t suicide_timeout, ThreadPool *tp)
- : ThreadPool::WorkQueueVal<FiredEvent>("Event::EventWQ", timeout, suicide_timeout, tp), center(c) {}
-
- void _enqueue(FiredEvent e) {
- if (e.is_file) {
- // Ensure only one thread process one file descriptor
- unordered_map<int, int>::iterator it = pending.find(e.file_event.fd);
- if (it != pending.end()) {
- it->second |= e.file_event.mask;
- } else {
- pending[e.file_event.fd] = e.file_event.mask;
- conn_queue.push_back(e);
- }
- } else {
- conn_queue.push_back(e);
- }
- }
- void _enqueue_front(FiredEvent e) {
- assert(0);
- }
- void _dequeue(FiredEvent c) {
- assert(0);
- }
- bool _empty() {
- return conn_queue.empty();
- }
- FiredEvent _dequeue() {
- assert(!conn_queue.empty());
- FiredEvent e = conn_queue.front();
- conn_queue.pop_front();
- if (e.is_file) {
- e.file_event.mask = pending[e.file_event.fd];
- pending.erase(e.file_event.fd);
- }
- return e;
- }
- void _process(FiredEvent e, ThreadPool::TPHandle &handle) {
- if (e.is_file) {
- int rfired = 0;
- FileEvent *event = center->get_file_event(e.file_event.fd);
- if (!event)
- return ;
-
- /* note the event->mask & mask & ... code: maybe an already processed
- * event removed an element that fired and we still didn't
- * processed, so we check if the event is still valid. */
- if (event->mask & e.file_event.mask & EVENT_READABLE) {
- rfired = 1;
- event->read_cb->do_request(e.file_event.fd);
- }
- if (event->mask & e.file_event.mask & EVENT_WRITABLE) {
- if (!rfired || event->read_cb != event->write_cb)
- event->write_cb->do_request(e.file_event.fd);
- }
- } else {
- e.time_event.time_cb->do_request(e.time_event.id);
- }
- }
- void _clear() {
- }
- } event_wq;
-
public:
EventCenter(CephContext *c):
- lock("EventCenter::lock"), driver(NULL), cct(c), nevent(0), time_event_next_id(0),
- event_tp(c, "EventCenter::event_tp", c->_conf->ms_event_op_threads, "eventcenter_op_threads"),
- notify_receive_fd(-1), notify_send_fd(-1),tp_stop(true),
- event_wq(this, c->_conf->ms_event_thread_timeout, c->_conf->ms_event_thread_suicide_timeout, &event_tp) {
+ cct(c), nevent(0),
+ lock("AsyncMessenger::lock"),
+ driver(NULL), time_event_next_id(0),
+ notify_receive_fd(-1), notify_send_fd(-1) {
last_time = time(NULL);
}
~EventCenter();
int init(int nevent);
+ // Used by internal thread
int create_file_event(int fd, int mask, EventCallbackRef ctxt);
uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt);
void delete_file_event(int fd, int mask);
void delete_time_event(uint64_t id);
int process_events(int timeout_milliseconds);
- void start();
- void stop();
- FileEvent *get_file_event(int fd) {
- Mutex::Locker l(lock);
- return _get_file_event(fd);
- }
+ void wakeup();
+
+ // Used by external thread
+ void dispatch_event_external(EventCallbackRef e);
};
#endif