messenger->add_dispatcher_head(&dispatcher);
rank.start();
- rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0));
mc.set_messenger(messenger);
mc.init();
rank.start();
- rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
- rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
-
// start client
client->init();
if (!m)
return 1;
- rank.set_default_policy(SimpleMessenger::Policy::stateful_server());
- rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
- rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
+ rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::stateful_server());
+ rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless_peer());
rank.start();
rank.start(); // may daemonize
rank.set_default_policy(SimpleMessenger::Policy::stateless_server());
- rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fail_after(2.0));
- rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fail_after(2.0));
+ rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless_peer());
mon->init();
rank.wait();
return 1;
rank.set_default_policy(SimpleMessenger::Policy::stateless_server());
- rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
+ rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::client());
+ rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer());
rank.start();
SimpleMessenger rank;
cout << "starting csyn" << std::endl;
- rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(2.0));
- rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
-
list<Client*> clients;
list<SyntheticClient*> synclients;
client = new Client(rank->register_entity(entity_name_t::CLIENT()), monclient);
rank->start();
- rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
- rank->set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
- rank->set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
client->init();
}
messenger->add_dispatcher_head(this);
- rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
- rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
- rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
- rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless()); // mds does its own timeout/markdown
-
rank.start(1);
objecter = new Objecter(messenger, &monclient, &osdmap, lock);
<< " sd=" << sd
<< " pgs=" << peer_global_seq
<< " cs=" << connect_seq
- << " ltx=" << policy.lossy_tx
+ << " l=" << policy.lossy
<< ").";
}
// note peer's type, flags
peer_type = connect.host_type;
policy = rank->get_policy(connect.host_type);
- dout(10) << "accept host_type " << connect.host_type
- << ", setting policy, lossy_tx=" << policy.lossy_tx << dendl;
- lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
+ dout(10) << "accept of host_type " << connect.host_type
+ << ", policy.lossy=" << policy.lossy
+ << dendl;
memset(&reply, 0, sizeof(reply));
reply.protocol_version = get_proto_version(rank->my_type, peer_type, false);
<< " <= " << connect.global_seq << ", looks ok" << dendl;
}
- if (existing->policy.lossy_tx) {
- dout(-10) << "accept replacing existing (lossy) channel" << dendl;
+ if (existing->policy.lossy) {
+ dout(-10) << "accept replacing existing (lossy) channel (new one lossy="
+ << policy.lossy << ")" << dendl;
existing->was_session_reset();
goto replace;
}
- if (lossy_rx) {
+ /*if (lossy_rx) {
if (existing->state == STATE_STANDBY) {
dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
<< existing << dendl;
}
existing->lock.Unlock();
goto fail;
- }
+ }*/
dout(-10) << "accept connect_seq " << connect.connect_seq
<< " vs existing " << existing->connect_seq
reply.global_seq = rank->get_global_seq();
reply.connect_seq = connect_seq;
reply.flags = 0;
- if (policy.lossy_tx)
+ if (policy.lossy)
reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
// ok!
connect.connect_seq = cseq;
connect.protocol_version = get_proto_version(rank->my_type, peer_type, true);
connect.flags = 0;
- if (policy.lossy_tx)
- connect.flags |= CEPH_MSG_CONNECT_LOSSY;
+ if (policy.lossy)
+ connect.flags |= CEPH_MSG_CONNECT_LOSSY; // this is fyi, actually, server decides!
memset(&msg, 0, sizeof(msg));
msgvec[0].iov_base = (char*)&connect;
msgvec[0].iov_len = sizeof(connect);
if (reply.tag == CEPH_MSGR_TAG_READY) {
// hooray!
peer_global_seq = reply.global_seq;
- lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY;
+ policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
state = STATE_OPEN;
connect_seq = cseq + 1;
assert(connect_seq == reply.connect_seq);
first_fault = last_attempt = utime_t();
- dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
+ dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
if (!reader_running) {
dout(20) << "connect starting reader" << dendl;
}
// lossy channel?
- if (policy.lossy_tx) {
+ if (policy.lossy) {
dout(10) << "fault on lossy channel, failing" << dendl;
was_session_reset();
fail();
dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
state = STATE_CLOSED;
} else {
- dout(0) << "fault nothing to send, going to standby" << dendl;
+ dout(0) << "fault with nothing to send, going to standby" << dendl;
state = STATE_STANDBY;
}
return;
utime_t now = g_clock.now();
if (state != STATE_CONNECTING) {
- if (!onconnect) dout(0) << "fault initiating reconnect" << dendl;
+ if (!onconnect)
+ dout(0) << "fault initiating reconnect" << dendl;
connect_seq++;
state = STATE_CONNECTING;
first_fault = now;
} else if (first_fault.sec() == 0) {
- if (!onconnect) dout(0) << "fault first fault" << dendl;
+ if (!onconnect)
+ dout(0) << "fault first fault" << dendl;
first_fault = now;
} else {
+
+#warning clean me up
+
utime_t failinterval = now - first_fault;
utime_t retryinterval = now - last_attempt;
if (!onconnect) dout(10) << "fault failure was " << failinterval
<< " ago, last attempt was at " << last_attempt
<< ", " << retryinterval << " ago" << dendl;
- if (policy.fail_interval > 0 && failinterval > policy.fail_interval) {
- // give up
- dout(0) << "fault giving up" << dendl;
- fail();
- } else if (retryinterval < policy.retry_interval) {
- // wait
- now += (policy.retry_interval - retryinterval);
- dout(10) << "fault waiting until " << now << dendl;
- cond.WaitUntil(lock, now);
- dout(10) << "fault done waiting or woke up" << dendl;
- }
+ // wait
+ now += 1.0;
+ dout(10) << "fault waiting until " << now << dendl;
+ cond.WaitUntil(lock, now);
+ dout(10) << "fault done waiting or woke up" << dendl;
}
last_attempt = now;
}
}
in_seq++;
- if (!lossy_rx && in_seq != m->get_seq()) {
+ if (!policy.lossy && in_seq != m->get_seq()) {
dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
<< " for " << *m << " from " << m->get_source() << dendl;
derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
class SimpleMessenger {
public:
struct Policy {
- bool lossy_tx; //
+ bool lossy;
bool server;
- float retry_interval; // initial retry interval. 0 => fail immediately (lossy_tx=true)
- float fail_interval; // before we call ms_handle_failure (lossy_tx=true)
- bool drop_msg_callback;
- bool fail_callback;
- bool remote_reset_callback;
- Policy() :
- lossy_tx(false), server(false),
- retry_interval(g_conf.ms_retry_interval),
- fail_interval(g_conf.ms_fail_interval),
- drop_msg_callback(true),
- fail_callback(true),
- remote_reset_callback(true) {}
-
- Policy(bool tx, bool sr, float r, float f, bool dmc, bool fc, bool rrc) :
- lossy_tx(tx), server(sr),
- retry_interval(r), fail_interval(f),
- drop_msg_callback(dmc),
- fail_callback(fc),
- remote_reset_callback(rrc) {}
-
- // new
- static Policy stateful_server() { return Policy(false, true, g_conf.ms_retry_interval, 0,
- true, true, true); }
- static Policy stateless_server() { return Policy(true, true, -1, -1,
- true, true, true); }
-
- // old
- static Policy lossless() { return Policy(false, false,
- g_conf.ms_retry_interval, 0,
- true, true, true); }
- static Policy lossy_fail_after(float f) {
- return Policy(true, false,
- MIN(g_conf.ms_retry_interval, f), f,
- true, true, true);
- }
- static Policy lossy_fast_fail() { return Policy(true, false, -1, -1, true, true, true); }
- /*
- static Policy fast_fail() { return Policy(-1, -1, true, true, true); }
- static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); }
- static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, -1, false, true, true); }
- */
+ Policy(bool l=false, bool s=false) :
+ lossy(l), server(s) {}
+
+ static Policy stateful_server() { return Policy(false, true); }
+ static Policy stateless_server() { return Policy(true, true); }
+ static Policy lossless_peer() { return Policy(false, false); }
+ static Policy client() { return Policy(false, false); }
};
int peer_type;
entity_addr_t peer_addr;
Policy policy;
- bool lossy_rx;
Mutex lock;
int state;