]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
monclient: add method to retrieve the latest version of a map
authorJosh Durgin <josh.durgin@dreamhost.com>
Thu, 28 Jul 2011 17:30:05 +0000 (10:30 -0700)
committerJosh Durgin <josh.durgin@dreamhost.com>
Fri, 29 Jul 2011 22:38:26 +0000 (15:38 -0700)
Signed-off-by: Josh Durgin <josh.durgin@dreamhost.com>
src/mon/MonClient.cc
src/mon/MonClient.h

index 506627b928213ec1a6b4ce17162f9f3a3d9d2092..e8607f2fab848a7b9cf90b792900c1683c876e8a 100644 (file)
@@ -14,6 +14,8 @@
 
 #include "msg/SimpleMessenger.h"
 #include "messages/MMonGetMap.h"
+#include "messages/MMonGetVersion.h"
+#include "messages/MMonGetVersionReply.h"
 #include "messages/MMonMap.h"
 #include "messages/MAuth.h"
 #include "messages/MAuthReply.h"
@@ -56,7 +58,8 @@ MonClient::MonClient(CephContext *cct_) :
   authenticate_err(0),
   auth(NULL),
   keyring(NULL),
-  rotating_secrets(NULL)
+  rotating_secrets(NULL),
+  version_req_id(0)
 {
 }
 
@@ -272,6 +275,7 @@ bool MonClient::ms_dispatch(Message *m)
   case CEPH_MSG_MON_MAP:
   case CEPH_MSG_AUTH_REPLY:
   case CEPH_MSG_MON_SUBSCRIBE_ACK:
+  case CEPH_MSG_MON_GET_VERSION_REPLY:
     break;
   default:
     return false;
@@ -296,6 +300,8 @@ bool MonClient::ms_dispatch(Message *m)
   case CEPH_MSG_MON_SUBSCRIBE_ACK:
     handle_subscribe_ack((MMonSubscribeAck*)m);
     break;
+  case CEPH_MSG_MON_GET_VERSION_REPLY:
+    handle_get_version_reply((MMonGetVersionReply*)m);
   }
   return true;
 }
@@ -740,4 +746,30 @@ int MonClient::wait_auth_rotating(double timeout)
   return 0;
 }
 
+// ---------
+
+void MonClient::is_latest_map(string map, version_t cur_ver, Context *onfinish)
+{
+  ldout(cct, 10) << "is_latest_map " << map << " current " << cur_ver << dendl;;
+  Mutex::Locker l(monc_lock);
+  MMonGetVersion *m = new MMonGetVersion();
+  m->what = map;
+  m->handle = ++version_req_id;
+  version_requests[m->handle] = new version_req_d(onfinish, cur_ver);
+  _send_mon_message(m);
+}
 
+void MonClient::handle_get_version_reply(MMonGetVersionReply* m)
+{
+  assert(monc_lock.is_locked());
+  map<tid_t, version_req_d*>::iterator iter = version_requests.find(m->handle);
+  if (iter == version_requests.end()) {
+    ldout(cct, 0) << "version request with handle " << m->handle
+                 << " not found" << dendl;
+  } else {
+    version_req_d *req = iter->second;
+    req->context->complete(m->version != req->version);
+    version_requests.erase(iter);
+    delete req;
+  }
+}
index 5ace16290882bdcd3cb496684f521a20f49700f6..d3fa314b543246bb59632a52b2e2aeade4fdf2c5 100644 (file)
@@ -31,6 +31,8 @@
 
 class MonMap;
 class MMonMap;
+class MMonGetVersion;
+class MMonGetVersionReply;
 class MMonSubscribeAck;
 class MAuthReply;
 class MAuthRotating;
@@ -236,7 +238,23 @@ public:
     if (auth)
       auth->add_want_keys(want);
   }
+
+  // version requests
+public:
+  void is_latest_map(string map, version_t cur_ver, Context *onfinish);
+
 private:
+  struct version_req_d {
+    Context *context;
+    version_t version;
+    version_req_d(Context *con, version_t ver) : context(con), version(ver) {}
+  };
+
+  map<tid_t, version_req_d*> version_requests;
+  tid_t version_req_id;
+  void handle_get_version_reply(MMonGetVersionReply* m);
+
+
   MonClient(const MonClient &rhs);
   MonClient& operator=(const MonClient &rhs);
 };