lock.Unlock();
msgr->ms_deliver_handle_connect(con);
con->put();
+ } else if ((long)m == DispatchQueue::D_ACCEPT) {
+ lock.Lock();
+ Connection *con = accept_q.front();
+ accept_q.pop_front();
+ lock.Unlock();
+ msgr->ms_deliver_handle_accept(con);
+ con->put();
} else if ((long)m == DispatchQueue::D_BAD_RESET) {
lock.Lock();
Connection *con = reset_q.front();
map<int, xlist<IncomingQueue *>::iterator> queued_pipe_iters;
atomic_t qlen;
- enum { D_CONNECT = 1, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
- list<Connection*> connect_q;
+ enum { D_CONNECT = 1, D_ACCEPT, D_BAD_REMOTE_RESET, D_BAD_RESET, D_NUM_CODES };
+ list<Connection*> connect_q, accept_q;
list<Connection*> remote_reset_q;
list<Connection*> reset_q;
lock.Unlock();
local_delivery((Message*)D_CONNECT, CEPH_MSG_PRIO_HIGHEST);
}
+ void queue_accept(Connection *con) {
+ lock.Lock();
+ if (stop) {
+ lock.Unlock();
+ return;
+ }
+ accept_q.push_back(con->get());
+ lock.Unlock();
+ local_delivery((Message*)D_ACCEPT, CEPH_MSG_PRIO_HIGHEST);
+ }
void queue_remote_reset(Connection *con) {
lock.Lock();
if (stop) {