Called when an outgoing connection succeeds.
// how i receive messages
virtual bool ms_dispatch(Message *m) = 0;
+ // after a connection connects
+ virtual void ms_handle_connect(Connection *con) { };
+
/*
* on any connection reset.
* this indicates that the ordered+reliable delivery semantics have
<< dendl;
assert(0);
}
+ void ms_deliver_handle_connect(Connection *con) {
+ for (list<Dispatcher*>::iterator p = dispatchers.begin();
+ p != dispatchers.end();
+ p++)
+ (*p)->ms_handle_connect(con);
+ }
void ms_deliver_handle_reset(Connection *con) {
for (list<Dispatcher*>::iterator p = dispatchers.begin();
p != dispatchers.end();
}
Message *m = ls.front();
ls.pop_front();
- if ((long)m == BAD_REMOTE_RESET) {
+ if ((long)m == D_BAD_REMOTE_RESET) {
lock.Lock();
Connection *con = remote_reset_q.front();
remote_reset_q.pop_front();
lock.Unlock();
ms_deliver_handle_remote_reset(con);
con->put();
- } else if ((long)m == BAD_RESET) {
+ } else if ((long)m == D_CONNECT) {
+ lock.Lock();
+ Connection *con = connect_q.front();
+ connect_q.pop_front();
+ lock.Unlock();
+ ms_deliver_handle_connect(con);
+ con->put();
+ } else if ((long)m == D_BAD_RESET) {
lock.Lock();
Connection *con = reset_q.front();
reset_q.pop_front();
backoff = utime_t();
dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
+ for (unsigned i=0; i<rank->local.size(); i++)
+ if (rank->local[i])
+ rank->local[i]->queue_connect(connection_state->get());
+
if (!reader_running) {
dout(20) << "connect starting reader" << dendl;
start_reader();
lock.Unlock();
}
- enum { BAD_REMOTE_RESET, BAD_RESET };
+ enum { D_CONNECT, D_BAD_REMOTE_RESET, D_BAD_RESET };
+ list<Connection*> connect_q;
list<Connection*> remote_reset_q;
list<Connection*> reset_q;
+ void queue_connect(Connection *con) {
+ lock.Lock();
+ connect_q.push_back(con);
+ dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_CONNECT);
+ cond.Signal();
+ lock.Unlock();
+ }
void queue_remote_reset(Connection *con) {
lock.Lock();
remote_reset_q.push_back(con);
- dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_REMOTE_RESET);
+ dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_REMOTE_RESET);
cond.Signal();
lock.Unlock();
}
void queue_reset(Connection *con) {
lock.Lock();
reset_q.push_back(con);
- dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)BAD_RESET);
+ dispatch_queue[CEPH_MSG_PRIO_HIGHEST].push_back((Message*)D_BAD_RESET);
cond.Signal();
lock.Unlock();
}