From: Sage Weil Date: Tue, 25 Aug 2009 23:23:04 +0000 (-0700) Subject: msgr: server flag to avoid connecting to peer X-Git-Tag: v0.14~122 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=0cfc905e150cebf90f1a93dd67630e9dfef6c9c7;p=ceph.git msgr: server flag to avoid connecting to peer If we have a message and no socket and server=true, just queue it up. When they connect to us they'll get it. --- diff --git a/src/TODO b/src/TODO index 494a9b5717c4..e360d960be54 100644 --- a/src/TODO +++ b/src/TODO @@ -26,6 +26,9 @@ v0.14 - radosgw - uclient: fix write vs max_size? +- msgr: unidirectional option +- mds: put migration vectors in mdsmap + bugs - premature filejournal trimming? - weird osd_lock contention during osd restart? diff --git a/src/ceph.cc b/src/ceph.cc index c9dfc19f23bb..040145114d3c 100644 --- a/src/ceph.cc +++ b/src/ceph.cc @@ -628,7 +628,7 @@ int main(int argc, const char **argv, const char *envp[]) messenger->set_dispatcher(&dispatcher); rank.start(); - rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0)); + rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0)); mc.set_messenger(messenger); dispatcher.link_dispatcher(&mc); diff --git a/src/cmds.cc b/src/cmds.cc index 5f6a56099ada..a3831afa2d31 100644 --- a/src/cmds.cc +++ b/src/cmds.cc @@ -78,10 +78,10 @@ int main(int argc, const char **argv) if (!m) return 1; + rank.set_default_policy(SimpleMessenger::Policy::stateful_server()); rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fail_after(1.0)); rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossless()); rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossless()); // mds does its own timeout/markdown rank.start(); diff --git a/src/cmon.cc b/src/cmon.cc index d75e86109582..a4cdc6ddac29 100644 --- a/src/cmon.cc +++ b/src/cmon.cc @@ -143,14 +143,9 @@ int main(int argc, const char **argv) rank.start(); // may daemonize + rank.set_default_policy(SimpleMessenger::Policy::stateless_server()); rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossless()); - rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_ADMIN, SimpleMessenger::Policy::lossy_fast_fail()); - - mon->init(); rank.wait(); diff --git a/src/cosd.cc b/src/cosd.cc index 68f5fe8f7387..f96854a6726f 100644 --- a/src/cosd.cc +++ b/src/cosd.cc @@ -143,15 +143,10 @@ int main(int argc, const char **argv) if (!hbm) return 1; + rank.set_default_policy(SimpleMessenger::Policy::stateless_server()); rank.set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail()); rank.set_policy(entity_name_t::TYPE_OSD, SimpleMessenger::Policy::lossless()); - // make a _reasonable_ effort to send acks/replies to requests, but - // don't get carried away, as the sender may go away and we won't - // ever hear about it. - rank.set_policy(entity_name_t::TYPE_MDS, SimpleMessenger::Policy::lossy_fast_fail()); - rank.set_policy(entity_name_t::TYPE_CLIENT, SimpleMessenger::Policy::lossy_fast_fail()); - rank.start(); // start osd diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 542fa3948e47..22978361d0e5 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -214,6 +214,7 @@ void *SimpleMessenger::Accepter::entry() rank->lock.Lock(); if (rank->num_local > 0) { Pipe *p = new Pipe(rank, Pipe::STATE_ACCEPTING); + p->policy = rank->default_policy; p->sd = sd; p->start_reader(); rank->pipes.insert(p); @@ -1370,8 +1371,12 @@ void SimpleMessenger::Pipe::writer() // connect? if (state == STATE_CONNECTING) { - connect(); - continue; + if (policy.server) { + state = STATE_STANDBY; + } else { + connect(); + continue; + } } if (state == STATE_CLOSING) { diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index c088003b0255..6af3fc9ade87 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -40,35 +40,43 @@ class SimpleMessenger { public: struct Policy { bool lossy_tx; // + bool server; float retry_interval; // initial retry interval. 0 => fail immediately (lossy_tx=true) float fail_interval; // before we call ms_handle_failure (lossy_tx=true) bool drop_msg_callback; bool fail_callback; bool remote_reset_callback; Policy() : - lossy_tx(false), + lossy_tx(false), server(false), retry_interval(g_conf.ms_retry_interval), fail_interval(g_conf.ms_fail_interval), drop_msg_callback(true), fail_callback(true), remote_reset_callback(true) {} - Policy(bool tx, float r, float f, bool dmc, bool fc, bool rrc) : - lossy_tx(tx), + Policy(bool tx, bool sr, float r, float f, bool dmc, bool fc, bool rrc) : + lossy_tx(tx), server(sr), retry_interval(r), fail_interval(f), drop_msg_callback(dmc), fail_callback(fc), remote_reset_callback(rrc) {} - static Policy lossless() { return Policy(false, + // new + static Policy stateful_server() { return Policy(false, true, g_conf.ms_retry_interval, 0, + true, true, true); } + static Policy stateless_server() { return Policy(true, true, -1, -1, + true, true, true); } + + // old + static Policy lossless() { return Policy(false, false, g_conf.ms_retry_interval, 0, true, true, true); } static Policy lossy_fail_after(float f) { - return Policy(true, + return Policy(true, false, MIN(g_conf.ms_retry_interval, f), f, true, true, true); } - static Policy lossy_fast_fail() { return Policy(true, -1, -1, true, true, true); } + static Policy lossy_fast_fail() { return Policy(true, false, -1, -1, true, true, true); } /* static Policy fast_fail() { return Policy(-1, -1, true, true, true); } @@ -392,6 +400,7 @@ private: hash_map rank_pipe; int my_type; + Policy default_policy; map policy_map; // entity_name_t::type -> Policy set pipes; @@ -439,6 +448,9 @@ public: // create a new messenger Endpoint *new_entity(entity_name_t addr); + void set_default_policy(Policy p) { + default_policy = p; + } void set_policy(int type, Policy p) { policy_map[type] = p; }