git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: simplify policy
author Sage Weil <sage@newdream.net>
Tue, 13 Oct 2009 05:45:50 +0000 (22:45 -0700)
committer Sage Weil <sage@newdream.net>
Tue, 13 Oct 2009 17:27:06 +0000 (10:27 -0700)
We may be a server, and we may be lossy.  This gives us a few policies:

!server, !lossy = lossless_peer(), for bidirectional intracluster fun.
server, !lossy = lossless_server() (named stateful_server() in the code), e.g. mds <-> client.
server, lossy = lossy_server() (named stateless_server() in the code), e.g. mon and osd.

Also, the default is
!server, !lossy = client(), but that doesn't mean much.  The server
decides whether the connection is lossy or not.  And !server just means we
can initiate the outgoing connection.

src/ceph.cc
src/cfuse.cc
src/cmds.cc
src/cmon.cc
src/cosd.cc
src/csyn.cc
src/libceph.cc
src/librados.cc
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 7cff7363ed876c5c5b02fe83fe00875faa2f2b4b..753e3eaec75c0aaf0bb8da0826f20daf59eed370 100644 (file)
@@ -617,7 +617,6 @@ int main(int argc, const char **argv, const char *envp[])
   messenger->add_dispatcher_head(&dispatcher);
 
   rank.start();
-  rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0));
 
   mc.set_messenger(messenger);
   mc.init();
index 0d35899043727338a54ed1ca7b19421f78399da2..f367df9fec0bf34fa4703369fbc51232d9720bcf 100644 (file)
@@ -75,10 +75,6 @@ int main(int argc, const char **argv, const char *envp[]) {
 
   rank.start();
 
-  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
-  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
-
   // start client
   client->init();
     
index e572068ceef75961179cf075b7f04cdecc700cdf..6abd9a27b4d9739dd90abc25698cae75d1b9c015 100644 (file)
@@ -78,10 +78,8 @@ int main(int argc, const char **argv)
   if (!m)
     return 1;
 
-  rank.set_default_policy(SimpleMessenger::Policy::stateful_server());
-  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
-  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
+  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::stateful_server());
+  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless_peer());
 
   rank.start();
   
index e077f253145bfb2a69576f716818adf8c4ed4d72..ecde44f2e35219a9902a88e70ffb8d82cb6e54a6 100644 (file)
@@ -149,9 +149,7 @@ int main(int argc, const char **argv)
   rank.start();  // may daemonize
 
   rank.set_default_policy(SimpleMessenger::Policy::stateless_server());
-  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fail_after(2.0));
-  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fail_after(2.0));
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless_peer());
 
   mon->init();
   rank.wait();
index 8b0ba5d03d3f97207a1443bf005ecb2df6c11110..4336a676474d9442a4ac75eb363589135a8cdc7e 100644 (file)
@@ -145,8 +145,8 @@ int main(int argc, const char **argv)
     return 1;
 
   rank.set_default_policy(SimpleMessenger::Policy::stateless_server());
-  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
+  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::client());
+  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless_peer());
 
   rank.start();
 
index 715c3ec9e6edcadbdf450abc6ebc2ff93906bc29..4e5955d713b4479bdbc14b8d7449839f82cabae5 100644 (file)
@@ -58,10 +58,6 @@ int main(int argc, const char **argv, char *envp[])
   SimpleMessenger rank;
   cout << "starting csyn" << std::endl;
 
-  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(2.0));
-  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
-
   list<Client*> clients;
   list<SyntheticClient*> synclients;
 
index 678c02563e4d99fffbd9df4a7b5bb8960bdd15bd..ae6dc87a47140d43f930d7f88e2333943eff9b8a 100644 (file)
@@ -54,9 +54,6 @@ extern "C" int ceph_initialize(int argc, const char **argv)
     client = new Client(rank->register_entity(entity_name_t::CLIENT()), monclient);
 
     rank->start();
-    rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
-    rank->set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
-    rank->set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
 
     client->init();
   }
index d1af13dd24734b4e9072a61670a8348ef3401d98..097e3821e8ded112c16e21ce451a33c48d53a9f7 100644 (file)
@@ -290,11 +290,6 @@ bool RadosClient::init()
   
   messenger->add_dispatcher_head(this);
 
-  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
-  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless());  // mds does its own timeout/markdown
-
   rank.start(1);
 
   objecter = new Objecter(messenger, &monclient, &osdmap, lock);
index 028d63b0fbc0091c349c1f4654382f9281801c5a..0e5e7a21c86a20683d6c356cb46ce1cd7b0d23f7 100644 (file)
@@ -470,7 +470,7 @@ ostream& SimpleMessenger::Pipe::_pipe_prefix() {
                << " sd=" << sd
                << " pgs=" << peer_global_seq
                << " cs=" << connect_seq
-               << " ltx=" << policy.lossy_tx
+               << " l=" << policy.lossy
                << ").";
 }
 
@@ -598,9 +598,9 @@ int SimpleMessenger::Pipe::accept()
     // note peer's type, flags
     peer_type = connect.host_type;
     policy = rank->get_policy(connect.host_type);
-    dout(10) << "accept host_type " << connect.host_type
-            << ", setting policy, lossy_tx=" << policy.lossy_tx << dendl;
-    lossy_rx = connect.flags & CEPH_MSG_CONNECT_LOSSY;
+    dout(10) << "accept of host_type " << connect.host_type
+            << ", policy.lossy=" << policy.lossy
+            << dendl;
 
     memset(&reply, 0, sizeof(reply));
     reply.protocol_version = get_proto_version(rank->my_type, peer_type, false);
@@ -632,12 +632,13 @@ int SimpleMessenger::Pipe::accept()
                 << " <= " << connect.global_seq << ", looks ok" << dendl;
       }
       
-      if (existing->policy.lossy_tx) {
-       dout(-10) << "accept replacing existing (lossy) channel" << dendl;
+      if (existing->policy.lossy) {
+       dout(-10) << "accept replacing existing (lossy) channel (new one lossy="
+                 << policy.lossy << ")" << dendl;
        existing->was_session_reset();
        goto replace;
       }
-      if (lossy_rx) {
+      /*if (lossy_rx) {
        if (existing->state == STATE_STANDBY) {
          dout(-10) << "accept incoming lossy connection, kicking outgoing lossless "
                    << existing << dendl;
@@ -649,7 +650,7 @@ int SimpleMessenger::Pipe::accept()
        }
        existing->lock.Unlock();
        goto fail;
-      }
+       }*/
 
       dout(-10) << "accept connect_seq " << connect.connect_seq
                << " vs existing " << existing->connect_seq
@@ -762,7 +763,7 @@ int SimpleMessenger::Pipe::accept()
   reply.global_seq = rank->get_global_seq();
   reply.connect_seq = connect_seq;
   reply.flags = 0;
-  if (policy.lossy_tx)
+  if (policy.lossy)
     reply.flags = reply.flags | CEPH_MSG_CONNECT_LOSSY;
 
   // ok!
@@ -922,8 +923,8 @@ int SimpleMessenger::Pipe::connect()
     connect.connect_seq = cseq;
     connect.protocol_version = get_proto_version(rank->my_type, peer_type, true);
     connect.flags = 0;
-    if (policy.lossy_tx)
-      connect.flags |= CEPH_MSG_CONNECT_LOSSY;
+    if (policy.lossy)
+      connect.flags |= CEPH_MSG_CONNECT_LOSSY;  // this is fyi, actually, server decides!
     memset(&msg, 0, sizeof(msg));
     msgvec[0].iov_base = (char*)&connect;
     msgvec[0].iov_len = sizeof(connect);
@@ -995,12 +996,12 @@ int SimpleMessenger::Pipe::connect()
     if (reply.tag == CEPH_MSGR_TAG_READY) {
       // hooray!
       peer_global_seq = reply.global_seq;
-      lossy_rx = reply.flags & CEPH_MSG_CONNECT_LOSSY;
+      policy.lossy = reply.flags & CEPH_MSG_CONNECT_LOSSY;
       state = STATE_OPEN;
       connect_seq = cseq + 1;
       assert(connect_seq == reply.connect_seq);
       first_fault = last_attempt = utime_t();
-      dout(20) << "connect success " << connect_seq << ", lossy_rx = " << lossy_rx << dendl;
+      dout(20) << "connect success " << connect_seq << ", lossy = " << policy.lossy << dendl;
 
       if (!reader_running) {
        dout(20) << "connect starting reader" << dendl;
@@ -1102,7 +1103,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
   }
 
   // lossy channel?
-  if (policy.lossy_tx) {
+  if (policy.lossy) {
     dout(10) << "fault on lossy channel, failing" << dendl;
     was_session_reset();
     fail();
@@ -1117,7 +1118,7 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
       dout(10) << "fault on connect, or already closing, and q empty: setting closed." << dendl;
       state = STATE_CLOSED;
     } else {
-      dout(0) << "fault nothing to send, going to standby" << dendl;
+      dout(0) << "fault with nothing to send, going to standby" << dendl;
       state = STATE_STANDBY;
     }
     return;
@@ -1125,30 +1126,29 @@ void SimpleMessenger::Pipe::fault(bool onconnect, bool onread)
 
   utime_t now = g_clock.now();
   if (state != STATE_CONNECTING) {
-    if (!onconnect) dout(0) << "fault initiating reconnect" << dendl;
+    if (!onconnect)
+      dout(0) << "fault initiating reconnect" << dendl;
     connect_seq++;
     state = STATE_CONNECTING;
     first_fault = now;
   } else if (first_fault.sec() == 0) {
-    if (!onconnect) dout(0) << "fault first fault" << dendl;
+    if (!onconnect)
+      dout(0) << "fault first fault" << dendl;
     first_fault = now;
   } else {
+
+#warning clean me up
+
     utime_t failinterval = now - first_fault;
     utime_t retryinterval = now - last_attempt;
     if (!onconnect) dout(10) << "fault failure was " << failinterval 
                             << " ago, last attempt was at " << last_attempt
                             << ", " << retryinterval << " ago" << dendl;
-    if (policy.fail_interval > 0 && failinterval > policy.fail_interval) {
-      // give up
-      dout(0) << "fault giving up" << dendl;
-      fail();
-    } else if (retryinterval < policy.retry_interval) {
-      // wait
-      now += (policy.retry_interval - retryinterval);
-      dout(10) << "fault waiting until " << now << dendl;
-      cond.WaitUntil(lock, now);
-      dout(10) << "fault done waiting or woke up" << dendl;
-    }
+    // wait
+    now += 1.0;
+    dout(10) << "fault waiting until " << now << dendl;
+    cond.WaitUntil(lock, now);
+    dout(10) << "fault done waiting or woke up" << dendl;
   }
   last_attempt = now;
 }
@@ -1309,7 +1309,7 @@ void SimpleMessenger::Pipe::reader()
       }
       in_seq++;
 
-      if (!lossy_rx && in_seq != m->get_seq()) {
+      if (!policy.lossy && in_seq != m->get_seq()) {
        dout(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
                << " for " << *m << " from " << m->get_source() << dendl;
        derr(0) << "reader got bad seq " << m->get_seq() << " expected " << in_seq
index fe599c105c564eae9e1fd7ce5ae79e4cedfc9481..230af5d55e20bc3f11d98abb1dae881d94f5407c 100644 (file)
@@ -39,50 +39,16 @@ using namespace __gnu_cxx;
 class SimpleMessenger {
 public:
   struct Policy {
-    bool lossy_tx;                // 
+    bool lossy;
     bool server;
-    float retry_interval;         // initial retry interval.  0 => fail immediately (lossy_tx=true)
-    float fail_interval;          // before we call ms_handle_failure (lossy_tx=true)
-    bool drop_msg_callback;
-    bool fail_callback;
-    bool remote_reset_callback;
-    Policy() : 
-      lossy_tx(false), server(false),
-      retry_interval(g_conf.ms_retry_interval),
-      fail_interval(g_conf.ms_fail_interval),
-      drop_msg_callback(true),
-      fail_callback(true),
-      remote_reset_callback(true) {}
-
-    Policy(bool tx, bool sr, float r, float f, bool dmc, bool fc, bool rrc) :
-      lossy_tx(tx), server(sr),
-      retry_interval(r), fail_interval(f),
-      drop_msg_callback(dmc),
-      fail_callback(fc),
-      remote_reset_callback(rrc) {}
-
-    // new
-    static Policy stateful_server() { return Policy(false, true, g_conf.ms_retry_interval, 0,
-                                                   true, true, true); }
-    static Policy stateless_server() { return Policy(true, true, -1, -1,
-                                                    true, true, true); }
-
-    // old
-    static Policy lossless() { return Policy(false, false,
-                                            g_conf.ms_retry_interval, 0,
-                                            true, true, true); }
-    static Policy lossy_fail_after(float f) {
-      return Policy(true, false,
-                   MIN(g_conf.ms_retry_interval, f), f,
-                   true, true, true);
-    }
-    static Policy lossy_fast_fail() { return Policy(true, false, -1, -1, true, true, true); }
 
-    /*
-    static Policy fast_fail() { return Policy(-1, -1, true, true, true); }
-    static Policy fail_after(float f) { return Policy(MIN(g_conf.ms_retry_interval, f), f, true, true, true); }
-    static Policy retry_forever() { return Policy(g_conf.ms_retry_interval, -1, false, true, true); }
-    */
+    Policy(bool l=false, bool s=false) :
+      lossy(l), server(s) {}
+
+    static Policy stateful_server() { return Policy(false, true); }
+    static Policy stateless_server() { return Policy(true, true); }
+    static Policy lossless_peer() { return Policy(false, false); }
+    static Policy client() { return Policy(false, false); }
   };
 
 
@@ -130,7 +96,6 @@ private:
     int peer_type;
     entity_addr_t peer_addr;
     Policy policy;
-    bool lossy_rx;
     
     Mutex lock;
     int state;