From: Haomai Wang Date: Fri, 12 Sep 2014 07:53:27 +0000 (+0800) Subject: Add support for mon library X-Git-Tag: v0.88~37^2~4^2~31 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=ec913d0e914cecf9eece05a78c449455663081da;p=ceph.git Add support for mon library Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index 730cf484918..a1f71de8207 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -56,6 +56,17 @@ class C_handle_reset : public EventCallback { } }; +class C_handle_dispatch : public EventCallback { + AsyncMessenger *msgr; + Message *m; + + public: + C_handle_dispatch(AsyncMessenger *msgr, Message *m): msgr(msgr), m(m) {} + void do_request(int id) { + msgr->ms_deliver_dispatch(m); + } +}; + static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) { // create a buffer to read into that matches the data alignment @@ -83,8 +94,9 @@ static void alloc_aligned_buffer(bufferlist& data, unsigned len, unsigned off) AsyncConnection::AsyncConnection(CephContext *cct, AsyncMessenger *m) : Connection(cct, m), async_msgr(m), global_seq(0), connect_seq(0), out_seq(0), in_seq(0), in_seq_acked(0), state(STATE_NONE), state_after_send(0), sd(-1), - lock("AsyncConnection::lock"), - got_bad_auth(false), authorizer(NULL), state_offset(0), net(cct), center(&m->center) { } + lock("AsyncConnection::lock"), open_write(false), + got_bad_auth(false), authorizer(NULL), + state_buffer(4096), state_offset(0), net(cct), center(&m->center) { } AsyncConnection::~AsyncConnection() { @@ -100,13 +112,13 @@ int AsyncConnection::read_bulk(int fd, char *buf, int len) if (errno == EAGAIN || errno == EINTR) { nread = 0; } else { - ldout(async_msgr->cct, 1) << __func__ << " Reading from fd %d " << fd + ldout(async_msgr->cct, 1) << __func__ << " Reading from fd=" << fd << " : "<< strerror(errno) << dendl; return -1; } } else if (nread == 0) { - ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor %d " - << fd << dendl; + ldout(async_msgr->cct, 1) << __func__ << " Peer close file descriptor " + << fd << dendl; return -1; } return nread; @@ -185,6 +197,7 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) struct iovec *msgvec = new iovec[size]; memset(&msg, 0, sizeof(msg)); msg.msg_iovlen = 0; + msg.msg_iov = msgvec; int msglen = 0; while (size > 0) { msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str()); @@ -199,10 +212,10 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) if (r < 0) return r; + delete msgvec; // "r" is the remaining length sended += msglen - r; if (r > 0) { - center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this)); ldout(async_msgr->cct, 5) << __func__ << " remaining " << r << " needed to be sent, creating event for writing" << dendl; @@ -218,8 +231,15 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) bl.swap(outcoming_bl); } - if (!outcoming_bl.length()) + if (!open_write && is_queued()) { + center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this)); + open_write = true; + } + + if (open_write && !is_queued()) { center->delete_file_event(sd, EVENT_WRITABLE); + open_write = false; + } return outcoming_bl.length(); } @@ -258,12 +278,12 @@ int AsyncConnection::read_until(uint64_t needed, bufferptr &p) void AsyncConnection::process() { int r = 0; - int prev_state; + int prev_state = state; Mutex::Locker l(lock); do { - prev_state = state; ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) << ", prev state is " << get_state_name(prev_state) << dendl; + prev_state = state; switch (state) { case STATE_OPEN: { @@ -404,11 +424,10 @@ void AsyncConnection::process() } // Reset state + data_buf.clear(); front.clear(); middle.clear(); data.clear(); - memset(&connect_msg, 0, sizeof(connect_msg)); - memset(&connect_reply, 0, sizeof(connect_reply)); recv_stamp = ceph_clock_now(async_msgr->cct); current_header = header; state = STATE_OPEN_MESSAGE_THROTTLE_MESSAGE; @@ -422,10 +441,10 @@ void AsyncConnection::process() << policy.throttler_messages->get_current() << "/" << policy.throttler_messages->get_max() << dendl; // FIXME: may block - if (policy.throttler_messages->get()) - state = STATE_OPEN_MESSAGE_THROTTLE_BYTES; + policy.throttler_messages->get(); } + state = STATE_OPEN_MESSAGE_THROTTLE_BYTES; break; } @@ -438,11 +457,11 @@ void AsyncConnection::process() << policy.throttler_bytes->get_current() << "/" << policy.throttler_bytes->get_max() << dendl; // FIXME: may block - if (policy.throttler_bytes->get(message_size)) - state = STATE_OPEN_MESSAGE_READ_FRONT; + policy.throttler_bytes->get(message_size); } } + state = STATE_OPEN_MESSAGE_READ_FRONT; break; } @@ -493,26 +512,23 @@ void AsyncConnection::process() // read data uint64_t data_len = le32_to_cpu(current_header.data_len); int data_off = le32_to_cpu(current_header.data_off); - bufferlist bl; if (data_len) { // get a buffer - lock.Lock(); map >::iterator p = rx_buffers.find(current_header.tid); if (p != rx_buffers.end()) { ldout(async_msgr->cct,10) << __func__ << " seleting rx buffer v " << p->second.second << " at offset " << data_off << " len " << p->second.first.length() << dendl; - bl = p->second.first; + data_buf = p->second.first; // make sure it's big enough - if (bl.length() < data_len) - bl.push_back(buffer::create(data_len - bl.length())); - data_blp = bl.begin(); + if (data_buf.length() < data_len) + data_buf.push_back(buffer::create(data_len - data_buf.length())); + data_blp = data_buf.begin(); } else { ldout(async_msgr->cct,20) << __func__ << " allocating new rx buffer at offset " << data_off << dendl; - alloc_aligned_buffer(bl, data_len, data_off); - data_blp = bl.begin(); + alloc_aligned_buffer(data_buf, data_len, data_off); + data_blp = data_buf.begin(); } - lock.Unlock(); } msg_left = data_len; @@ -522,7 +538,7 @@ void AsyncConnection::process() case STATE_OPEN_MESSAGE_READ_DATA: { - do { + while (msg_left > 0) { bufferptr bp = data_blp.get_current_ptr(); uint64_t read = MIN(bp.length(), msg_left); r = read_until(read, bp); @@ -535,7 +551,8 @@ void AsyncConnection::process() data_blp.advance(read); data.append(bp, 0, read); - } while (msg_left > 0); + msg_left -= read; + } if (msg_left == 0) state = STATE_OPEN_MESSAGE_READ_FOOTER_AND_DISPATCH; @@ -593,7 +610,7 @@ void AsyncConnection::process() // ceph::shared_ptr auth_handler = session_security; - if (auth_handler) { + if (auth_handler == NULL) { ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl; } else { if (auth_handler->check_message_signature(message)) { @@ -638,7 +655,7 @@ void AsyncConnection::process() if (async_msgr->ms_can_fast_dispatch(message)) { async_msgr->ms_fast_dispatch(message); } else { - async_msgr->ms_deliver_dispatch(message); + center->create_time_event(0, new C_handle_dispatch(async_msgr, message)); } state = STATE_OPEN; @@ -662,13 +679,6 @@ void AsyncConnection::process() case STATE_CLOSED: { ldout(async_msgr->cct, 20) << __func__ << " socket closed" << dendl; - center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); - break; - } - - case STATE_FAULT: - { - ldout(async_msgr->cct, 20) << __func__ << " socket is in error" << dendl; break; } @@ -676,9 +686,12 @@ void AsyncConnection::process() { if (_process_connection() < 0) goto fail; + break; } } + continue; + fail: // clean up state internal variables and states if (state >= STATE_CONNECTING_SEND_CONNECT_MSG && @@ -709,7 +722,6 @@ fail: } } fault(); - state = STATE_FAULT; } while (prev_state != state); } @@ -766,6 +778,8 @@ int AsyncConnection::_process_connection() goto fail; } + ldout(async_msgr->cct, 10) << __func__ << " get banner, ready to send banner" << dendl; + bufferlist bl; bl.append(state_buffer.c_str(), strlen(CEPH_BANNER)); r = _try_send(bl); @@ -852,30 +866,28 @@ int AsyncConnection::_process_connection() delete authorizer; authorizer = async_msgr->get_authorizer(peer_type, false); } - assert(authorizer); bufferlist bl; - ceph_msg_connect connect; - connect.features = policy.features_supported; - connect.host_type = async_msgr->get_myinst().name.type(); - connect.global_seq = global_seq; - connect.connect_seq = connect_seq; - connect.protocol_version = async_msgr->get_proto_version(peer_type, true); - connect.authorizer_protocol = authorizer ? authorizer->protocol : 0; - connect.authorizer_len = authorizer ? authorizer->bl.length() : 0; + connect_msg.features = policy.features_supported; + connect_msg.host_type = async_msgr->get_myinst().name.type(); + connect_msg.global_seq = global_seq; + connect_msg.connect_seq = connect_seq; + connect_msg.protocol_version = async_msgr->get_proto_version(peer_type, true); + connect_msg.authorizer_protocol = authorizer ? authorizer->protocol : 0; + connect_msg.authorizer_len = authorizer ? authorizer->bl.length() : 0; if (authorizer) - ldout(async_msgr->cct, 10) << __func__ << "connect.authorizer_len=" - << connect.authorizer_len << " protocol=" - << connect.authorizer_protocol << dendl; - connect.flags = 0; + ldout(async_msgr->cct, 10) << __func__ << "connect_msg.authorizer_len=" + << connect_msg.authorizer_len << " protocol=" + << connect_msg.authorizer_protocol << dendl; + connect_msg.flags = 0; if (policy.lossy) - connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! - bl.append((char*)&connect, sizeof(connect)); + connect_msg.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides! + bl.append((char*)&connect_msg, sizeof(connect_msg)); if (authorizer) { bl.append(authorizer->bl.c_str(), authorizer->bl.length()); } ldout(async_msgr->cct, 10) << __func__ << " connect sending gseq=" << global_seq << " cseq=" - << connect_seq << " proto=" << connect.protocol_version << dendl; + << connect_seq << " proto=" << connect_msg.protocol_version << dendl; r = _try_send(bl); if (r == 0) { @@ -932,7 +944,7 @@ int AsyncConnection::_process_connection() authorizer_reply.push_back(state_buffer); bufferlist::iterator iter = authorizer_reply.begin(); - if (!authorizer->verify_reply(iter)) { + if (authorizer && !authorizer->verify_reply(iter)) { ldout(async_msgr->cct, 0) << __func__ << " failed verifying authorize reply" << dendl; goto fail; } @@ -1021,7 +1033,9 @@ int AsyncConnection::_process_connection() got_bad_auth = false; delete authorizer; authorizer = NULL; - return 0; + memset(&connect_msg, 0, sizeof(connect_msg)); + memset(&connect_reply, 0, sizeof(connect_reply)); + break; } case STATE_ACCEPTING: @@ -1176,6 +1190,7 @@ int AsyncConnection::_process_connection() { ldout(async_msgr->cct, 20) << __func__ << " accept done" << dendl; state = STATE_OPEN; + memset(&connect_msg, 0, sizeof(connect_msg)); break; } @@ -1216,7 +1231,6 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co if (got_bad_auth) goto fail; got_bad_auth = true; - assert(authorizer); delete authorizer; authorizer = async_msgr->get_authorizer(peer_type, true); // try harder state = STATE_CONNECTING_SEND_CONNECT_MSG; @@ -1446,11 +1460,11 @@ int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &a lock.Lock(); } else { lock.Lock(); - existing->lock.Unlock(); + existing->lock.Lock(); } if (existing->policy.lossy) { // disconnect from the Connection - async_msgr->ms_deliver_handle_reset(existing); + async_msgr->ms_deliver_handle_reset(existing.get()); } else { // queue a reset on the new connection, which we're dumping for the old async_msgr->ms_deliver_handle_reset(this); @@ -1561,6 +1575,21 @@ void AsyncConnection::accept(int incoming) process(); } +int AsyncConnection::send_message(Message *m) +{ + m->get_header().src = async_msgr->get_myname(); + if (!m->get_priority()) + m->set_priority(async_msgr->get_default_send_priority()); + + Mutex::Locker l(lock); + out_q[m->get_priority()].push_back(m); + if (sd > 0 && !open_write) { + center->create_file_event(sd, EVENT_WRITABLE, new C_handle_write(this)); + open_write = true; + } + return 0; +} + void AsyncConnection::requeue_sent() { if (sent.empty()) @@ -1649,6 +1678,7 @@ void AsyncConnection::fault() shutdown_socket(); center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); + open_write = false; // requeue sent items requeue_sent(); @@ -1714,6 +1744,8 @@ void AsyncConnection::_stop() shutdown_socket(); discard_out_queue(); outcoming_bl.clear(); + center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE); + open_write = false; state = STATE_CLOSED; } @@ -1865,6 +1897,7 @@ void AsyncConnection::_send_keepalive_or_ack(bool ack, utime_t *tp) bl.append(CEPH_MSGR_TAG_KEEPALIVE); } + ldout(async_msgr->cct, 10) << __func__ << " try send keepalive or ack" << dendl; _try_send(bl, false); } @@ -1873,30 +1906,48 @@ void AsyncConnection::handle_write() ldout(async_msgr->cct, 10) << __func__ << " started." << dendl; Mutex::Locker l(lock); bufferlist bl; - if (in_seq > in_seq_acked) { - ceph_le64 s; - s = in_seq; - bl.append(CEPH_MSGR_TAG_ACK); - bl.append((char*)&s, sizeof(s)); - } + int r; + if (state >= STATE_OPEN && state <= STATE_OPEN_TAG_CLOSE) { + if (in_seq > in_seq_acked) { + ceph_le64 s; + s = in_seq; + bl.append(CEPH_MSGR_TAG_ACK); + bl.append((char*)&s, sizeof(s)); + ldout(async_msgr->cct, 10) << __func__ << " try send msg ack" << dendl; + } - int r = _try_send(bl); - if (r < 0) - goto fail; - else if (r > 0) - return ; + r = _try_send(bl); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " send msg ack failed :" + << strerror(errno) << dendl; + goto fail; + } else if (r > 0) { + return ; + } - while (1) { - Message *m = _get_next_outgoing(); - if (!m) - break; + while (1) { + Message *m = _get_next_outgoing(); + if (!m) + break; - assert(m); - r = _send(m); - if (r < 0) + assert(m); + ldout(async_msgr->cct, 10) << __func__ << " try send msg " << m << dendl; + r = _send(m); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " send msg failed :" + << strerror(errno) << dendl; + goto fail; + } else if (r > 0) { + break; + } + } + } else { + r = _try_send(bl); + if (r < 0) { + ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed :" + << strerror(errno) << dendl; goto fail; - else if (r > 0) - break; + } } return ; diff --git a/src/msg/AsyncConnection.h b/src/msg/AsyncConnection.h index a010cb0912b..dfc28c8c62f 100644 --- a/src/msg/AsyncConnection.h +++ b/src/msg/AsyncConnection.h @@ -93,8 +93,8 @@ class AsyncConnection : public Connection { ostream& _conn_prefix(std::ostream *_dout); bool is_connected() { - Mutex::Locker l(lock); - return state != STATE_CLOSED; + // FIXME? + return true; } // Only call when AsyncConnection first construct @@ -106,15 +106,12 @@ class AsyncConnection : public Connection { } // Only call when AsyncConnection first construct void accept(int sd); - int send_message(Message *m) { - Mutex::Locker l(lock); - out_q[m->get_priority()].push_back(m); - return 0; - } + int send_message(Message *m); void send_keepalive() { Mutex::Locker l(lock); - _send_keepalive_or_ack(); + if (state == STATE_OPEN) + _send_keepalive_or_ack(); } void mark_down() { Mutex::Locker l(lock); @@ -163,7 +160,6 @@ class AsyncConnection : public Connection { STATE_STANDBY, STATE_CLOSED, STATE_WAIT, // just wait for racing connection - STATE_FAULT }; static const char *get_state_name(int state) { @@ -219,6 +215,7 @@ class AsyncConnection : public Connection { list sent; Mutex lock; utime_t backoff; // backoff time + bool open_write; // Tis section are temp variables used by state transition @@ -227,13 +224,14 @@ class AsyncConnection : public Connection { utime_t throttle_stamp; uint64_t msg_left; ceph_msg_header current_header; + bufferlist data_buf; bufferlist::iterator data_blp; bufferlist front, middle, data; ceph_msg_connect connect_msg; - ceph_msg_connect_reply connect_reply; // Connecting state bool got_bad_auth; AuthAuthorizer *authorizer; + ceph_msg_connect_reply connect_reply; // Accepting state entity_addr_t socket_addr; CryptoKey session_key; diff --git a/src/msg/AsyncMessenger.cc b/src/msg/AsyncMessenger.cc index b1f69f447a3..abd27622ecc 100644 --- a/src/msg/AsyncMessenger.cc +++ b/src/msg/AsyncMessenger.cc @@ -186,7 +186,8 @@ int Processor::start() // start thread create(); - center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this)); + if (listen_sd >= 0) + center->create_file_event(listen_sd, EVENT_READABLE, new C_handle_accept(this)); return 0; } @@ -216,7 +217,7 @@ void *Processor::entry() while (!done) { ldout(msgr->cct,20) << __func__ << " calling poll" << dendl; - r = center->process_events(30000); + r = center->process_events(1000); if (r < 0) { ldout(msgr->cct,20) << __func__ << " process events failed: " << cpp_strerror(errno) << dendl; @@ -239,7 +240,8 @@ void Processor::stop() done = true; ldout(msgr->cct, 10) << __func__ << " processor" << dendl; - center->delete_file_event(listen_sd, EVENT_READABLE); + if (listen_sd >= 0) + center->delete_file_event(listen_sd, EVENT_READABLE); if (listen_sd >= 0) { ::shutdown(listen_sd, SHUT_RDWR); } @@ -269,12 +271,13 @@ AsyncMessenger::AsyncMessenger(CephContext *cct, entity_name_t name, lock("AsyncMessenger::lock"), nonce(_nonce), did_bind(false), global_seq(0), - cluster_protocol(0), + cluster_protocol(0), stopped(true), local_connection(new AsyncConnection(cct, this)), center(cct) { ceph_spin_init(&global_seq_lock); init_local_connection(); + center.init(5000); } /** @@ -291,18 +294,20 @@ void AsyncMessenger::ready() ldout(cct,10) << __func__ << " " << get_myaddr() << dendl; lock.Lock(); - if (did_bind) - processor.start(); + processor.start(); lock.Unlock(); } int AsyncMessenger::shutdown() { ldout(cct,10) << __func__ << "shutdown " << get_myaddr() << dendl; + center.stop(); mark_down_all(); // break ref cycles on the loopback connection local_connection->set_priv(NULL); + stop_cond.Signal(); + stopped = true; return 0; } @@ -330,6 +335,7 @@ int AsyncMessenger::rebind(const set& avoid_ports) { ldout(cct,1) << __func__ << " rebind avoid " << avoid_ports << dendl; assert(did_bind); + center.stop(); processor.stop(); mark_down_all(); return processor.rebind(avoid_ports); @@ -345,6 +351,7 @@ int AsyncMessenger::start() assert(!started); started = true; + stopped = false; if (!did_bind) { my_inst.addr.nonce = nonce; @@ -353,8 +360,7 @@ int AsyncMessenger::start() lock.Unlock(); - // FIXME - center.init(5000); + center.start(); return 0; } @@ -365,17 +371,15 @@ void AsyncMessenger::wait() lock.Unlock(); return; } + if (!stopped) + stop_cond.Wait(lock); lock.Unlock(); // done! clean up. - if (did_bind) { - ldout(cct,20) << __func__ << ": stopping processor thread" << dendl; - processor.stop(); - did_bind = false; - ldout(cct,20) << __func__ << ": stopped processor thread" << dendl; - } - - center.stop(); + ldout(cct,20) << __func__ << ": stopping processor thread" << dendl; + processor.stop(); + did_bind = false; + ldout(cct,20) << __func__ << ": stopped processor thread" << dendl; // close all pipes lock.Lock(); @@ -447,12 +451,6 @@ ConnectionRef AsyncMessenger::get_loopback_connection() int AsyncMessenger::_send_message(Message *m, const entity_inst_t& dest) { - // set envelope - m->get_header().src = get_myname(); - - if (!m->get_priority()) - m->set_priority(get_default_send_priority()); - ldout(cct, 1) << __func__ << "--> " << dest.name << " " << dest.addr << " -- " << *m << " -- ?+" << m->get_data().length() << " " << m << dendl; diff --git a/src/msg/AsyncMessenger.h b/src/msg/AsyncMessenger.h index efe811f52c5..751d87fdcc5 100644 --- a/src/msg/AsyncMessenger.h +++ b/src/msg/AsyncMessenger.h @@ -267,6 +267,9 @@ private: /// internal cluster protocol version, if any, for talking to entities of the same type. int cluster_protocol; + Cond stop_cond; + bool stopped; + AsyncConnectionRef _lookup_conn(const entity_addr_t& k) { assert(lock.is_locked()); ceph::unordered_map::iterator p = conns.find(k); @@ -274,7 +277,6 @@ private: return NULL; if (!p->second->is_connected()) { // FIXME - p->second->put(); conns.erase(p); return NULL; } @@ -318,8 +320,6 @@ public: void accept_conn(AsyncConnectionRef conn) { Mutex::Locker l(lock); - if (conns.count(conn->get_peer_addr())) - delete conns[conn->get_peer_addr()]; conns[conn->peer_addr] = conn; accepting_conns.erase(conn); } diff --git a/src/msg/Event.cc b/src/msg/Event.cc index 4d300a6a887..3c3a22ffb70 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -46,7 +46,6 @@ int EventCenter::init(int n) } nevent = n; - event_tp.start(); return 0; } @@ -93,6 +92,8 @@ int EventCenter::create_file_event(int fd, int mask, EventCallback *ctxt) delete event->write_cb; event->write_cb = ctxt; } + ldout(cct, 10) << __func__ << " create event fd=" << fd << " mask=" << mask + << " now mask is " << event->mask << dendl; return 0; } @@ -101,26 +102,43 @@ void EventCenter::delete_file_event(int fd, int mask) Mutex::Locker l(lock); EventCenter::FileEvent *event = _get_file_event(fd); - driver->del_event(fd, event ? event->mask: EVENT_NONE, mask); - if (!event) { - file_events[fd] = EventCenter::FileEvent(); - event = &file_events[fd]; - } + if (!event) + return ; + + driver->del_event(fd, event->mask, mask); - if (event->read_cb) + if (mask & EVENT_READABLE && event->read_cb) { delete event->read_cb; - if (event->write_cb) + event->read_cb = NULL; + } + if (mask & EVENT_WRITABLE && event->write_cb) { delete event->write_cb; + event->write_cb = NULL; + } event->mask = event->mask & (~mask); if (event->mask == EVENT_NONE) file_events.erase(fd); + ldout(cct, 10) << __func__ << " delete fd=" << fd << " mask=" << mask + << " now mask is " << event->mask << dendl; } uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ctxt) { Mutex::Locker l(lock); uint64_t id = time_event_next_id++; + + ldout(cct, 10) << __func__ << " id=" << id << " expire time=" << milliseconds << dendl; + // Direct dispatch + if (milliseconds == 0) { + FiredEvent e; + e.time_event.id = id; + e.time_event.time_cb = ctxt; + e.is_file = false; + event_wq.queue(e); + return id; + } + EventCenter::TimeEvent event; utime_t expire; struct timeval tv; @@ -135,7 +153,6 @@ uint64_t EventCenter::create_time_event(uint64_t milliseconds, EventCallback *ct event.time_cb = ctxt; time_to_ids[expire] = id; time_events[id] = event; - ldout(cct, 10) << __func__ << " id=" << id << " trigger time is " << expire << dendl; return id; } @@ -148,11 +165,19 @@ void EventCenter::delete_time_event(uint64_t id) if (it->second == id) { time_to_ids.erase(it); time_events.erase(id); + ldout(cct, 10) << __func__ << " id=" << id << dendl; return ; } } } +void EventCenter::start() +{ + ldout(cct, 1) << __func__ << dendl; + Mutex::Locker l(lock); + event_tp.start(); +} + void EventCenter::stop() { ldout(cct, 1) << __func__ << dendl; @@ -226,6 +251,11 @@ int EventCenter::process_events(int timeout_millionseconds) { Mutex::Locker l(lock); + for (map::iterator it = time_to_ids.begin(); + it != time_to_ids.end(); ++it) { + ldout(cct, 10) << __func__ << " time_to_ids " << it->first << " id=" << it->second << dendl; + } + map::iterator it = time_to_ids.begin(); if (it != time_to_ids.end() && shortest > it->first) { ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl; @@ -247,6 +277,7 @@ int EventCenter::process_events(int timeout_millionseconds) e.file_event = fired_events[j]; e.is_file = true; event_wq.queue(e); + ldout(cct, 10) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; } if (trigger_time) diff --git a/src/msg/Event.h b/src/msg/Event.h index d4127966484..19da2a28455 100644 --- a/src/msg/Event.h +++ b/src/msg/Event.h @@ -82,7 +82,7 @@ class EventCenter { uint64_t id; EventCallback *time_cb; - TimeEvent(): id(0), fd(0), time_cb(NULL) {} + TimeEvent(): id(0), time_cb(NULL) {} }; Mutex lock; @@ -195,6 +195,7 @@ class EventCenter { void delete_file_event(int fd, int mask); void delete_time_event(uint64_t id); int process_events(int timeout_milliseconds); + void start(); void stop(); FileEvent *get_file_event(int fd) { Mutex::Locker l(lock); diff --git a/src/msg/EventEpoll.cc b/src/msg/EventEpoll.cc index 865cf012d1b..6d5ec5964bc 100644 --- a/src/msg/EventEpoll.cc +++ b/src/msg/EventEpoll.cc @@ -39,9 +39,10 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask) pos = deleted_fds.front(); deleted_fds.pop_front(); } else { - fds[fd] = pos = next_pos; + pos = next_pos; next_pos++; } + fds[fd] = pos; } else { op = cur_mask == EVENT_NONE ? EPOLL_CTL_ADD: EPOLL_CTL_MOD; } @@ -59,6 +60,9 @@ int EpollDriver::add_event(int fd, int cur_mask, int add_mask) << cpp_strerror(errno) << dendl; return -errno; } + + ldout(cct, 10) << __func__ << " add event to fd=" << fd << " mask=" << add_mask + << dendl; return 0; } @@ -77,11 +81,17 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask) ee.data.u64 = 0; /* avoid valgrind warning */ ee.data.fd = fd; if (mask != EVENT_NONE) { - epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee); + if (epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ee) < 0) { + lderr(cct) << __func__ << " epoll_ctl: modify fd=" << fd << " mask=" << mask + << " failed." << cpp_strerror(errno) << dendl; + } } else { /* Note, Kernel < 2.6.9 requires a non null event pointer even for * EPOLL_CTL_DEL. */ - epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ee); + if (epoll_ctl(epfd, EPOLL_CTL_DEL, fd, &ee) < 0) { + lderr(cct) << __func__ << " epoll_ctl: delete fd=" << fd + << " failed." << cpp_strerror(errno) << dendl; + } if (next_pos == it->second) next_pos--; @@ -89,6 +99,8 @@ void EpollDriver::del_event(int fd, int cur_mask, int delmask) deleted_fds.push_back(it->second); fds.erase(fd); } + ldout(cct, 10) << __func__ << " del event fd=" << fd << " cur mask=" << mask + << dendl; } int EpollDriver::resize_events(int newsize)