rank->lock.Lock();
{
if (rank->rank_pipe.count(inst.addr) == 0)
- rank->connect_rank(inst.addr, rank->policy_map[inst.name.type()]);
+ rank->connect_rank(inst.addr, rank->get_policy(inst.name.type()));
}
rank->lock.Unlock();
}
rank->lock.Lock();
// note peer's type, flags
- if (rank->policy_map.count(connect.host_type))
- policy = rank->policy_map[connect.host_type]; /* apply policy */
- else
- policy = rank->default_policy;
+ policy = rank->get_policy(connect.host_type);
dout(10) << "accept host_type " << connect.host_type
<< ", setting policy, lossy_tx=" << policy.lossy_tx << dendl;
lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
lock.Lock();
while (state != STATE_CLOSED) {// && state != STATE_WAIT) {
+ dout(10) << "writer: state = " << state << " policy.server=" << policy.server << dendl;
+
// standby?
- if (!q.empty() && state == STATE_STANDBY)
+ if (!q.empty() && state == STATE_STANDBY && !policy.server)
state = STATE_CONNECTING;
// connect?
if (state == STATE_CONNECTING) {
- if (policy.server) {
- state = STATE_STANDBY;
- } else {
- connect();
- continue;
- }
+ connect();
+ continue;
}
if (state == STATE_CLOSING) {
} else {
dout(20) << "submit_message " << *m << " remote, " << dest_addr << ", new pipe." << dendl;
// not connected.
- pipe = connect_rank(dest_proc_addr, policy_map[dest.name.type()]);
+ pipe = connect_rank(dest_proc_addr, get_policy(dest.name.type()));
pipe->send(m);
}
}
void reaper();
+ Policy get_policy(int t) {
+ if (policy_map.count(t))
+ return policy_map[t];
+ else
+ return default_policy;
+ }
+
public:
SimpleMessenger() : accepter(this),
lock("SimpleMessenger::lock"), started(false), need_addr(true),