]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: server flag to avoid connecting to peer
authorSage Weil <sage@newdream.net>
Tue, 25 Aug 2009 23:23:04 +0000 (16:23 -0700)
committerSage Weil <sage@newdream.net>
Wed, 26 Aug 2009 20:09:09 +0000 (13:09 -0700)
If we have a message and no socket and server=true, just queue it
up.  When they connect to us they'll get it.

src/TODO
src/ceph.cc
src/cmds.cc
src/cmon.cc
src/cosd.cc
src/msg/SimpleMessenger.cc
src/msg/SimpleMessenger.h

index 494a9b5717c4930d0932298c71b6b477e72f0400..e360d960be543264d6c7091836faa67a097eedcf 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -26,6 +26,9 @@ v0.14
 - radosgw
 - uclient: fix write vs max_size?
 
+- msgr: unidirectional option
+- mds: put migration vectors in mdsmap
+
 bugs
 - premature filejournal trimming?
 - weird osd_lock contention during osd restart?
index c9dfc19f23bbcbd0fbdead7a991656c7777b3416..040145114d3c8dd9f7a4bde94cba4a76ff64d20c 100644 (file)
@@ -628,7 +628,7 @@ int main(int argc, const char **argv, const char *envp[])
   messenger->set_dispatcher(&dispatcher);
 
   rank.start();
-  rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
+  rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0));
 
   mc.set_messenger(messenger);
   dispatcher.link_dispatcher(&mc);
index 5f6a56099ada312c1eb5aa9f594ade91a29fae31..a3831afa2d31b7414a8f58b5802cedca7bbf41c0 100644 (file)
@@ -78,10 +78,10 @@ int main(int argc, const char **argv)
   if (!m)
     return 1;
 
+  rank.set_default_policy(SimpleMessenger::Policy::stateful_server());
   rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0));
   rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless());
   rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless());  // mds does its own timeout/markdown
 
   rank.start();
   
index d75e861095824a58a549c51af85790b645816986..a4cdc6ddac2902917c2699dd6e97b6fb085501d6 100644 (file)
@@ -143,14 +143,9 @@ int main(int argc, const char **argv)
 
   rank.start();  // may daemonize
 
+  rank.set_default_policy(SimpleMessenger::Policy::stateless_server());
   rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless());
 
-  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_ADMIN, SimpleMessenger::Policy::lossy_fast_fail());
-
-
   mon->init();
   rank.wait();
 
index 68f5fe8f7387dfc8790f4e7093e5f9e49529e79a..f96854a6726f2643678156bef1bb8b08db255c2c 100644 (file)
@@ -143,15 +143,10 @@ int main(int argc, const char **argv)
   if (!hbm)
     return 1;
 
+  rank.set_default_policy(SimpleMessenger::Policy::stateless_server());
   rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
   rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless());
 
-  // make a _reasonable_ effort to send acks/replies to requests, but
-  // don't get carried away, as the sender may go away and we won't
-  // ever hear about it.
-  rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail());
-  rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail());
-
   rank.start();
 
   // start osd
index 542fa3948e4703ff374646dd6e483340148371ce..22978361d0e5c643f81a02d1f2b96f75397c2da7 100644 (file)
@@ -214,6 +214,7 @@ void *SimpleMessenger::Accepter::entry()
       rank->lock.Lock();
       if (rank->num_local > 0) {
        Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING);
+       p->policy = rank->default_policy;
        p->sd = sd;
        p->start_reader();
        rank->pipes.insert(p);
@@ -1370,8 +1371,12 @@ void SimpleMessenger::Pipe::writer()
 
     // connect?
     if (state == STATE_CONNECTING) {
-      connect();
-      continue;
+      if (policy.server) {
+       state = STATE_STANDBY;
+      } else {
+       connect();
+       continue;
+      }
     }
     
     if (state == STATE_CLOSING) {
index c088003b0255353a4c8784a6a80140cd3f229ae2..6af3fc9ade87e64f7a79b0acc7f874498eb404b7 100644 (file)
@@ -40,35 +40,43 @@ class SimpleMessenger {
 public:
   struct Policy {
     bool lossy_tx;                // 
+    bool server;
     float retry_interval;         // initial retry interval.  0 => fail immediately (lossy_tx=true)
     float fail_interval;          // before we call ms_handle_failure (lossy_tx=true)
     bool drop_msg_callback;
     bool fail_callback;
     bool remote_reset_callback;
     Policy() : 
-      lossy_tx(false),
+      lossy_tx(false), server(false),
       retry_interval(g_conf.ms_retry_interval),
       fail_interval(g_conf.ms_fail_interval),
       drop_msg_callback(true),
       fail_callback(true),
       remote_reset_callback(true) {}
 
-    Policy(bool tx, float r, float f, bool dmc, bool fc, bool rrc) :
-      lossy_tx(tx),
+    Policy(bool tx, bool sr, float r, float f, bool dmc, bool fc, bool rrc) :
+      lossy_tx(tx), server(sr),
       retry_interval(r), fail_interval(f),
       drop_msg_callback(dmc),
       fail_callback(fc),
       remote_reset_callback(rrc) {}
 
-    static Policy lossless() { return Policy(false,
+    // new
+    static Policy stateful_server() { return Policy(false, true, g_conf.ms_retry_interval, 0,
+                                                   true, true, true); }
+    static Policy stateless_server() { return Policy(true, true, -1, -1,
+                                                    true, true, true); }
+
+    // old
+    static Policy lossless() { return Policy(false, false,
                                             g_conf.ms_retry_interval, 0,
                                             true, true, true); }
     static Policy lossy_fail_after(float f) {
-      return Policy(true, 
+      return Policy(true, false,
                    MIN(g_conf.ms_retry_interval, f), f,
                    true, true, true);
     }
-    static Policy lossy_fast_fail() { return Policy(true, -1, -1, true, true, true); }
+    static Policy lossy_fast_fail() { return Policy(true, false, -1, -1, true, true, true); }
 
     /*
     static Policy fast_fail() { return Policy(-1, -1, true, true, true); }
@@ -392,6 +400,7 @@ private:
   hash_map<entity_addr_t, Pipe*> rank_pipe;
  
   int my_type;
+  Policy default_policy;
   map<int, Policy> policy_map; // entity_name_t::type -> Policy
 
   set<Pipe*>      pipes;
@@ -439,6 +448,9 @@ public:
   // create a new messenger
   Endpoint *new_entity(entity_name_t addr);
 
+  void set_default_policy(Policy p) {
+    default_policy = p;
+  }
   void set_policy(int type, Policy p) {
     policy_map[type] = p;
   }