]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
msgr: change dispatcher interfaces; some monclient cleanups
authorSage Weil <sage@newdream.net>
Tue, 15 Sep 2009 22:02:17 +0000 (15:02 -0700)
committerSage Weil <sage@newdream.net>
Tue, 15 Sep 2009 22:02:17 +0000 (15:02 -0700)
22 files changed:
src/ceph.cc
src/client/Client.cc
src/client/Client.h
src/common/LogClient.h
src/cosd.cc
src/dumpjournal.cc
src/librados.cc
src/mds/MDS.cc
src/mds/MDS.h
src/mon/MonClient.cc
src/mon/MonClient.h
src/mon/Monitor.cc
src/mon/Monitor.h
src/msg/Dispatcher.h
src/msg/Messenger.h
src/msg/SimpleMessenger.cc
src/osd/OSD.cc
src/osd/OSD.h
src/osd/OSDMap.h
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/testmsgr.cc

index f336a2a16a0cfaac8eb6adac30a8e61f2cb799c1..f66f8871b2dc4be4f65a078977257e76ba15a745 100644 (file)
@@ -204,34 +204,28 @@ void handle_notify(MMonObserveNotify *notify)
   map_ver[notify->machine_id] = notify->ver;
 }
 
-static void send_observe_requests(bool);
-static bool is_timeout = false;
+static void send_observe_requests();
 
 class C_ObserverRefresh : public Context {
 public:
   bool newmon;
   C_ObserverRefresh(bool n) : newmon(n) {}
   void finish(int r) {
-    is_timeout = false;
-    send_observe_requests(newmon);
+    send_observe_requests();
   }
 };
 
-static void send_observe_requests(bool newmon)
+static void send_observe_requests()
 {
-  dout(1) << "send_observe_requests " << newmon << dendl;
+  dout(1) << "send_observe_requests " << dendl;
 
-  if (is_timeout)
-    return;
-
-  int mon = mc.monmap.pick_mon(newmon);
   bool sent = false;
   for (int i=0; i<PAXOS_NUM; i++) {
     if (registered.count(i))
       continue;
     MMonObserve *m = new MMonObserve(mc.monmap.fsid, i, map_ver[i]);
-    dout(1) << "mon" << mon << " <- observe " << get_paxos_name(i) << dendl;
-    messenger->send_message(m, mc.monmap.get_inst(mon));
+    dout(1) << "mon" << " <- observe " << get_paxos_name(i) << dendl;
+    mc.send_mon_message(m);
     sent = true;
   }
 
@@ -243,7 +237,7 @@ static void send_observe_requests(bool newmon)
     dout(1) << " refresh after " << seconds << " with same mon" << dendl;
     timer.add_event_after(seconds, new C_ObserverRefresh(false));
   } else {
-    is_timeout = true;
+    //is_timeout = true;
     dout(1) << " refresh after " << retry_seconds << " with new mon" << dendl;
     timer.add_event_after(retry_seconds, new C_ObserverRefresh(true));
   }
@@ -274,15 +268,13 @@ Context *event = 0;
 
 void get_status(bool newmon)
 {
-  int mon = mc.monmap.pick_mon(newmon);
-
   vector<string> vcmd(2);
   vcmd[0] = prefix[which];
   vcmd[1] = "stat";
   
   MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version);
   m->cmd.swap(vcmd);
-  messenger->send_message(m, mc.monmap.get_inst(mon));
+  mc.send_mon_message(m);
 
   event = new C_Refresh;
   timer.add_event_after(.2, event);
@@ -335,6 +327,17 @@ void handle_ack(MMonCommandAck *ack)
   }
 }
 
+void send_command()
+{
+  MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version);
+  m->cmd = pending_cmd;
+  m->get_data() = pending_bl;
+
+  generic_dout(0) << "mon" << " <- " << pending_cmd << dendl;
+  mc.send_mon_message(m);
+}
+
+
 class Admin : public Dispatcher {
   bool ms_dispatch(Message *m) {
     switch (m->get_type()) {
@@ -356,37 +359,21 @@ class Admin : public Dispatcher {
     return true;
   }
 
-  void ms_handle_reset(const entity_addr_t& peer, entity_name_t last) {
-    if (observe) {
-      lock.Lock();
-      send_observe_requests(true);
-      lock.Unlock();
-    }
-  }
-} dispatcher;
-
-void send_command();
+  void ms_handle_failure(Message *m, const entity_addr_t& addr) {}
 
-struct C_Resend : public Context {
-  void finish(int) {
-    mc.monmap.pick_mon(true);  // pick a new mon
-    if (!reply)
+  bool ms_handle_reset(const entity_addr_t& peer) {
+    lock.Lock();
+    if (observe)
+      send_observe_requests();
+    if (pending_cmd.size())
       send_command();
+    lock.Unlock();
+    return true;
   }
-};
-void send_command()
-{
-  MMonCommand *m = new MMonCommand(mc.monmap.fsid, last_seen_version);
-  m->cmd = pending_cmd;
-  m->get_data() = pending_bl;
 
-  int mon = mc.monmap.pick_mon();
-  generic_dout(0) << "mon" << mon << " <- " << pending_cmd << dendl;
-  messenger->send_message(m, mc.monmap.get_inst(mon));
+  void ms_handle_remote_reset(const entity_addr_t& peer) {}
 
-  resend_event = new C_Resend;
-  timer.add_event_after(15.0, resend_event);
-}
+} dispatcher;
 
 int do_command(vector<string>& cmd, bufferlist& bl, string& rs, bufferlist& rbl)
 {
@@ -619,13 +606,14 @@ int main(int argc, const char **argv, const char *envp[])
   SimpleMessenger rank;
   rank.bind();
   messenger = rank.register_entity(entity_name_t::ADMIN());
-  messenger->set_dispatcher(&dispatcher);
+  messenger->add_dispatcher_head(&dispatcher);
 
   rank.start();
   rank.set_default_policy(SimpleMessenger::Policy::lossy_fail_after(1.0));
 
   mc.set_messenger(messenger);
-  dispatcher.link_dispatcher(&mc);
+  mc.init();
+
   if (mc.get_monmap() < 0)
     return -1;
 
@@ -636,7 +624,7 @@ int main(int argc, const char **argv, const char *envp[])
   }
   if (observe) {
     lock.Lock();
-    send_observe_requests(true);
+    send_observe_requests();
     lock.Unlock();
   }
   if (!watch && !observe) {
index bdde2cbefe655ef4a2405fbe6a3af2fb4a5e6f17..a6205e36fb6d5b0a52a0db7d56f59d62445b1da7 100644 (file)
@@ -169,8 +169,6 @@ Client::~Client()
   if (osdmap) { delete osdmap; osdmap = 0; }
   if (mdsmap) { delete mdsmap; mdsmap = 0; }
 
-  unlink_dispatcher(monclient);
-
   if (messenger)
     messenger->destroy();
 }
@@ -256,8 +254,7 @@ void Client::init()
   Mutex::Locker lock(client_lock);
 
   // ok!
-  messenger->set_dispatcher(this);
-  link_dispatcher(monclient);
+  messenger->add_dispatcher_head(this);
 
   monclient->init();
 
@@ -5570,21 +5567,22 @@ int Client::enumerate_layout(int fd, vector<ObjectExtent>& result,
 
 // ===============================
 
-void Client::ms_handle_failure(Message *m, const entity_inst_t& inst)
+void Client::ms_handle_failure(Message *m, const entity_addr_t& addr)
 {
-  entity_name_t dest = inst.name;
-  dout(0) << "ms_handle_failure " << *m << " to " << inst << dendl;
+  dout(0) << "ms_handle_failure " << *m << " to " << addr << dendl;
 }
 
-void Client::ms_handle_reset(const entity_addr_t& addr, entity_name_t last
+bool Client::ms_handle_reset(const entity_addr_t& addr
 {
   dout(0) << "ms_handle_reset on " << addr << dendl;
+  return false;
 }
 
 
-void Client::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last
+void Client::ms_handle_remote_reset(const entity_addr_t& addr) 
 {
-  dout(0) << "ms_handle_remote_reset on " << addr << ", last " << last << dendl;
+  dout(0) << "ms_handle_remote_reset on " << addr << dendl;
+#if 0
   if (last.is_mds()) {
     int mds = last.num();
     dout(0) << "ms_handle_remote_reset on " << last << ", " << mds_sessions[mds].num_caps
@@ -5610,5 +5608,5 @@ void Client::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t las
   }
   else 
     objecter->ms_handle_remote_reset(addr, last);
-
+#endif
 }
index 9119c489531aa8924cdcea1561dad040532291f9..17cb7aceeca4e158a1c50fc3089996fd0de1a3bc 100644 (file)
@@ -976,6 +976,11 @@ protected:
   friend class SyntheticClient;
   bool ms_dispatch(Message *m);
 
+  bool ms_handle_reset(const entity_addr_t& peer);
+  void ms_handle_failure(Message *m, const entity_addr_t& peer);
+  void ms_handle_remote_reset(const entity_addr_t& peer);
+
+
  public:
   Client(Messenger *m, MonClient *mc);
   ~Client();
@@ -1225,11 +1230,6 @@ public:
   int ll_fsync(Fh *fh, bool syncdataonly);
   int ll_release(Fh *fh);
   int ll_statfs(vinodeno_t vino, struct statvfs *stbuf);
-
-  // failure
-  void ms_handle_failure(Message*, const entity_inst_t& inst);
-  void ms_handle_reset(const entity_addr_t& addr, entity_name_t last);
-  void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last);
 };
 
 #endif
index 4ecf66d82a1c1567f4a8d4d6eec89513fc30e089..7eb5402721a8a441daa4e75f6f0dfe07c58192c3 100644 (file)
@@ -36,6 +36,11 @@ class LogClient : public Dispatcher {
   bool is_synchronous;
   void _send_log();
 
+  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
+
  public:
 
   // -- log --
index f67c54ca024ebd56186956f9684b70011c8a66af..7f9aa23b6ef9627e44e1f25c189b926fbf97e386 100644 (file)
@@ -83,7 +83,7 @@ int main(int argc, const char **argv)
   MonClient mc;
   if (mc.build_initial_monmap() < 0)
     return -1;
-  if (mc.get_monmap() < 0)
+  if (mc.get_monmap_privately() < 0)
     return -1;
 
   if (mkfs) {
index c6d9895bd1c3da63190964c82e197ede39f18174..2c0494cf90b4fc7ac028046bbd85bc2d709376b1 100644 (file)
@@ -62,6 +62,10 @@ class Dumper : public Dispatcher {
     }
     return true;
   }
+  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
 } dispatcher;
 
 
@@ -92,7 +96,7 @@ int main(int argc, const char **argv, const char *envp[])
   g_conf.daemonize = false; // not us!
   rank.start();
   messenger = rank.register_entity(entity_name_t::ADMIN());
-  messenger->set_dispatcher(&dispatcher);
+  messenger->add_dispatcher_head(&dispatcher);
 
   inodeno_t ino = MDS_INO_LOG_OFFSET + mds;
   unsigned pg_pool = CEPH_METADATA_RULE;
index 56334ab1673a505d8975a263fdac776858a632a2..f4497e2ef9e7bdbe97c838f2fa2863466e8d70eb 100644 (file)
@@ -59,6 +59,10 @@ class RadosClient : public Dispatcher
   bool _dispatch(Message *m);
   bool ms_dispatch(Message *m);
 
+  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
   Objecter *objecter;
 
   Mutex lock;
@@ -294,8 +298,6 @@ bool RadosClient::init()
 
   rank.start(1);
 
-  monclient.link_dispatcher(this);
-
   objecter = new Objecter(messenger, &monclient, &osdmap, lock);
   if (!objecter)
     return false;
index 97953a3466938ba4d65c45ffb1aafc639e1cb1ef..20d9d3ce45f4b5fcd0e0139d4cb33537d44652ec 100644 (file)
@@ -373,11 +373,11 @@ void MDS::send_message_client(Message *m, entity_inst_t clientinst)
 
 int MDS::init()
 {
-  messenger->set_dispatcher(this);
+  messenger->add_dispatcher_tail(this);
+  messenger->add_dispatcher_head(&logclient);
 
   // get monmap
   monc->set_messenger(messenger);
-  link_dispatcher(monc);
 
   monc->init();
   monc->get_monmap();
@@ -397,9 +397,6 @@ int MDS::init()
   // schedule tick
   reset_tick();
 
-  // i'm ready!
-  link_dispatcher(&logclient);
-
   mds_lock.Unlock();
   return 0;
 }
@@ -1143,7 +1140,6 @@ void MDS::suicide()
   objecter->shutdown();
   
   // shut down messenger
-  unlink_dispatcher(&logclient);
   messenger->shutdown();
 
   monc->shutdown();
@@ -1399,21 +1395,22 @@ bool MDS::_dispatch(Message *m)
 
 
 
-void MDS::ms_handle_failure(Message *m, const entity_inst_t& inst
+void MDS::ms_handle_failure(Message *m, const entity_addr_t& addr
 {
   mds_lock.Lock();
-  dout(0) << "ms_handle_failure to " << inst << " on " << *m << dendl;
+  dout(0) << "ms_handle_failure to " << addr << " on " << *m << dendl;
   mds_lock.Unlock();
 }
 
-void MDS::ms_handle_reset(const entity_addr_t& addr, entity_name_t last
+bool MDS::ms_handle_reset(const entity_addr_t& addr
 {
   dout(0) << "ms_handle_reset on " << addr << dendl;
+  return false;
 }
 
 
-void MDS::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last
+void MDS::ms_handle_remote_reset(const entity_addr_t& addr) 
 {
   dout(0) << "ms_handle_remote_reset on " << addr << dendl;
-  objecter->ms_handle_remote_reset(addr, last);
+  objecter->ms_handle_remote_reset(addr);
 }
index eefa9df439b33c46f179240363b091e4e103be2c..a0be1128da9b6139dcac7940efffa26dd5e55c79 100644 (file)
@@ -303,7 +303,8 @@ class MDS : public Dispatcher {
   int get_req_rate() { return req_rate; }
 
  private:
-  virtual bool ms_dispatch(Message *m);
+  bool ms_dispatch(Message *m);
+
  public:
   MDS(const char *n, Messenger *m, MonClient *mc);
   ~MDS();
@@ -360,9 +361,9 @@ class MDS : public Dispatcher {
   // messages
   bool _dispatch(Message *m);
   
-  void ms_handle_failure(Message *m, const entity_inst_t& inst);
-  void ms_handle_reset(const entity_addr_t& addr, entity_name_t last);
-  void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t last);
+  bool ms_handle_reset(const entity_addr_t& peer);
+  void ms_handle_failure(Message *m, const entity_addr_t& peer);
+  void ms_handle_remote_reset(const entity_addr_t& peer);
 
   // special message types
   void handle_mds_map(class MMDSMap *m);
index 7f35bf3d36131e077fac80751a3cf192d4f010df..064cf2c1c698d3decbc39dc97a2bd6e5f4198851 100644 (file)
@@ -102,11 +102,28 @@ int MonClient::build_initial_monmap()
   return -ENOENT;
 }
 
+
 int MonClient::get_monmap()
 {
   dout(10) << "get_monmap" << dendl;
   Mutex::Locker l(monc_lock);
   
+  want_monmap = true;
+  _reopen_session();
+
+  while (want_monmap)
+    map_cond.Wait(monc_lock);
+
+  dout(10) << "get_monmap done" << dendl;
+
+  return 0;
+}
+
+int MonClient::get_monmap_privately()
+{
+  dout(10) << "get_monmap_privately" << dendl;
+  Mutex::Locker l(monc_lock);
+  
   SimpleMessenger *rank = NULL; 
   bool temp_msgr = false;
   if (!messenger) {
@@ -114,17 +131,17 @@ int MonClient::get_monmap()
     rank->bind();
     rank->set_policy(entity_name_t::TYPE_MON, SimpleMessenger::Policy::lossy_fast_fail());
     messenger = rank->register_entity(entity_name_t::CLIENT(-1));
-    messenger->set_dispatcher(this);
+    messenger->add_dispatcher_head(this);
     rank->start(true);  // do not daemonize!
     temp_msgr = true; 
   }
   
   int attempt = 10;
   int i = 0;
-
   srand(getpid());
+  
   dout(10) << "have " << monmap.epoch << dendl;
-    
+  
   while (monmap.epoch == 0) {
     i = rand() % monmap.mon_inst.size();
     dout(10) << "querying " << monmap.mon_inst[i] << dendl;
@@ -143,7 +160,7 @@ int MonClient::get_monmap()
     messenger->destroy();
     messenger = 0;
   }
-
   if (monmap.epoch)
     return 0;
   return -1;
@@ -174,14 +191,27 @@ bool MonClient::ms_dispatch(Message *m)
   return false;
 }
 
+void MonClient::_finish_hunting()
+{
+  if (hunting) {
+    dout(0) << "found new mon" << cur_mon << dendl; 
+    hunting = false;
+  }
+}
+
 void MonClient::handle_monmap(MMonMap *m)
 {
   dout(10) << "handle_monmap " << *m << dendl;
   monc_lock.Lock();
 
+  _finish_hunting();
+
   bufferlist::iterator p = m->monmapbl.begin();
   ::decode(monmap, p);
+
   map_cond.Signal();
+  want_monmap = false;
+
   monc_lock.Unlock();
   delete m;
 }
@@ -201,7 +231,7 @@ void MonClient::_send_mount()
 void MonClient::init()
 {
   dout(10) << "init" << dendl;
-  messenger->set_dispatcher(this);
+  messenger->add_dispatcher_head(this);
 
   Mutex::Locker l(monc_lock);
   timer.add_event_after(10.0, new C_Tick(this));
@@ -240,7 +270,7 @@ void MonClient::handle_mount_ack(MClientMountAck* m)
 {
   dout(10) << "handle_mount_ack " << *m << dendl;
 
-  hunting = false;
+  _finish_hunting();
 
   // monmap
   bufferlist::iterator p = m->monmap_bl.begin();
@@ -259,42 +289,50 @@ void MonClient::handle_mount_ack(MClientMountAck* m)
 
 void MonClient::_send_mon_message(Message *m)
 {
-  int mon = monmap.pick_mon();
-  messenger->send_message(m, monmap.mon_inst[mon]);
+  messenger->send_message(m, monmap.mon_inst[cur_mon]);
 }
 
 void MonClient::_pick_new_mon()
 {
-  int oldmon = monmap.pick_mon();
-  messenger->mark_down(monmap.get_inst(oldmon).addr);
-  monmap.pick_mon(true);
+  if (cur_mon >= 0)
+    messenger->mark_down(monmap.get_inst(cur_mon).addr);
+  cur_mon = monmap.pick_mon(true);
+  dout(10) << "_pick_new_mon picked mon" << cur_mon << dendl;
 }
 
 
-void MonClient::ms_handle_reset(const entity_addr_t& peer)
+void MonClient::_reopen_session()
 {
-  dout(10) << "ms_handle_reset " << peer << dendl;
-  if (!hunting) {
-    dout(0) << "staring hunt for new mon" << dendl;
-    hunting = true;
-    _pick_new_mon();
-    if (mounting)
-      _send_mount();
+  dout(10) << "_reopen_session" << dendl;
+  _pick_new_mon();
+  if (mounting)
+    _send_mount();
+  if (!sub_have.empty())
     _renew_subs();
+  if (!mounting && sub_have.empty()) {
+    _send_mon_message(new MMonGetMap);
   }
 }
 
+bool MonClient::ms_handle_reset(const entity_addr_t& peer)
+{
+  dout(10) << "ms_handle_reset " << peer << dendl;
+  if (hunting)
+    return true;
+
+  dout(0) << "starting hunt for new mon" << dendl;
+  hunting = true;
+  _reopen_session();
+  return false;
+}
+
 void MonClient::tick()
 {
   dout(10) << "tick" << dendl;
 
   if (hunting) {
     dout(0) << "continuing hunt" << dendl;
-    // try new monitor
-    _pick_new_mon();
-    if (mounting)
-      _send_mount();
-    _renew_subs();
+    _reopen_session();
   } else {
     // just renew as needed
     utime_t now = g_clock.now();
@@ -306,8 +344,6 @@ void MonClient::tick()
   }
 
   timer.add_event_after(10.0, new C_Tick(this));
-  dout(10) << "tick done" << dendl;
-
 }
 
 
@@ -332,7 +368,7 @@ void MonClient::_renew_subs()
 
 void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
 {
-  hunting = false;
+  _finish_hunting();
 
   if (sub_renew_sent != utime_t()) {
     sub_renew_after = sub_renew_sent;
index 6fa102cb74a1ab020a130c13f5e72da2283e855a..74edf8874d85fd2f3249246179e4f0803adbd048 100644 (file)
@@ -36,15 +36,20 @@ public:
 private:
   Messenger *messenger;
 
+  int cur_mon;
+
   entity_addr_t my_addr;
 
   Mutex monc_lock;
   SafeTimer timer;
 
   bool ms_dispatch(Message *m);
-  void handle_monmap(MMonMap *m);
+  bool ms_handle_reset(const entity_addr_t& peer);
 
-  void ms_handle_reset(const entity_addr_t& peer);
+  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
+  void handle_monmap(MMonMap *m);
 
 
   // monitor session
@@ -59,6 +64,9 @@ private:
   };
   void tick();
 
+  // monclient
+  bool want_monmap;
+
   // mount
 private:
   client_t clientid;
@@ -67,6 +75,8 @@ private:
   Cond mount_cond, map_cond;
   utime_t mount_started;
 
+  void _finish_hunting();
+  void _reopen_session();
   void _pick_new_mon();
   void _send_mon_message(Message *m);
   void _send_mount();
@@ -113,7 +123,7 @@ public:
   }
 
  public:
-  MonClient() : messenger(NULL),
+  MonClient() : messenger(NULL), cur_mon(-1),
                monc_lock("MonClient::monc_lock"),
                timer(monc_lock),
                hunting(false),
@@ -127,6 +137,7 @@ public:
 
   int build_initial_monmap();
   int get_monmap();
+  int get_monmap_privately();
 
   void send_mon_message(Message *m) {
     Mutex::Locker l(monc_lock);
index 9bedd6949af024042fd0facdcf804cd731f66e8c..37c0ebd32a2f7a44a45a481c7dadcdf7239eda1b 100644 (file)
@@ -125,8 +125,8 @@ void Monitor::init()
   logclient.set_synchronous(true);
   
   // i'm ready!
-  messenger->set_dispatcher(this);
-  link_dispatcher(&logclient);
+  messenger->add_dispatcher_tail(this);
+  messenger->add_dispatcher_head(&logclient);
   
   // start ticker
   reset_tick();
@@ -159,8 +159,6 @@ void Monitor::shutdown()
   timer.cancel_all();
   timer.join();  
 
-  unlink_dispatcher(&logclient);
-  
   // die.
   messenger->shutdown();
 }
index 09d26ffd0b60d12c5617f6b1d5731b051d1f7cb1..83281b54ccc5e2899c99e82ad44ff685fb348fa5 100644 (file)
@@ -167,6 +167,9 @@ public:
 
  private:
   bool ms_dispatch(Message *m);
+  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(const entity_addr_t& peer) {}
 
  public:
   Monitor(int w, MonitorStore *s, Messenger *m, MonMap *map);
index da7298a94bf8f38d7fe20bf1a116317fff24359f..a999a856f5441844cff88b785c31ba0af903a2b5 100644 (file)
 class Messenger;
 
 class Dispatcher {
-  Dispatcher *next;
-
-  // how i receive messages
-  virtual bool ms_dispatch(Message *m) = 0;
-  bool _ms_deliver_dispatch(Message *m) {
-    bool ret = false;
-    if (next)
-      ret = next->_ms_deliver_dispatch(m);
-    if (!ret)
-      ret = ms_dispatch(m);
-    return ret;
-  }
-public:
-  void ms_deliver_dispatch(Message *m) {
-    if (!_ms_deliver_dispatch(m)) {
-      generic_dout(0) << "unhandled message " << m << " " << *m
-                     << " from " << m->get_orig_source_inst()
-                     << dendl;
-      assert(0);
-    }
-  }
-
-  void ms_deliver_handle_reset(const entity_addr_t& peer) {
-    ms_handle_reset(peer);
-    if (next)
-      next->ms_handle_reset(peer);
-  }
-  void ms_deliver_handle_remote_reset(const entity_addr_t& peer) {
-    ms_handle_remote_reset(peer);
-    if (next)
-      next->ms_handle_remote_reset(peer);
-  }
-  void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) {
-    ms_handle_failure(m, peer);
-    if (next)
-      next->ms_handle_failure(m, peer);
-  }
-
-
 public:
   virtual ~Dispatcher() { }
-  Dispatcher() : next(NULL) { }
+  Dispatcher() { }
 
-  virtual void link_dispatcher(Dispatcher *disp) {
-    if (!next) {
-      next = disp;
-    } else {
-      next->link_dispatcher(disp);
-    }
-  }
-  virtual void unlink_dispatcher(Dispatcher *disp) {
-    assert(next);
-    if (next == disp)
-      next = next->next;
-    else
-      next->unlink_dispatcher(disp);
-  }
+  // how i receive messages
+  virtual bool ms_dispatch(Message *m) = 0;
 
   // how i deal with transmission failures.
-  virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) {  }
+  virtual void ms_handle_failure(Message *m, const entity_addr_t& addr) = 0;
 
   /*
    * on any connection reset.
    * this indicates that the ordered+reliable delivery semantics have 
    * been violated.  messages may have been lost.
    */
-  virtual void ms_handle_reset(const entity_addr_t& peer) { }
+  virtual bool ms_handle_reset(const entity_addr_t& peer) = 0;
 
   // on deliberate reset of connection by remote
   //  implies incoming messages dropped; possibly/probably some of our previous outgoing too.
-  virtual void ms_handle_remote_reset(const entity_addr_t& peer) { }
+  virtual void ms_handle_remote_reset(const entity_addr_t& peer) = 0;
 };
 
 #endif
index 14136e8abc494cd89da137213512bd5dc9f8a27b..c41c2d0663c84155f3d4cec80139f910af737d54 100644 (file)
@@ -34,7 +34,7 @@ class Timer;
 
 class Messenger {
  private:
-  Dispatcher          *dispatcher;
+  list<Dispatcher*> dispatchers;
 
 protected:
   entity_name_t _my_name;
@@ -43,8 +43,7 @@ protected:
   atomic_t nref;
 
  public:
-  Messenger(entity_name_t w) : dispatcher(0), 
-                              default_send_priority(CEPH_MSG_PRIO_DEFAULT),
+  Messenger(entity_name_t w) : default_send_priority(CEPH_MSG_PRIO_DEFAULT),
                               nref(1) {
     _my_name = w;
   }
@@ -77,20 +76,52 @@ protected:
   virtual int get_dispatch_queue_len() { return 0; };
 
   // setup
-  void set_dispatcher(Dispatcher *d) { 
-    if (!dispatcher) {
-      dispatcher = d; 
-      ready(); 
-    }
+  void add_dispatcher_head(Dispatcher *d) { 
+    bool first = dispatchers.empty();
+    dispatchers.push_front(d);
+    if (first)
+      ready();
   }
-  Dispatcher *get_dispatcher() { return dispatcher; }
+  void add_dispatcher_tail(Dispatcher *d) { 
+    bool first = dispatchers.empty();
+    dispatchers.push_back(d);
+    if (first)
+      ready();
+  }
+
   virtual void ready() { }
-  bool is_ready() { return dispatcher != 0; }
+  bool is_ready() { return !dispatchers.empty(); }
 
   // dispatch incoming messages
-  virtual void dispatch(Message *m) {
-    assert(dispatcher);
-    dispatcher->ms_deliver_dispatch(m);
+  void ms_deliver_dispatch(Message *m) {
+    for (list<Dispatcher*>::iterator p = dispatchers.begin();
+        p != dispatchers.end();
+        p++)
+      if ((*p)->ms_dispatch(m))
+       return;
+    generic_dout(0) << "unhandled message " << m << " " << *m
+                   << " from " << m->get_orig_source_inst()
+                   << dendl;
+    assert(0);
+  }
+  void ms_deliver_handle_reset(const entity_addr_t& peer) {
+    for (list<Dispatcher*>::iterator p = dispatchers.begin();
+        p != dispatchers.end();
+        p++)
+      if ((*p)->ms_handle_reset(peer))
+       return;
+  }
+  void ms_deliver_handle_remote_reset(const entity_addr_t& peer) {
+    for (list<Dispatcher*>::iterator p = dispatchers.begin();
+        p != dispatchers.end();
+        p++)
+      (*p)->ms_handle_remote_reset(peer);
+  }
+  void ms_deliver_handle_failure(Message *m, const entity_addr_t& peer) {
+    for (list<Dispatcher*>::iterator p = dispatchers.begin();
+        p != dispatchers.end();
+        p++)
+      (*p)->ms_handle_failure(m, peer);
   }
 
   // shutdown
index 2c1955c3b0b7eb53d7fe972d48509f854bce1013..ca7c092c260f4ea89dbbcb7ecc7d2bbaab4dbfd4 100644 (file)
@@ -287,20 +287,20 @@ void SimpleMessenger::Endpoint::dispatch_entry()
            entity_addr_t a = remote_reset_q.front();
            remote_reset_q.pop_front();
            lock.Unlock();
-           get_dispatcher()->ms_deliver_handle_remote_reset(a);
+           ms_deliver_handle_remote_reset(a);
          } else if ((long)m == BAD_RESET) {
            lock.Lock();
            entity_addr_t a = reset_q.front();
            reset_q.pop_front();
            lock.Unlock();
-           get_dispatcher()->ms_deliver_handle_reset(a);
+           ms_deliver_handle_reset(a);
          } else if ((long)m == BAD_FAILED) {
            lock.Lock();
            m = failed_q.front().first;
            entity_addr_t a = failed_q.front().second;
            failed_q.pop_front();
            lock.Unlock();
-           get_dispatcher()->ms_deliver_handle_failure(m, a);
+           ms_deliver_handle_failure(m, a);
            m->put();
          } else {
            dout(1) << "<== " << m->get_source_inst()
@@ -312,7 +312,7 @@ void SimpleMessenger::Endpoint::dispatch_entry()
                    << " " << m->get_footer().data_crc << ")"
                    << " " << m 
                    << dendl;
-           dispatch(m);
+           ms_deliver_dispatch(m);
            dout(20) << "done calling dispatch on " << m << dendl;
          }
         }
@@ -1170,7 +1170,7 @@ void SimpleMessenger::Pipe::fail()
   report_failures();
 
   for (unsigned i=0; i<rank->local.size(); i++) 
-    if (rank->local[i] && rank->local[i]->get_dispatcher())
+    if (rank->local[i])
       rank->local[i]->queue_reset(peer_addr);
 
   // unregister
@@ -1188,7 +1188,7 @@ void SimpleMessenger::Pipe::was_session_reset()
   dout(10) << "was_session_reset" << dendl;
   report_failures();
   for (unsigned i=0; i<rank->local.size(); i++) 
-    if (rank->local[i] && rank->local[i]->get_dispatcher())
+    if (rank->local[i])
       rank->local[i]->queue_remote_reset(peer_addr);
 
   out_seq = 0;
index 62cbd8e9c2afc4374f27c33b50ee6a6dee3c9240..0d462a80e3b7c32500ba8abcb90b842ec876556d 100644 (file)
@@ -415,9 +415,10 @@ int OSD::init()
 
   
   // i'm ready!
-  messenger->set_dispatcher(this);
-  link_dispatcher(&logclient);
-  heartbeat_messenger->set_dispatcher(&heartbeat_dispatcher);
+  messenger->add_dispatcher_head(this);
+  messenger->add_dispatcher_head(&logclient);
+
+  heartbeat_messenger->add_dispatcher_head(&heartbeat_dispatcher);
   
   monc->init();
 
@@ -533,7 +534,6 @@ int OSD::shutdown()
   }
   pg_map.clear();
 
-  unlink_dispatcher(&logclient);
   messenger->shutdown();
   if (heartbeat_messenger)
     heartbeat_messenger->shutdown();
@@ -1613,18 +1613,6 @@ void OSD::_dispatch(Message *m)
 }
 
 
-void OSD::ms_handle_failure(Message *m, const entity_inst_t& inst)
-{
-  entity_name_t dest = inst.name;
-
-  dout(1) << "ms_handle_failure " << inst << " on " << *m << dendl;
-  if (g_conf.ms_die_on_failure)
-    assert(0);
-}
-
-
-
-
 void OSD::handle_scrub(MOSDScrub *m)
 {
   dout(10) << "handle_scrub " << *m << dendl;
index b9052abf89687b976d36133d231f01b18e2ea8b2..5a1b88d3b775f0e33cb26c7903e8e30cd3a927e4 100644 (file)
@@ -207,6 +207,9 @@ public:
     bool ms_dispatch(Message *m) {
       return osd->heartbeat_dispatch(m);
     };
+    bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+    void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+    void ms_handle_remote_reset(const entity_addr_t& peer) {}
   public:
     OSD *osd;
     HeartbeatDispatcher(OSD *o) : osd(o) {}
@@ -812,7 +815,11 @@ protected:
   } remove_wq;
 
  private:
-  virtual bool ms_dispatch(Message *m);
+  bool ms_dispatch(Message *m);
+  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(const entity_addr_t& peer) {}
+
  public:
   OSD(int id, Messenger *m, Messenger *hbm, MonClient *mc, const char *dev = 0, const char *jdev = 0);
   ~OSD();
@@ -827,9 +834,6 @@ protected:
   int init();
   int shutdown();
 
-  // messages
-  virtual void ms_handle_failure(Message *m, const entity_inst_t& inst);
-
   void reply_op_error(MOSDOp *op, int r);
 
   void handle_scrub(class MOSDScrub *m);
index b223b2f5b067e66a505b4b1af769b778e8799772..013d30d38431189b8fb04c775368308f5a84cf41 100644 (file)
@@ -368,6 +368,14 @@ private:
   bool is_out(int osd) { return !exists(osd) || get_weight(osd) == CEPH_OSD_OUT; }
   bool is_in(int osd) { return exists(osd) && !is_out(osd); }
   
+  bool have_addr(const entity_addr_t& addr) const {
+    for (vector<entity_addr_t>::const_iterator p = osd_addr.begin();
+        p != osd_addr.end();
+        p++)
+      if (*p == addr)
+       return true;
+    return false;
+  }
   bool have_inst(int osd) {
     return exists(osd) && is_up(osd); 
   }
index 22d617f017d04b953554edaeb722533c07d723e5..a1da74a30b5e186fef1612d8e671d0cb5e3e72da 100644 (file)
@@ -934,28 +934,10 @@ void Objecter::_sg_read_finish(vector<ObjectExtent>& extents, vector<bufferlist>
 }
 
 
-void Objecter::ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest)
+void Objecter::ms_handle_remote_reset(const entity_addr_t& addr)
 {
-  entity_inst_t inst;
-  inst.name = dest;
-  inst.addr = addr;
-
-  if (dest.is_osd()) {
-    if (!osdmap->have_inst(dest.num()) ||
-       (osdmap->get_inst(dest.num()) != inst)) {
-      dout(0) << "ms_handle_remote_reset " << dest << " inst " << inst
-             << ", ignoring, already have newer osdmap" << dendl;
-    } else {
-      // kick requests
-      set<pg_t> changed_pgs;
-      dout(0) << "ms_handle_remote_reset " << dest << dendl;
-      scan_pgs_for(changed_pgs, dest.num());
-      if (!changed_pgs.empty()) {
-       dout(0) << "ms_handle_remote_reset " << dest << " kicking " << changed_pgs << dendl;
-       kick_requests(changed_pgs);
-      }
-    }
-  }
+  if (osdmap->have_addr(addr))
+    maybe_request_map();
 }
 
 
index 2bbe29d3343ef124b352a629b63067a99a2e35b1..8c3f3e6eb9dd8be323e27998a0233480e593ec7d 100644 (file)
@@ -697,7 +697,7 @@ public:
     }
   }
 
-  void ms_handle_remote_reset(const entity_addr_t& addr, entity_name_t dest);
+  void ms_handle_remote_reset(const entity_addr_t& addr);
 
 };
 
index 0359881034a50c1bfd3ded2edf0efecebe85f369..368d34dd7b3abf3efa90462f6d3dde4cb21ef493 100644 (file)
@@ -57,8 +57,9 @@ class Admin : public Dispatcher {
     return true;
   }
 
-  void ms_handle_failure(Message *m, const entity_inst_t& inst) { 
-  }
+  bool ms_handle_reset(const entity_addr_t& peer) { return false; }
+  void ms_handle_failure(Message *m, const entity_addr_t& peer) { }
+  void ms_handle_remote_reset(const entity_addr_t& peer) {}
 
 } dispatcher;
 
@@ -92,7 +93,7 @@ int main(int argc, const char **argv, const char *envp[]) {
   // start monitor
   messenger = rank.register_entity(entity_name_t::MON(whoami));
   messenger->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
-  messenger->set_dispatcher(&dispatcher);
+  messenger->add_dispatcher_head(&dispatcher);
 
   rank.start();