if (session->connection) {
messenger->send_message(m, session->connection);
} else {
- messenger->send_message(m, session->inst);
+ session->preopen_out_queue.push_back(m);
}
}
if (session->connection) {
messenger->send_message(m, session->connection);
} else {
- messenger->send_message(m, session->inst);
+ session->preopen_out_queue.push_back(m);
}
}
if (s->connection != con) {
dout(10) << " session connection " << s->connection << " -> " << con << dendl;
s->connection = con;
+
+ // send out any queued messages
+ while (!s->preopen_out_queue.empty()) {
+ messenger->send_message(s->preopen_out_queue.front(), con);
+ s->preopen_out_queue.pop_front();
+ }
}
}
}
assert(session->is_opening());
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->sessionmap.touch_session(session);
- mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
+ mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->connection);
} else if (session->is_closing() ||
session->is_killing()) {
// kill any lingering capabilities, leases, requests
dout(10) << "force_open_sessions opened " << session->inst << dendl;
mds->sessionmap.set_state(session, Session::STATE_OPEN);
mds->sessionmap.touch_session(session);
- mds->messenger->send_message(new MClientSession(CEPH_SESSION_OPEN), session->inst);
+ session->preopen_out_queue.push_back(new MClientSession(CEPH_SESSION_OPEN));
}
} else {
dout(10) << "force_open_sessions skipping already-open " << session->inst << dendl;
Connection *connection;
xlist<Session*>::item item_session_list;
+ list<Message*> preopen_out_queue; ///< messages for client, queued before they connect
+
elist<MDRequest*> requests;
interval_set<inodeno_t> pending_prealloc_inos; // journaling prealloc, will be added to prealloc_inos
lease_seq(0) { }
~Session() {
assert(!item_session_list.is_on_list());
+ while (!preopen_out_queue.empty()) {
+ preopen_out_queue.front()->put();
+ preopen_out_queue.pop_front();
+ }
}
void clear() {