if (!messenger) {
rank = new SimpleMessenger;
rank->bind();
+ rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
messenger = rank->register_entity(entity_name_t::CLIENT(-1));
messenger->set_dispatcher(this);
rank->start(true); // do not daemonize!
rank->lock.Lock();
if (rank->num_local > 0) {
Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING);
- p->policy = rank->default_policy;
p->sd = sd;
p->start_reader();
rank->pipes.insert(p);
<< " sd=" << sd
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
+ << " ltx=" << policy.lossy_tx
<< ").";
}
rank->lock.Lock();
// note peer's type, flags
- policy = rank->policy_map[connect.host_type]; /* apply policy */
+ if (rank->policy_map.count(connect.host_type))
+ policy = rank->policy_map[connect.host_type]; /* apply policy */
+ else
+ policy = rank->default_policy;
+ dout(10) << "accept host_type " << connect.host_type
+ << ", setting policy, lossy_tx=" << policy.lossy_tx << dendl;
lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
memset(&reply, 0, sizeof(reply));
} else {
dout(20) << "submit_message " << *m << " dest " << dest << " remote, " << dest_addr << ", have pipe." << dendl;
+ /*
// if this pipe was created by an incoming connection, but we haven't received
// a message yet, then it won't have the policy set.
if (pipe->get_out_seq() == 0)
pipe->policy = policy_map[m->get_dest().type()];
+ */
pipe->_send(m);
pipe->lock.Unlock();