}
};
+class C_handle_remote_reset : public EventCallback {
+ AsyncMessenger *msgr;
+ AsyncConnectionRef conn;
+
+ public:
+ C_handle_remote_reset(AsyncMessenger *m, AsyncConnectionRef c): msgr(m), conn(c) {}
+ void do_request(int id) {
+ msgr->ms_deliver_handle_remote_reset(conn.get());
+ }
+};
+
class C_handle_dispatch : public EventCallback {
AsyncMessenger *msgr;
Message *m;
read_handler.reset(new C_handle_read(this));
write_handler.reset(new C_handle_write(this));
reset_handler.reset(new C_handle_reset(async_msgr, this));
+ remote_reset_handler.reset(new C_handle_remote_reset(async_msgr, this));
}
AsyncConnection::~AsyncConnection()
int next_state;
if (reply_tag == CEPH_MSGR_TAG_SEQ) {
- reply_bl.append((char*)existing_seq, sizeof(existing_seq));
+ reply_bl.append((char*)&existing_seq, sizeof(existing_seq));
next_state = STATE_ACCEPTING_WAIT_SEQ;
} else {
next_state = STATE_ACCEPTING_READY;
int AsyncConnection::send_message(Message *m)
{
+ ldout(async_msgr->cct, 10) << __func__ << dendl;
m->get_header().src = async_msgr->get_myname();
if (!m->get_priority())
m->set_priority(async_msgr->get_default_send_priority());
Mutex::Locker l(lock);
out_q[m->get_priority()].push_back(m);
- if (state == STATE_STANDBY && !policy.server) {
+ if ((state == STATE_STANDBY || state == STATE_CLOSED) && !policy.server) {
_connect();
} else if (sd > 0 && !open_write) {
center->create_file_event(sd, EVENT_WRITABLE, write_handler);
discard_out_queue();
outcoming_bl.clear();
- async_msgr->ms_deliver_handle_remote_reset(this);
+ center->create_time_event(0, remote_reset_handler);
if (randomize_out_seq()) {
lsubdout(async_msgr->cct,ms,15) << __func__ << " Could not get random bytes to set seq number for session reset; set seq number to " << out_seq << dendl;
shutdown_socket();
discard_out_queue();
outcoming_bl.clear();
+ if (policy.lossy)
+ was_session_reset();
center->delete_file_event(sd, EVENT_READABLE|EVENT_WRITABLE);
open_write = false;
state = STATE_CLOSED;