From 940a5b5ae391894a8aa1ac86b9ded30ba4dff4d9 Mon Sep 17 00:00:00 2001 From: Changcheng Liu Date: Wed, 31 Jul 2019 09:33:51 +0800 Subject: [PATCH] msg/async/Stack: rename variable to improve readability 1. rename var i to be worker_id when creating Worker "i" is assigned to be Worker::id, it means worker's id 2. rename EventCenter::idx to EventCenter::center_id "idx" is EventCenter's index in global_centers obj. rename it to be center_id. 3. rename EventCenter::init API's parameter n to be nevent "n" is actually assigned to EventCenter::nevent. rename it to be "nevent". 4. rename EventCenter::init API's paramter t to be type "t" is corresponding to Epoll Driver's implementation's type. 5. rename EpollDriver::size to be EpollDriver::nevent "size" is actually epoll events number, rename it to be "nevent" 6. use event_id as index name to get event instead of "j" 7. rename "nw" to be "nowait" 8. Processor::start unify variable name with Processor::accept & Processor::stop ==> auto &l to be auto &listen_socket Signed-off-by: Changcheng Liu --- src/msg/async/AsyncMessenger.cc | 8 ++++---- src/msg/async/Event.cc | 35 +++++++++++++++++---------------- src/msg/async/Event.h | 12 +++++------ src/msg/async/EventEpoll.cc | 15 +++++++------- src/msg/async/EventEpoll.h | 4 ++-- src/msg/async/Stack.cc | 18 ++++++++--------- src/msg/async/Stack.h | 8 ++++---- src/msg/async/rdma/RDMAStack.cc | 4 ++-- 8 files changed, 52 insertions(+), 52 deletions(-) diff --git a/src/msg/async/AsyncMessenger.cc b/src/msg/async/AsyncMessenger.cc index 47405cf18d871..fc1107279e667 100644 --- a/src/msg/async/AsyncMessenger.cc +++ b/src/msg/async/AsyncMessenger.cc @@ -155,13 +155,13 @@ void Processor::start() // start thread worker->center.submit_to(worker->center.get_id(), [this]() { - for (auto& l : listen_sockets) { - if (l) { - if (l.fd() == -1) { + for (auto& listen_socket : listen_sockets) { + if (listen_socket) { + if (listen_socket.fd() == -1) { ldout(msgr->cct, 1) << __func__ << " Erro: processor restart after listen_socket.fd closed. " << this << dendl; return; } - worker->center.create_file_event(l.fd(), EVENT_READABLE, + worker->center.create_file_event(listen_socket.fd(), EVENT_READABLE, listen_handler); } } }, false); diff --git a/src/msg/async/Event.cc b/src/msg/async/Event.cc index 145bc5c40064e..6cd707d6f9011 100644 --- a/src/msg/async/Event.cc +++ b/src/msg/async/Event.cc @@ -100,15 +100,15 @@ ostream& EventCenter::_event_prefix(std::ostream *_dout) << " time_id=" << time_event_next_id << ")."; } -int EventCenter::init(int n, unsigned i, const std::string &t) +int EventCenter::init(int nevent, unsigned center_id, const std::string &type) { // can't init multi times - ceph_assert(nevent == 0); + ceph_assert(this->nevent == 0); - type = t; - idx = i; + this->type = type; + this->center_id = center_id; - if (t == "dpdk") { + if (type == "dpdk") { #ifdef HAVE_DPDK driver = new DPDKDriver(cct); #endif @@ -129,14 +129,14 @@ int EventCenter::init(int n, unsigned i, const std::string &t) return -1; } - int r = driver->init(this, n); + int r = driver->init(this, nevent); if (r < 0) { lderr(cct) << __func__ << " failed to init event driver." << dendl; return r; } - file_events.resize(n); - nevent = n; + file_events.resize(nevent); + this->nevent = nevent; if (!driver->need_wakeup()) return 0; @@ -190,13 +190,13 @@ EventCenter::~EventCenter() void EventCenter::set_owner() { owner = pthread_self(); - ldout(cct, 2) << __func__ << " idx=" << idx << " owner=" << owner << dendl; + ldout(cct, 2) << __func__ << " center_id=" << center_id << " owner=" << owner << dendl; if (!global_centers) { global_centers = &cct->lookup_or_create_singleton_object< EventCenter::AssociatedCenters>( "AsyncMessenger::EventCenter::global_center::" + type, true); ceph_assert(global_centers); - global_centers->centers[idx] = this; + global_centers->centers[center_id] = this; if (driver->need_wakeup()) { notify_handler = new C_handle_notify(this, cct); int r = create_file_event(notify_receive_fd, EVENT_READABLE, notify_handler); @@ -391,29 +391,30 @@ int EventCenter::process_events(unsigned timeout_microseconds, ceph::timespan * vector fired_events; numevents = driver->event_wait(fired_events, &tv); auto working_start = ceph::mono_clock::now(); - for (int j = 0; j < numevents; j++) { + for (int event_id = 0; event_id < numevents; event_id++) { int rfired = 0; FileEvent *event; EventCallbackRef cb; - event = _get_file_event(fired_events[j].fd); + event = _get_file_event(fired_events[event_id].fd); /* note the event->mask & mask & ... code: maybe an already processed * event removed an element that fired and we still didn't * processed, so we check if the event is still valid. */ - if (event->mask & fired_events[j].mask & EVENT_READABLE) { + if (event->mask & fired_events[event_id].mask & EVENT_READABLE) { rfired = 1; cb = event->read_cb; - cb->do_request(fired_events[j].fd); + cb->do_request(fired_events[event_id].fd); } - if (event->mask & fired_events[j].mask & EVENT_WRITABLE) { + if (event->mask & fired_events[event_id].mask & EVENT_WRITABLE) { if (!rfired || event->read_cb != event->write_cb) { cb = event->write_cb; - cb->do_request(fired_events[j].fd); + cb->do_request(fired_events[event_id].fd); } } - ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; + ldout(cct, 30) << __func__ << " event_wq process is " << fired_events[event_id].fd + << " mask is " << fired_events[event_id].mask << dendl; } if (trigger_time) diff --git a/src/msg/async/Event.h b/src/msg/async/Event.h index fdfc0a3157114..66958803038f1 100644 --- a/src/msg/async/Event.h +++ b/src/msg/async/Event.h @@ -172,7 +172,7 @@ class EventCenter { int notify_send_fd; NetHandler net; EventCallbackRef notify_handler; - unsigned idx; + unsigned center_id; AssociatedCenters *global_centers = nullptr; int process_time_events(); @@ -187,14 +187,14 @@ class EventCenter { external_num_events(0), driver(NULL), time_event_next_id(1), notify_receive_fd(-1), notify_send_fd(-1), net(c), - notify_handler(NULL), idx(0) { } + notify_handler(NULL), center_id(0) { } ~EventCenter(); ostream& _event_prefix(std::ostream *_dout); - int init(int nevent, unsigned idx, const std::string &t); + int init(int nevent, unsigned center_id, const std::string &type); void set_owner(); pthread_t get_owner() const { return owner; } - unsigned get_id() const { return idx; } + unsigned get_id() const { return center_id; } EventDriver *get_driver() { return driver; } @@ -221,8 +221,8 @@ class EventCenter { func f; bool nonwait; public: - C_submit_event(func &&_f, bool nw) - : f(std::move(_f)), nonwait(nw) {} + C_submit_event(func &&_f, bool nowait) + : f(std::move(_f)), nonwait(nowait) {} void do_request(uint64_t id) override { f(); lock.lock(); diff --git a/src/msg/async/EventEpoll.cc b/src/msg/async/EventEpoll.cc index e7b4ac44981db..3eb91238461a5 100644 --- a/src/msg/async/EventEpoll.cc +++ b/src/msg/async/EventEpoll.cc @@ -47,7 +47,7 @@ int EpollDriver::init(EventCenter *c, int nevent) return -e; } - size = nevent; + this->nevent = nevent; return 0; } @@ -121,23 +121,22 @@ int EpollDriver::event_wait(vector &fired_events, struct timeval { int retval, numevents = 0; - retval = epoll_wait(epfd, events, size, + retval = epoll_wait(epfd, events, nevent, tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1); if (retval > 0) { - int j; - numevents = retval; fired_events.resize(numevents); - for (j = 0; j < numevents; j++) { + + for (int event_id = 0; event_id < numevents; event_id++) { int mask = 0; - struct epoll_event *e = events + j; + struct epoll_event *e = &events[event_id]; if (e->events & EPOLLIN) mask |= EVENT_READABLE; if (e->events & EPOLLOUT) mask |= EVENT_WRITABLE; if (e->events & EPOLLERR) mask |= EVENT_READABLE|EVENT_WRITABLE; if (e->events & EPOLLHUP) mask |= EVENT_READABLE|EVENT_WRITABLE; - fired_events[j].fd = e->data.fd; - fired_events[j].mask = mask; + fired_events[event_id].fd = e->data.fd; + fired_events[event_id].mask = mask; } } return numevents; diff --git a/src/msg/async/EventEpoll.h b/src/msg/async/EventEpoll.h index abc4b8bbbfbf2..0221f90d34c9f 100644 --- a/src/msg/async/EventEpoll.h +++ b/src/msg/async/EventEpoll.h @@ -26,10 +26,10 @@ class EpollDriver : public EventDriver { int epfd; struct epoll_event *events; CephContext *cct; - int size; + int nevent; public: - explicit EpollDriver(CephContext *c): epfd(-1), events(NULL), cct(c), size(0) {} + explicit EpollDriver(CephContext *c): epfd(-1), events(NULL), cct(c), nevent(0) {} ~EpollDriver() override { if (epfd != -1) close(epfd); diff --git a/src/msg/async/Stack.cc b/src/msg/async/Stack.cc index 5db63006c895f..6b18d1de9cb81 100644 --- a/src/msg/async/Stack.cc +++ b/src/msg/async/Stack.cc @@ -34,9 +34,9 @@ #undef dout_prefix #define dout_prefix *_dout << "stack " -std::function NetworkStack::add_thread(unsigned i) +std::function NetworkStack::add_thread(unsigned worker_id) { - Worker *w = workers[i]; + Worker *w = workers[worker_id]; return [this, w]() { char tp_name[16]; sprintf(tp_name, "msgr-worker-%u", w->id); @@ -82,17 +82,17 @@ std::shared_ptr NetworkStack::create(CephContext *c, const string return nullptr; } -Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i) +Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned worker_id) { if (type == "posix") - return new PosixWorker(c, i); + return new PosixWorker(c, worker_id); #ifdef HAVE_RDMA else if (type == "rdma") - return new RDMAWorker(c, i); + return new RDMAWorker(c, worker_id); #endif #ifdef HAVE_DPDK else if (type == "dpdk") - return new DPDKWorker(c, i); + return new DPDKWorker(c, worker_id); #endif lderr(c) << __func__ << " ms_async_transport_type " << type << @@ -115,9 +115,9 @@ NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(fa num_workers = EventCenter::MAX_EVENTCENTER; } - for (unsigned i = 0; i < num_workers; ++i) { - Worker *w = create_worker(cct, type, i); - w->center.init(InitEventNumber, i, type); + for (unsigned worker_id = 0; worker_id < num_workers; ++worker_id) { + Worker *w = create_worker(cct, type, worker_id); + w->center.init(InitEventNumber, worker_id, type); workers.push_back(w); } } diff --git a/src/msg/async/Stack.h b/src/msg/async/Stack.h index a7430da873531..d33011a7a926f 100644 --- a/src/msg/async/Stack.h +++ b/src/msg/async/Stack.h @@ -231,8 +231,8 @@ class Worker { Worker(const Worker&) = delete; Worker& operator=(const Worker&) = delete; - Worker(CephContext *c, unsigned i) - : cct(c), perf_logger(NULL), id(i), references(0), center(c) { + Worker(CephContext *c, unsigned worker_id) + : cct(c), perf_logger(NULL), id(worker_id), references(0), center(c) { char name[128]; sprintf(name, "AsyncMessenger::Worker-%u", id); // initialize perf_logger @@ -339,8 +339,8 @@ class NetworkStack { void start(); void stop(); virtual Worker *get_worker(); - Worker *get_worker(unsigned i) { - return workers[i]; + Worker *get_worker(unsigned worker_id) { + return workers[worker_id]; } void drain(); unsigned get_num_worker() const { diff --git a/src/msg/async/rdma/RDMAStack.cc b/src/msg/async/rdma/RDMAStack.cc index 76eb27692d843..8051687ec963e 100644 --- a/src/msg/async/rdma/RDMAStack.cc +++ b/src/msg/async/rdma/RDMAStack.cc @@ -530,8 +530,8 @@ void RDMADispatcher::handle_rx_event(ibv_wc *cqe, int rx_number) polled.clear(); } -RDMAWorker::RDMAWorker(CephContext *c, unsigned i) - : Worker(c, i), +RDMAWorker::RDMAWorker(CephContext *c, unsigned worker_id) + : Worker(c, worker_id), tx_handler(new C_handle_cq_tx(this)) { // initialize perf_logger -- 2.39.5