public:
explicit C_time_wakeup(AsyncConnectionRef c): conn(c) {}
- void do_request(int fd_or_id) override {
+ void do_request(uint64_t fd_or_id) override {
conn->wakeup_from(fd_or_id);
}
};
public:
explicit C_handle_read(AsyncConnectionRef c): conn(c) {}
- void do_request(int fd_or_id) override {
+ void do_request(uint64_t fd_or_id) override {
conn->process();
}
};
public:
explicit C_handle_write(AsyncConnectionRef c): conn(c) {}
- void do_request(int fd) override {
+ void do_request(uint64_t fd) override {
conn->handle_write();
}
};
AsyncConnectionRef conn;
public:
explicit C_clean_handler(AsyncConnectionRef c): conn(c) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
conn->cleanup();
delete this;
}
public:
explicit C_tick_wakeup(AsyncConnectionRef c): conn(c) {}
- void do_request(int fd_or_id) override {
+ void do_request(uint64_t fd_or_id) override {
conn->tick(fd_or_id);
}
};
}
}
-void AsyncConnection::DelayedDelivery::do_request(int id)
+void AsyncConnection::DelayedDelivery::do_request(uint64_t id)
{
Message *m = nullptr;
{
assert(delay_queue.empty());
}
void set_center(EventCenter *c) { center = c; }
- void do_request(int id) override;
+ void do_request(uint64_t id) override;
void queue(double delay_period, utime_t release, Message *m) {
std::lock_guard<std::mutex> l(delay_lock);
delay_queue.push_back(std::make_pair(release, m));
public:
explicit C_processor_accept(Processor *p): pro(p) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
pro->accept();
}
};
public:
explicit C_handle_reap(AsyncMessenger *m): msgr(m) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
// judge whether is a time event
msgr->reap_dead();
}
public:
C_handle_notify(EventCenter *c, CephContext *cc): center(c), cct(cc) {}
- void do_request(int fd_or_id) override {
+ void do_request(uint64_t fd_or_id) override {
char c[256];
int r = 0;
do {
class EventCallback {
public:
- virtual void do_request(int fd_or_id) = 0;
+ virtual void do_request(uint64_t fd_or_id) = 0;
virtual ~EventCallback() {} // we want a virtual destructor!!!
};
public:
C_submit_event(func &&_f, bool nw)
: f(std::move(_f)), nonwait(nw) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
f();
lock.lock();
cond.notify_all();
explicit C_drain(size_t c)
: drain_lock("C_drain::drain_lock"),
drain_count(c) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
Mutex::Locker l(drain_lock);
drain_count--;
if (drain_count == 0) drain_cond.Signal();
public:
C_handle_arp_timeout(arp_for *a, l3addr addr, bool first):
arp(a), paddr(addr), first_request(first) {}
- void do_request(int r) {
+ void do_request(uint64_t r) {
arp->send_query(paddr);
auto &res = arp->_in_progress[paddr];
DPDKQueuePair *_qp;
public:
C_handle_dev_stats(DPDKQueuePair *qp): _qp(qp) { }
- void do_request(int id) {
+ void do_request(uint64_t id) {
_qp->handle_stats();
}
};
public:
C_handle_frag_timeout(ipv4 *i): _ipv4(i) {}
- void do_request(int fd_or_id) {
+ void do_request(uint64_t fd_or_id) {
_ipv4->frag_timeout();
}
};
public:
C_free_on_cpu(deleter &&d, std::function<void()> &&c):
del(std::move(d)), cb(std::move(c)) {}
- void do_request(int fd) {
+ void do_request(uint64_t fd) {
// deleter needs to be moved from lambda capture to be destroyed here
// otherwise deleter destructor will be called on a cpu that called
// create_external_event when work_item is destroyed.
public:
C_handle_delayed_ack(tcb *t): tc(t) { }
- void do_request(int r) {
+ void do_request(uint64_t r) {
tc->_nr_full_seg_received = 0;
tc->output();
}
public:
C_handle_retransmit(tcb *t): tc(t) { }
- void do_request(int r) {
+ void do_request(uint64_t r) {
tc->retransmit();
}
};
public:
C_handle_persist(tcb *t): tc(t) { }
- void do_request(int r) {
+ void do_request(uint64_t r) {
tc->persist();
}
};
public:
C_all_data_acked(tcb *t): tc(t) {}
- void do_request(int fd_or_id) {
+ void do_request(uint64_t fd_or_id) {
tc->close_final_cleanup();
}
};
lw_shared_ptr<tcb> tc;
public:
C_actual_remove_tcb(tcb *t): tc(t->shared_from_this()) {}
- void do_request(int r) {
+ void do_request(uint64_t r) {
delete this;
}
};
public:
C_handle_l2forward(std::shared_ptr<DPDKDevice> &p, unsigned &qd, Packet pkt, unsigned target)
: sdev(p), queue_depth(qd), p(std::move(pkt)), dst(target) {}
- void do_request(int fd) {
+ void do_request(uint64_t fd) {
sdev->l2receive(dst, std::move(p));
queue_depth--;
delete this;
public:
C_arp_learn(DPDKWorker *w, ethernet_address l2, ipv4_address l3)
: worker(w), l2_addr(l2), l3_addr(l3) {}
- void do_request(int id) {
+ void do_request(uint64_t id) {
worker->arp_learn(l2_addr, l3_addr);
delete this;
}
RDMADispatcher *dispatcher;
public:
C_handle_cq_async(RDMADispatcher *w): dispatcher(w) {}
- void do_request(int fd) {
+ void do_request(uint64_t fd) {
// worker->handle_tx_event();
dispatcher->handle_async_event();
}
RDMAWorker *worker;
public:
C_handle_cq_tx(RDMAWorker *w): worker(w) {}
- void do_request(int fd) {
+ void do_request(uint64_t fd) {
worker->handle_pending_message();
}
};
bool active;
public:
C_handle_connection(RDMAConnectedSocketImpl *w): csi(w), active(true) {}
- void do_request(int fd) {
+ void do_request(uint64_t fd) {
if (active)
csi->handle_connection();
}
class FakeEvent : public EventCallback {
public:
- void do_request(int fd_or_id) override {}
+ void do_request(uint64_t fd_or_id) override {}
};
TEST(EventCenterTest, FileEventExpansion) {
public:
CountEvent(std::atomic<unsigned> *atomic, Mutex *l, Cond *c): count(atomic), lock(l), cond(c) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
lock->Lock();
(*count)--;
cond->Signal();
std::atomic_bool done;
public:
C_dispatch(Worker *w, func &&_f): worker(w), f(std::move(_f)), done(false) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
f(worker);
done = true;
}
public:
C_poll(EventCenter *c): center(c), woken(false) {}
- void do_request(int r) override {
+ void do_request(uint64_t r) override {
woken = true;
}
bool poll(int milliseconds) {
T *ctxt;
public:
C_delete(T *c): ctxt(c) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
delete ctxt;
delete this;
}
Client *c;
public:
Client_read_handle(Client *_c): c(_c) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
c->do_read_request();
}
} read_ctxt;
Client *c;
public:
Client_write_handle(Client *_c): c(_c) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
c->do_write_request();
}
} write_ctxt;
Server *s;
public:
Server_read_handle(Server *_s): s(_s) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
s->do_read_request();
}
} read_ctxt;
Server *s;
public:
Server_write_handle(Server *_s): s(_s) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
s->do_write_request();
}
} write_ctxt;
public:
C_accept(StressFactory *f, ServerSocket s, ThreadData *data, Worker *w)
: factory(f), bind_socket(std::move(s)), t_data(data), worker(w) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
while (true) {
entity_addr_t cli_addr;
ConnectedSocket srv_socket;
public:
explicit CountEvent(std::atomic<int64_t> *atomic): count(atomic) {}
- void do_request(int id) override {
+ void do_request(uint64_t id) override {
(*count)--;
}
};