From: Haomai Wang Date: Fri, 19 Sep 2014 06:22:47 +0000 (+0800) Subject: Event: Simply process_event impl X-Git-Tag: v0.88~37^2~4^2~19 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=cce1ae56d3b57fc9c07542bc0fb708ba0cd6c70c;p=ceph.git Event: Simply process_event impl Signed-off-by: Haomai Wang --- diff --git a/src/msg/AsyncConnection.cc b/src/msg/AsyncConnection.cc index 0c0c0ac256af..934740122ce4 100644 --- a/src/msg/AsyncConnection.cc +++ b/src/msg/AsyncConnection.cc @@ -260,7 +260,8 @@ int AsyncConnection::_try_send(bufferlist send_bl, bool send) // trim already sent for outcoming_bl if (sended) { bufferlist bl; - outcoming_bl.splice(sended, outcoming_bl.length()-sended, &bl); + if (sended < outcoming_bl.length()) + outcoming_bl.splice(sended, outcoming_bl.length()-sended, &bl); bl.swap(outcoming_bl); } @@ -645,11 +646,10 @@ void AsyncConnection::process() // Check the signature if one should be present. A zero return indicates success. PLR // - ceph::shared_ptr auth_handler = session_security; - if (auth_handler == NULL) { + if (session_security.get() == NULL) { ldout(async_msgr->cct, 10) << __func__ << " No session security set" << dendl; } else { - if (auth_handler->check_message_signature(message)) { + if (session_security->check_message_signature(message)) { ldout(async_msgr->cct, 0) << __func__ << "Signature check failed" << dendl; goto fail; } @@ -781,6 +781,14 @@ int AsyncConnection::_process_connection() case STATE_CONNECTING: { assert(!policy.server); + + // reset connect state variables + got_bad_auth = false; + delete authorizer; + authorizer = NULL; + memset(&connect_msg, 0, sizeof(connect_msg)); + memset(&connect_reply, 0, sizeof(connect_reply)); + global_seq = async_msgr->get_global_seq(); // close old socket. this is safe because we stopped the reader thread above. if (sd >= 0) @@ -1074,12 +1082,6 @@ int AsyncConnection::_process_connection() open_write = true; } - // reset connect state variables - got_bad_auth = false; - delete authorizer; - authorizer = NULL; - memset(&connect_msg, 0, sizeof(connect_msg)); - memset(&connect_reply, 0, sizeof(connect_reply)); break; } @@ -1632,6 +1634,8 @@ int AsyncConnection::send_message(Message *m) Mutex::Locker l(lock); out_q[m->get_priority()].push_back(m); if ((state == STATE_STANDBY || state == STATE_CLOSED) && !policy.server) { + ldout(async_msgr->cct, 10) << __func__ << " state is " << get_state_name(state) + << " policy.server is false" << dendl; _connect(); } else if (sd > 0 && !open_write) { center->dispatch_event_external(write_handler); @@ -2008,7 +2012,9 @@ void AsyncConnection::handle_write() break; } } - } else { + } else if (state != STATE_CONNECTING && + state != STATE_CLOSED && + state != STATE_STANDBY) { // send_message may call this even if socket is closed r = _try_send(bl); if (r < 0) { ldout(async_msgr->cct, 1) << __func__ << " send outcoming bl failed" << dendl; diff --git a/src/msg/Event.cc b/src/msg/Event.cc index 30c2c87392b5..70781827c68d 100644 --- a/src/msg/Event.cc +++ b/src/msg/Event.cc @@ -246,11 +246,6 @@ int EventCenter::process_events(int timeout_millionseconds) shortest.set_from_timeval(&tv); { - for (map::iterator it = time_to_ids.begin(); - it != time_to_ids.end(); ++it) { - ldout(cct, 10) << __func__ << " time_to_ids " << it->first << " id=" << it->second << dendl; - } - map::iterator it = time_to_ids.begin(); if (it != time_to_ids.end() && shortest > it->first) { ldout(cct, 10) << __func__ << " shortest is " << shortest << " it->first is " << it->first << dendl; @@ -291,7 +286,7 @@ int EventCenter::process_events(int timeout_millionseconds) event->write_cb->do_request(fired_events[j].fd); } - ldout(cct, 20) << __func__ << " event_wq queue fd is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; + ldout(cct, 20) << __func__ << " event_wq process is " << fired_events[j].fd << " mask is " << fired_events[j].mask << dendl; } if (trigger_time)