// start thread
worker->center.submit_to(worker->center.get_id(), [this]() {
- for (auto& l : listen_sockets) {
- if (l) {
- if (l.fd() == -1) {
+ for (auto& listen_socket : listen_sockets) {
+ if (listen_socket) {
+ if (listen_socket.fd() == -1) {
ldout(msgr->cct, 1) << __func__ << " Erro: processor restart after listen_socket.fd closed. " << this << dendl;
return;
}
- worker->center.create_file_event(l.fd(), EVENT_READABLE,
+ worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE,
listen_handler); }
}
}, false);
<< " time_id=" << time_event_next_id << ").";
}
-int EventCenter::init(int n, unsigned i, const std::string &t)
+int EventCenter::init(int nevent, unsigned center_id, const std::string &type)
{
// can't init multi times
- ceph_assert(nevent == 0);
+ ceph_assert(this->nevent == 0);
- type = t;
- idx = i;
+ this->type = type;
+ this->center_id = center_id;
- if (t == "dpdk") {
+ if (type == "dpdk") {
#ifdef HAVE_DPDK
driver = new DPDKDriver(cct);
#endif
return -1;
}
- int r = driver->init(this, n);
+ int r = driver->init(this, nevent);
if (r < 0) {
lderr(cct) << __func__ << " failed to init event driver." << dendl;
return r;
}
- file_events.resize(n);
- nevent = n;
+ file_events.resize(nevent);
+ this->nevent = nevent;
if (!driver->need_wakeup())
return 0;
void EventCenter::set_owner()
{
owner = pthread_self();
- ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl;
+ ldout(cct, 2) << __func__ << " center_id=" << center_id << " owner=" << owner << dendl;
if (!global_centers) {
global_centers = &cct->lookup_or_create_singleton_object<
EventCenter::AssociatedCenters>(
"AsyncMessenger::EventCenter::global_center::" + type, true);
ceph_assert(global_centers);
- global_centers->centers[idx] = this;
+ global_centers->centers[center_id] = this;
if (driver->need_wakeup()) {
notify_handler = new C_handle_notify(this, cct);
int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler);
vector<FiredFileEvent> fired_events;
numevents = driver->event_wait(fired_events, &tv);
auto working_start = ceph::mono_clock::now();
- for (int j = 0; j < numevents; j++) {
+ for (int event_id = 0; event_id < numevents; event_id++) {
int rfired = 0;
FileEvent *event;
EventCallbackRef cb;
- event = _get_file_event(fired_events[j].fd);
+ event = _get_file_event(fired_events[event_id].fd);
/* note the event->mask & mask & ... code: maybe an already processed
* event removed an element that fired and we still didn't
* processed, so we check if the event is still valid. */
- if (event->mask & fired_events[j].mask & EVENT_READABLE) {
+ if (event->mask & fired_events[event_id].mask & EVENT_READABLE) {
rfired = 1;
cb = event->read_cb;
- cb->do_request(fired_events[j].fd);
+ cb->do_request(fired_events[event_id].fd);
}
- if (event->mask & fired_events[j].mask & EVENT_WRITABLE) {
+ if (event->mask & fired_events[event_id].mask & EVENT_WRITABLE) {
if (!rfired || event->read_cb != event->write_cb) {
cb = event->write_cb;
- cb->do_request(fired_events[j].fd);
+ cb->do_request(fired_events[event_id].fd);
}
}
- ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl;
+ ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[event_id].fd
+ << " mask is " << fired_events[event_id].mask << dendl;
}
if (trigger_time)
int notify_send_fd;
NetHandler net;
EventCallbackRef notify_handler;
- unsigned idx;
+ unsigned center_id;
AssociatedCenters *global_centers = nullptr;
int process_time_events();
external_num_events(0),
driver(NULL), time_event_next_id(1),
notify_receive_fd(-1), notify_send_fd(-1), net(c),
- notify_handler(NULL), idx(0) { }
+ notify_handler(NULL), center_id(0) { }
~EventCenter();
ostream& _event_prefix(std::ostream *_dout);
- int init(int nevent, unsigned idx, const std::string &t);
+ int init(int nevent, unsigned center_id, const std::string &type);
void set_owner();
pthread_t get_owner() const { return owner; }
- unsigned get_id() const { return idx; }
+ unsigned get_id() const { return center_id; }
EventDriver *get_driver() { return driver; }
func f;
bool nonwait;
public:
- C_submit_event(func &&_f, bool nw)
- : f(std::move(_f)), nonwait(nw) {}
+ C_submit_event(func &&_f, bool nowait)
+ : f(std::move(_f)), nonwait(nowait) {}
void do_request(uint64_t id) override {
f();
lock.lock();
return -e;
}
- size = nevent;
+ this->nevent = nevent;
return 0;
}
{
int retval, numevents = 0;
- retval = epoll_wait(epfd, events, size,
+ retval = epoll_wait(epfd, events, nevent,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
- int j;
-
numevents = retval;
fired_events.resize(numevents);
- for (j = 0; j < numevents; j++) {
+
+ for (int event_id = 0; event_id < numevents; event_id++) {
int mask = 0;
- struct epoll_event *e = events + j;
+ struct epoll_event *e = &events[event_id];
if (e->events & EPOLLIN) mask |= EVENT_READABLE;
if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE;
if (e->events & EPOLLERR) mask |= EVENT_READABLE|EVENT_WRITABLE;
if (e->events & EPOLLHUP) mask |= EVENT_READABLE|EVENT_WRITABLE;
- fired_events[j].fd = e->data.fd;
- fired_events[j].mask = mask;
+ fired_events[event_id].fd = e->data.fd;
+ fired_events[event_id].mask = mask;
}
}
return numevents;
int epfd;
struct epoll_event *events;
CephContext *cct;
- int size;
+ int nevent;
public:
- explicit EpollDriver(CephContext *c): epfd(-1), events(NULL), cct(c), size(0) {}
+ explicit EpollDriver(CephContext *c): epfd(-1), events(NULL), cct(c), nevent(0) {}
~EpollDriver() override {
if (epfd != -1)
close(epfd);
#undef dout_prefix
#define dout_prefix *_dout << "stack "
-std::function<void ()> NetworkStack::add_thread(unsigned i)
+std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
{
- Worker *w = workers[i];
+ Worker *w = workers[worker_id];
return [this, w]() {
char tp_name[16];
sprintf(tp_name, "msgr-worker-%u", w->id);
return nullptr;
}
-Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
+Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned worker_id)
{
if (type == "posix")
- return new PosixWorker(c, i);
+ return new PosixWorker(c, worker_id);
#ifdef HAVE_RDMA
else if (type == "rdma")
- return new RDMAWorker(c, i);
+ return new RDMAWorker(c, worker_id);
#endif
#ifdef HAVE_DPDK
else if (type == "dpdk")
- return new DPDKWorker(c, i);
+ return new DPDKWorker(c, worker_id);
#endif
lderr(c) << __func__ << " ms_async_transport_type " << type <<
num_workers = EventCenter::MAX_EVENTCENTER;
}
- for (unsigned i = 0; i < num_workers; ++i) {
- Worker *w = create_worker(cct, type, i);
- w->center.init(InitEventNumber, i, type);
+ for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) {
+ Worker *w = create_worker(cct, type, worker_id);
+ w->center.init(InitEventNumber, worker_id, type);
workers.push_back(w);
}
}
Worker(const Worker&) = delete;
Worker& operator=(const Worker&) = delete;
- Worker(CephContext *c, unsigned i)
- : cct(c), perf_logger(NULL), id(i), references(0), center(c) {
+ Worker(CephContext *c, unsigned worker_id)
+ : cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) {
char name[128];
sprintf(name, "AsyncMessenger::Worker-%u", id);
// initialize perf_logger
void start();
void stop();
virtual Worker *get_worker();
- Worker *get_worker(unsigned i) {
- return workers[i];
+ Worker *get_worker(unsigned worker_id) {
+ return workers[worker_id];
}
void drain();
unsigned get_num_worker() const {
polled.clear();
}
-RDMAWorker::RDMAWorker(CephContext *c, unsigned i)
- : Worker(c, i),
+RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id)
+ : Worker(c, worker_id),
tx_handler(new C_handle_cq_tx(this))
{
// initialize perf_logger