// Now existing connection will be alive and the current connection will
// exchange socket with existing connection because we want to maintain
// original "connection_state"
- existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
+ if (existing->sd > 0)
+ existing->center->delete_file_event(existing->sd, EVENT_READABLE|EVENT_WRITABLE);
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
existing->center->create_file_event(sd, EVENT_READABLE, existing->read_handler);
while (!done) {
ldout(cct, 20) << __func__ << " calling event process" << dendl;
- int r = center.process_events(30000000);
+ int r = center.process_events(EventMaxWaitUs);
if (r < 0) {
ldout(cct, 20) << __func__ << " process events failed: "
<< cpp_strerror(errno) << dendl;
class WorkerPool;
class Worker : public Thread {
+ static const uint64_t InitEventNumber = 5000;
+ static const uint64_t EventMaxWaitUs = 30000000;
CephContext *cct;
WorkerPool *pool;
bool done;
EventCenter center;
Worker(CephContext *c, WorkerPool *p, int i)
: cct(c), pool(p), done(false), id(i), center(c) {
- center.init(5000);
+ center.init(InitEventNumber);
}
void *entry();
void stop();
{
int r = 0;
Mutex::Locker l(file_lock);
- if (fd > nevent) {
+ if (fd >= nevent) {
int new_size = nevent << 2;
while (fd > new_size)
new_size <<= 2;
void EventCenter::delete_file_event(int fd, int mask)
{
+ assert(fd > 0);
Mutex::Locker l(file_lock);
if (fd > nevent) {
ldout(cct, 1) << __func__ << " delete event fd=" << fd << " exceed nevent=" << nevent
::close(listen_sd);
}
+class FakeEvent : public EventCallback {
+
+ public:
+ void do_request(int fd_or_id) {}
+};
+
+TEST(EventCenterTest, FileEventExpansion) {
+ vector<int> sds;
+ EventCenter center;
+ center.init(100);
+ EventCallbackRef e(new FakeEvent());
+ for (int i = 0; i < 10000; i++) {
+ int s = ::socket(AF_INET, SOCK_STREAM, 0);
+ center.create_file_event(i, EVENT_READABLE, e);
+ sds.push_back(::socket(AF_INET, SOCK_STREAM, 0));
+ }
+
+ for (vector<int>::iterator it = sds.begin(); it != sds.end(); ++it)
+ center.delete_file_event(*it, EVENT_READABLE);
+}
+
INSTANTIATE_TEST_CASE_P(
AsyncMessenger,
EventDriverTest,