]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
uclient: subscribe to mdsmap; strip out some signed_ticket cruft; fix mount
authorSage Weil <sage@newdream.net>
Mon, 31 Aug 2009 20:00:09 +0000 (13:00 -0700)
committerSage Weil <sage@newdream.net>
Mon, 31 Aug 2009 20:00:09 +0000 (13:00 -0700)
src/client/Client.cc
src/client/Client.h
src/librados.cc
src/mon/MonClient.cc
src/mon/MonClient.h
src/osdc/Objecter.cc
src/osdc/Objecter.h

index d3d519a86cd3a81108a151ed505498a278bbc14b..dfbf568f2847b2f565f4e701bcb7e90077d5d086 100644 (file)
@@ -815,7 +815,7 @@ int Client::make_request(MClientRequest *req,
 
       if (!mdsmap->is_active(mds)) {
        dout(10) << "no address for mds" << mds << ", requesting new mdsmap" << dendl;
-       monclient->send_mon_message(new MMDSGetMap(monclient->get_fsid(), mdsmap->get_epoch()));
+       //monclient->send_mon_message(new MMDSGetMap(monclient->get_fsid(), mdsmap->get_epoch()));
        waiting_for_mdsmap.push_back(&cond);
        cond.Wait(client_lock);
 
@@ -1201,6 +1201,8 @@ void Client::handle_mds_map(MMDSMap* m)
 
   delete oldmap;
   delete m;
+
+  monclient->update_sub("mdsmap", mdsmap->get_epoch());
 }
 
 void Client::send_reconnect(int mds)
@@ -2544,9 +2546,6 @@ int Client::mount()
   
   whoami = messenger->get_myname().num();
 
-  signed_ticket = monclient->get_signed_ticket();
-
-  objecter->signed_ticket = signed_ticket;
   objecter->init();
 
   mounted = true;
@@ -2554,6 +2553,9 @@ int Client::mount()
   dout(2) << "mounted: have osdmap " << osdmap->get_epoch() 
          << " and mdsmap " << mdsmap->get_epoch() 
          << dendl;
+
+  monclient->update_sub("mdsmap", mdsmap->get_epoch());
+
   
   // hack: get+pin root inode.
   //  fuse assumes it's always there.
@@ -2736,6 +2738,8 @@ void Client::tick()
   dout(21) << "tick" << dendl;
   tick_event = new C_C_Tick(this);
   timer.add_event_after(g_conf.client_tick_interval, tick_event);
+
+  monclient->tick();
   
   utime_t now = g_clock.now();
 
index e29fa6832783f1b9de8e7c4ac4c686c9e6ea675d..6552c12593e74bc538b3effc25e2cea6cf163b7f 100644 (file)
@@ -753,8 +753,6 @@ public:
   Messenger *messenger;  
   int whoami;
 
-  bufferlist signed_ticket;
-  
   // mds sessions
   map<int, MDSSession> mds_sessions;  // mds -> push seq
   map<int, list<Cond*> > waiting_for_session;
index 839c84282d4ce2d758a18a0ba0117cb9a54b5be5..de5c224ec6e3bc1a7f5aaed27acda4ecabc11ab9 100644 (file)
@@ -304,7 +304,6 @@ bool RadosClient::init()
 
   lock.Lock();
 
-  objecter->signed_ticket = monclient.get_signed_ticket();
   objecter->set_client_incarnation(0);
   objecter->init();
 
index 69f3c60d8dcd337a60091bacbb566aea7ef87716..7a5e0dbf58a28d809a1083fdeb178d278f2c41a5 100644 (file)
@@ -20,6 +20,8 @@
 #include "include/nstring.h"
 
 #include "messages/MClientMountAck.h"
+#include "messages/MMonSubscribe.h"
+#include "messages/MMonSubscribeAck.h"
 #include "common/ConfUtils.h"
 
 #include "MonClient.h"
@@ -161,6 +163,9 @@ bool MonClient::dispatch_impl(Message *m)
     handle_mount_ack((MClientMountAck*)m);
     return true;
 
+  case CEPH_MSG_MON_SUBSCRIBE_ACK:
+    handle_subscribe_ack((MMonSubscribeAck*)m);
+    return true;
   }
 
   return false;
@@ -227,23 +232,10 @@ int MonClient::mount(double mount_timeout)
   }
   mounters++;
 
-  while (signed_ticket.length() == 0 ||
-        (!itsme && !mounted))       // non-doers wait a little longer
+  while (!mounted)
     mount_cond.Wait(monc_lock);
 
-  if (!itsme) {
-    dout(5) << "additional mounter returning" << dendl;
-    assert(mounted);
-    return 0;
-  }
-
-  // finish.
-  timer.cancel_event(mount_timeout_event);
-  mount_timeout_event = 0;
-
-  mounted = true;
-  mount_cond.SignalAll(); // wake up non-doers
-
+  dout(5) << "mount success" << dendl;
   return 0;
 }
 
@@ -257,8 +249,14 @@ void MonClient::handle_mount_ack(MClientMountAck* m)
 
   messenger->reset_myname(entity_name_t::CLIENT(m->client));
 
-  mount_cond.Signal();
+  // finish.
+  timer.cancel_event(mount_timeout_event);
+  mount_timeout_event = 0;
+
+  mounted = true;
 
+  mount_cond.SignalAll();
+  
   delete m;
 }
 
@@ -279,3 +277,64 @@ void MonClient::pick_new_mon()
   messenger->mark_down(monmap.get_inst(oldmon).addr);
   monmap.pick_mon(true);
 }
+
+
+void MonClient::ms_handle_reset(const entity_addr_t& peer)
+{
+  dout(10) << "ms_handle_reset " << peer << dendl;
+  pick_new_mon();
+  renew_subs();
+}
+
+
+// ---------
+
+void MonClient::renew_subs()
+{
+  if (sub_have.empty()) {
+    dout(10) << "renew_subs - empty" << dendl;
+    return;
+  }
+
+  dout(10) << "renew_subs" << dendl;
+
+  if (sub_renew_sent == utime_t())
+    sub_renew_sent = g_clock.now();
+
+  MMonSubscribe *m = new MMonSubscribe;
+  m->what = sub_have;
+  send_mon_message(m);
+}
+
+void MonClient::handle_subscribe_ack(MMonSubscribeAck *m)
+{
+  if (sub_renew_sent != utime_t()) {
+    sub_renew_after = sub_renew_sent;
+    sub_renew_after += 1000.0 * m->interval_ms;
+    dout(10) << "handle_subscribe_ack sent " << sub_renew_sent << " renew after " << sub_renew_after << dendl;
+    sub_renew_sent = utime_t();
+  } else {
+    dout(10) << "handle_subscribe_ack sent " << sub_renew_sent << ", ignoring" << dendl;
+  }
+}
+
+void MonClient::tick()
+{
+  dout(10) << "tick" << dendl;
+
+  if (!mounted)
+    return;
+
+  utime_t now = g_clock.now();
+  static utime_t last_tick;
+
+  if (now - last_tick < 10.0)
+    return;
+  last_tick = now;
+
+  if (now > sub_renew_after)
+    renew_subs();
+
+  int oldmon = monmap.pick_mon();
+  messenger->send_keepalive(monmap.mon_inst[oldmon]);
+}
index 33caa7e1ba4e098723ec107542e58e82ec3e9806..b658db4fde8daa8eee4b0103a3293ddf7459fb04 100644 (file)
@@ -27,6 +27,7 @@
 class MonMap;
 class MMonMap;
 class MClientMountAck;
+class MMonSubscribeAck;
 
 class MonClient : public Dispatcher {
 public:
@@ -36,9 +37,6 @@ private:
 
   entity_addr_t my_addr;
 
-  ClientTicket ticket;
-  bufferlist signed_ticket;
-
   Context *mount_timeout_event;
 
   Mutex monc_lock;
@@ -47,10 +45,11 @@ private:
   int mounters;
   Cond mount_cond, map_cond;
 
-
   bool dispatch_impl(Message *m);
   void handle_monmap(MMonMap *m);
 
+  void ms_handle_reset(const entity_addr_t& peer);
+
  protected:
   class C_MountTimeout : public Context {
     MonClient *client;
@@ -65,13 +64,29 @@ private:
   void _try_mount(double timeout);
   void _mount_timeout(double timeout);
   void handle_mount_ack(MClientMountAck* m);
+
+  // mon subscriptions
+private:
+  map<nstring,version_t> sub_have;  // my subs, and current versions
+  utime_t sub_renew_sent, sub_renew_after;
+
+public:
+  void renew_subs();
+  void update_sub(nstring what, version_t have) {
+    bool had = sub_have.count(what);
+    sub_have[what] = have;
+    if (!had)
+      renew_subs();
+  }
+  void handle_subscribe_ack(MMonSubscribeAck* m);
+
  public:
   MonClient() : messenger(NULL),
+               mount_timeout_event(NULL),
                monc_lock("MonClient::monc_lock"),
                timer(monc_lock) {
     mounted = false;
     mounters = 0;
-    mount_timeout_event = 0;
   }
 
   int build_initial_monmap();
@@ -79,6 +94,8 @@ private:
 
   int mount(double mount_timeout);
 
+  void tick();
+
   void send_mon_message(Message *m, bool new_mon=false);
   void note_mon_leader(int m) {
     monmap.last_mon = m;
@@ -110,8 +127,6 @@ private:
 
   void set_messenger(Messenger *m) { messenger = m; }
 
-  bufferlist& get_signed_ticket() { return signed_ticket; }
-  ClientTicket& get_ticket() { return ticket; }
 
 };
 
index beee81ad25550bdd50fa46d4198f12f684c9340b..57c4df3412ac3676822fd3e1170f76261c5917f4 100644 (file)
@@ -53,6 +53,8 @@ void Objecter::init()
   assert(client_lock.is_locked());  // otherwise event cancellation is unsafe
   timer.add_event_after(g_conf.objecter_tick_interval, new C_Tick(this));
   maybe_request_map();
+  
+  //monc->update_sub("osdmap", 0);
 }
 
 void Objecter::shutdown() 
@@ -190,6 +192,8 @@ void Objecter::handle_osd_map(MOSDMap *m)
   }
 
   delete m;
+
+  //monc->update_sub("osdmap", osdmap->get_epoch());
 }
 
 
@@ -442,7 +446,9 @@ tid_t Objecter::op_submit(Op *op)
     if (op->onack)
       flags |= CEPH_OSD_FLAG_ACK;
 
-    MOSDOp *m = new MOSDOp(signed_ticket, client_inc, op->tid,
+    bufferlist empty_ticket_fixme;
+#warning remove signed ticket ref
+    MOSDOp *m = new MOSDOp(empty_ticket_fixme, client_inc, op->tid,
                           op->oid, op->layout, osdmap->get_epoch(),
                           flags);
 
index 28c947cd5ec01ecf994ec6a4db6aef45a68ff8ee..c516ccadeb35b34e7812b2f566320b9d6871688b 100644 (file)
@@ -177,8 +177,6 @@ class Objecter {
   MonClient *monc;
   OSDMap    *osdmap;
 
-  bufferlist signed_ticket;
-
  
  private:
   tid_t last_tid;