]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
auth: manage global id in the auth mon
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 6 Nov 2009 22:30:38 +0000 (14:30 -0800)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Sat, 7 Nov 2009 00:36:10 +0000 (16:36 -0800)
src/messages/MMonGlobalID.h [new file with mode: 0644]
src/mon/AuthMonitor.cc
src/mon/AuthMonitor.h
src/mon/Monitor.cc
src/msg/Message.cc
src/msg/Message.h

diff --git a/src/messages/MMonGlobalID.h b/src/messages/MMonGlobalID.h
new file mode 100644 (file)
index 0000000..038312f
--- /dev/null
@@ -0,0 +1,40 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef __MMONGLOBALID_H
+#define __MMONGLOBALID_H
+
+#include "messages/PaxosServiceMessage.h"
+
+struct MMonGlobalID : public PaxosServiceMessage {
+  uint64_t old_max_id;
+  MMonGlobalID() : PaxosServiceMessage(MSG_MON_GLOBAL_ID, 0), old_max_id(0) { }
+
+  const char *get_type_name() { return "global_id"; }
+  void print(ostream& out) {
+    out << "global_id  (" << old_max_id << ")";
+  }
+
+  void decode_payload() {
+    bufferlist::iterator p = payload.begin();
+    paxos_decode(p);
+    ::decode(old_max_id, p);
+  }
+  void encode_payload() {
+    paxos_encode();
+    ::encode(old_max_id, payload);
+  }
+};
+
+#endif
index 37e5d6503d5c33d2cea8a307d13c4902bc188ff0..650a2cd54bb08ca7ce9fab6656624fe4d49bd2e2 100644 (file)
@@ -20,6 +20,8 @@
 #include "messages/MMonCommand.h"
 #include "messages/MAuth.h"
 #include "messages/MAuthReply.h"
+#include "messages/MMonGlobalID.h"
+#include "messages/MMonGlobalIDReply.h"
 
 #include "include/str_list.h"
 #include "common/Timer.h"
@@ -51,12 +53,12 @@ ostream& operator<<(ostream& out, AuthMonitor& pm)
 
 void AuthMonitor::check_rotate()
 {
-  KeyServerData::Incremental inc;
-  inc.op = KeyServerData::AUTH_INC_SET_ROTATING;
-  if (!mon->key_server.updated_rotating(inc.rotating_bl, last_rotating_ver))
+  KeyServerData::Incremental rot_inc;
+  rot_inc.op = KeyServerData::AUTH_INC_SET_ROTATING;
+  if (!mon->key_server.updated_rotating(rot_inc.rotating_bl, last_rotating_ver))
     return;
   dout(0) << "AuthMonitor::tick() updated rotating, now calling propose_pending" << dendl;
-  pending_auth.push_back(inc);
+  push_cephx_inc(rot_inc);
   propose_pending();
 }
 
@@ -99,7 +101,7 @@ void AuthMonitor::create_initial(bufferlist& bl)
     string k = g_conf.keys_file;
     list<string> ls;
     get_str_list(k, ls);
-    int r;
+    int r = -1;
     for (list<string>::iterator p = ls.begin(); p != ls.end(); p++)
       if ((r = bl.read_file(g_conf.keys_file)) >= 0)
        break;
@@ -118,14 +120,14 @@ void AuthMonitor::create_initial(bufferlist& bl)
           string n = iter->first;
           if (!n.empty()) {
             dout(0) << "read key for entry: " << n << dendl;
-           KeyServerData::Incremental inc;
-            if (!inc.name.from_str(n)) {
+           KeyServerData::Incremental auth_inc;
+            if (!auth_inc.name.from_str(n)) {
               dout(0) << "bad entity name " << n << dendl;
               continue;
             }
-            inc.auth = iter->second; 
-            inc.op = KeyServerData::AUTH_INC_ADD;
-            pending_auth.push_back(inc);
+            auth_inc.auth = iter->second; 
+            auth_inc.op = KeyServerData::AUTH_INC_ADD;
+            push_cephx_inc(auth_inc);
           }
         }
       }
@@ -133,9 +135,18 @@ void AuthMonitor::create_initial(bufferlist& bl)
     }
   }
 
-  KeyServerData::Incremental inc;
-  inc.op = KeyServerData::AUTH_INC_NOP;
+  max_global_id = MIN_GLOBAL_ID;
+
+  Incremental inc;
+  inc.inc_type = GLOBAL_ID;
+  inc.max_global_id = max_global_id;
   pending_auth.push_back(inc);
+  
+#if 0
+  KeyServerData::Incremental auth_inc;
+  auth_inc.op = KeyServerData::AUTH_INC_NOP;
+  push_cephx_inc(auth_inc);
+#endif
 }
 
 bool AuthMonitor::update_from_paxos()
@@ -153,6 +164,7 @@ bool AuthMonitor::update_from_paxos()
     if (v) {
       dout(7) << "update_from_paxos startup: loading summary e" << v << dendl;
       bufferlist::iterator p = latest.begin();
+      ::decode(max_global_id, p);
       ::decode(mon->key_server, p);
     }
   } 
@@ -165,16 +177,36 @@ bool AuthMonitor::update_from_paxos()
 
     bufferlist::iterator p = bl.begin();
     while (!p.end()) {
-      KeyServerData::Incremental inc;
+      Incremental inc;
       ::decode(inc, p);
-      mon->key_server.apply_data_incremental(inc);
+      switch (inc.inc_type) {
+      case GLOBAL_ID:
+        {
+          max_global_id = inc.max_global_id;
+          break;
+        }
+      case AUTH_DATA:
+        {
+          KeyServerData::Incremental auth_inc;
+          bufferlist::iterator iter = inc.auth_data.begin();
+          ::decode(auth_inc, iter);
+          mon->key_server.apply_data_incremental(auth_inc);
+          break;
+        }
+      }
     }
 
     keys_ver++;
     mon->key_server.set_ver(keys_ver);
   }
 
+  if (last_allocated_id == (uint64_t)-1) {
+    last_allocated_id = max_global_id;
+  }
+  dout(0) << "JJJ update_from_paxos() last_allocated_id=" << last_allocated_id << " max_global_id=" << max_global_id << dendl;
+
   bufferlist bl;
+  ::encode(max_global_id, bl);
   Mutex::Locker l(mon->key_server.get_lock());
   ::encode(mon->key_server, bl);
   paxos->stash_latest(paxosv, bl);
@@ -182,6 +214,20 @@ bool AuthMonitor::update_from_paxos()
   return true;
 }
 
+void AuthMonitor::increase_max_global_id()
+{
+#define GLOBAL_ID_DELTA 100
+  assert(mon->is_leader());
+
+  max_global_id += GLOBAL_ID_DELTA;
+  dout(0) << "JJJ increasing max_global_id to " << max_global_id << dendl;
+  Incremental inc;
+  inc.inc_type = GLOBAL_ID;
+  inc.max_global_id = max_global_id;
+  pending_auth.push_back(inc);
+  propose_pending();
+}
+
 void AuthMonitor::init()
 {
   version_t paxosv = paxos->get_version();
@@ -216,7 +262,7 @@ void AuthMonitor::create_pending()
 void AuthMonitor::encode_pending(bufferlist &bl)
 {
   dout(10) << "encode_pending v " << (paxos->get_version() + 1) << dendl;
-  for (vector<KeyServerData::Incremental>::iterator p = pending_auth.begin();
+  for (vector<Incremental>::iterator p = pending_auth.begin();
        p != pending_auth.end();
        p++)
     p->encode(bl);
@@ -224,7 +270,7 @@ void AuthMonitor::encode_pending(bufferlist &bl)
 
 bool AuthMonitor::preprocess_query(PaxosServiceMessage *m)
 {
-  dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
+  dout(0) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
   switch (m->get_type()) {
   case MSG_MON_COMMAND:
     return preprocess_command((MMonCommand*)m);
@@ -232,6 +278,10 @@ bool AuthMonitor::preprocess_query(PaxosServiceMessage *m)
   case CEPH_MSG_AUTH:
     return preprocess_auth((MAuth *)m);
 
+  case MSG_MON_GLOBAL_ID:
+    return false;
+
+
   default:
     assert(0);
     delete m;
@@ -245,6 +295,8 @@ bool AuthMonitor::prepare_update(PaxosServiceMessage *m)
   switch (m->get_type()) {
   case MSG_MON_COMMAND:
     return prepare_command((MMonCommand*)m);
+  case MSG_MON_GLOBAL_ID:
+    return prepare_global_id((MMonGlobalID*)m); 
   default:
     assert(0);
     delete m;
@@ -257,6 +309,48 @@ void AuthMonitor::committed()
 
 }
 
+void AuthMonitor::election_finished()
+{
+  last_allocated_id = -1;
+  dout(0) << "AuthMonitor::election_starting" << dendl;
+}
+
+uint64_t AuthMonitor::assign_global_id(MAuth *m)
+{
+  int total_mon = mon->monmap->size();
+  dout(0) << "JJJ AuthMonitor::assign_global_id m=" << *m << " mon=" << mon->whoami << "/" << total_mon << " last_allocated="
+          << last_allocated_id << " max_global_id=" <<  max_global_id << dendl;
+
+  uint64_t next_global_id = last_allocated_id + 1;
+
+  if (next_global_id < max_global_id) {
+    int reminder = next_global_id % total_mon;
+    if (reminder)
+      reminder = total_mon - reminder;
+    next_global_id += reminder + mon->whoami;
+    dout(0) << "JJJ next_global_id should be " << next_global_id << dendl;
+  }
+
+  while (next_global_id >= max_global_id) {
+    if (!mon->is_leader()) {
+      dout(0) << "JJJ not the leader, forwarding request to the leader" << dendl;
+      int leader = mon->get_leader();
+      MMonGlobalID *req = new MMonGlobalID();
+      req->old_max_id = max_global_id;
+      mon->messenger->send_message(req, mon->monmap->get_inst(leader));
+      paxos->wait_for_commit(new C_RetryMessage(this, m));
+      return 0;
+    } else {
+      dout(0) << "JJJ increasing max_global_id" << dendl;
+      increase_max_global_id();
+    }
+  }
+
+  last_allocated_id = next_global_id;
+
+  return next_global_id;
+}
+
 
 bool AuthMonitor::preprocess_auth(MAuth *m)
 {
@@ -277,6 +371,10 @@ bool AuthMonitor::preprocess_auth(MAuth *m)
     EntityName entity_name;
 
     if (!s->auth_handler) {
+      uint64_t global_id = assign_global_id(m);
+      if (!global_id)
+        goto done;
+
       set<__u32> supported;
       
       try {
@@ -370,10 +468,10 @@ bool AuthMonitor::prepare_command(MMonCommand *m)
   if (m->cmd.size() > 1) {
     if (m->cmd[1] == "add" && m->cmd.size() >= 2) {
       string entity_name;
-      KeyServerData::Incremental inc;
+      KeyServerData::Incremental auth_inc;
       if (m->cmd.size() >= 3) {
         entity_name = m->cmd[2];
-        if (!inc.name.from_str(entity_name)) {
+        if (!auth_inc.name.from_str(entity_name)) {
           ss << "bad entity name";
           rs = -EINVAL;
           goto done;
@@ -393,19 +491,19 @@ bool AuthMonitor::prepare_command(MMonCommand *m)
       }
 
       for (map<string, EntityAuth>::iterator miter = crypto_map.begin(); miter != crypto_map.end(); ++miter) {
-       KeyServerData::Incremental inc;
+       KeyServerData::Incremental auth_inc;
         dout(0) << "storing auth for " << entity_name  << dendl;
         if (miter->first.empty()) {
           if (entity_name.empty())
             continue;
-          inc.name.from_str(entity_name);
+          auth_inc.name.from_str(entity_name);
         } else {
           string s = miter->first;
-          inc.name.from_str(s);
+          auth_inc.name.from_str(s);
         }
-        inc.auth = miter->second;
-        inc.op = KeyServerData::AUTH_INC_ADD;
-        pending_auth.push_back(inc);
+        auth_inc.auth = miter->second;
+        auth_inc.op = KeyServerData::AUTH_INC_ADD;
+        push_cephx_inc(auth_inc);
       }
       ss << "updated";
       getline(ss, rs);
@@ -413,15 +511,15 @@ bool AuthMonitor::prepare_command(MMonCommand *m)
       return true;
     } else if (m->cmd[1] == "del" && m->cmd.size() >= 3) {
       string name = m->cmd[2];
-      KeyServerData::Incremental inc;
-      inc.name.from_str(name);
-      if (!mon->key_server.contains(inc.name)) {
+      KeyServerData::Incremental auth_inc;
+      auth_inc.name.from_str(name);
+      if (!mon->key_server.contains(auth_inc.name)) {
         ss << "couldn't find entry " << name;
         rs = -ENOENT;
         goto done;
       }
-      inc.op = KeyServerData::AUTH_INC_DEL;
-      pending_auth.push_back(inc);
+      auth_inc.op = KeyServerData::AUTH_INC_DEL;
+      push_cephx_inc(auth_inc);
 
       ss << "updated";
       getline(ss, rs);
@@ -444,3 +542,10 @@ done:
   return false;
 }
 
+bool AuthMonitor::prepare_global_id(MMonGlobalID *m)
+{
+  dout(0) << "JJJ AuthMonitor::prepare_global_id" << dendl;
+  increase_max_global_id();
+
+  return true;
+}
index 88b43b43fa7d67312b6ff1ad0c2d3ca3f899ff7e..0f931a7bb11d0a8e1aeae517ccdf3e8d0689c60b 100644 (file)
@@ -24,22 +24,75 @@ using namespace std;
 #include "PaxosService.h"
 #include "mon/Monitor.h"
 
-#include "auth/cephx/CephxKeyServer.h"
+#include "messages/MMonGlobalID.h"
+#include "messages/MMonGlobalIDReply.h"
 
 class MMonCommand;
 class MAuth;
 class MAuthMon;
 
+#define MIN_GLOBAL_ID 0x1000
+
 class AuthMonitor : public PaxosService {
   void auth_usage(stringstream& ss);
-  vector<KeyServerData::Incremental> pending_auth;
+  enum IncType {
+    GLOBAL_ID,
+    AUTH_DATA,
+  };
+public:
+  struct Incremental {
+    IncType inc_type;
+    uint64_t max_global_id;
+    uint32_t auth_type;
+    bufferlist auth_data;
+
+    void encode(bufferlist& bl) const {
+      __u32 _type = (__u32)inc_type;
+      ::encode(_type, bl);
+      if (_type == GLOBAL_ID) {
+       ::encode(max_global_id, bl);
+      } else {
+       ::encode(auth_type, bl);
+       ::encode(auth_data, bl);
+      }
+    }
+    void decode(bufferlist::iterator& bl) {
+      __u32 _type;
+      ::decode(_type, bl);
+      inc_type = (IncType)_type;
+      assert(inc_type >= GLOBAL_ID && inc_type <= AUTH_DATA);
+      if (_type == GLOBAL_ID) {
+       ::decode(max_global_id, bl);
+      } else {
+       ::decode(auth_type, bl);
+       ::decode(auth_data, bl);
+      }
+    }
+  };
+
+private:
+  vector<Incremental> pending_auth;
   version_t last_rotating_ver;
+  uint64_t max_global_id;
+  uint64_t last_allocated_id;
+
+  void push_cephx_inc(KeyServerData::Incremental& auth_inc) {
+    Incremental inc;
+    inc.inc_type = AUTH_DATA;
+    ::encode(auth_inc, inc.auth_data);
+    inc.auth_type = CEPH_AUTH_CEPHX;
+    pending_auth.push_back(inc);
+  }
 
   void on_active();
+  void election_finished();
 
   void create_initial(bufferlist& bl);
   bool update_from_paxos();
   void create_pending();  // prepare a new pending
+  bool prepare_global_id(MMonGlobalID *m);
+  void increase_max_global_id();
+  uint64_t assign_global_id(MAuth *m);
   void encode_pending(bufferlist &bl);  // propose pending update to peers
 
   void committed();
@@ -52,9 +105,11 @@ class AuthMonitor : public PaxosService {
   bool preprocess_command(MMonCommand *m);
   bool prepare_command(MMonCommand *m);
 
+  uint64_t assign_next_global_id();
+
   void check_rotate();
  public:
-  AuthMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p), last_rotating_ver(0) {}
+  AuthMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p), last_rotating_ver(0), max_global_id(-1), last_allocated_id(-1) {}
   void pre_auth(MAuth *m);
   
   void tick();  // check state, take actions
@@ -62,4 +117,7 @@ class AuthMonitor : public PaxosService {
   void init();
 };
 
+
+WRITE_CLASS_ENCODER(AuthMonitor::Incremental);
+
 #endif
index 2981316e361fb4e2a8f40311ce642d7d6bef2994..7a2c85f449de597b844d9ebe5c891400bd53dd77 100644 (file)
@@ -601,6 +601,7 @@ do { \
       break;
 
       // auth
+    case MSG_MON_GLOBAL_ID:
     case CEPH_MSG_AUTH:
       /* no need to check caps here */
       paxos_service[PAXOS_AUTH]->dispatch((PaxosServiceMessage*)m);
index 31cf665641c44c4956108a01240f2c27544e12c8..f10b48e3d3a02642a317612c56583e1928e02eae 100644 (file)
@@ -70,6 +70,7 @@ using namespace std;
 #include "messages/MClientMountAck.h"
 #include "messages/MMonSubscribe.h"
 #include "messages/MMonSubscribeAck.h"
+#include "messages/MMonGlobalID.h"
 #include "messages/MClientSession.h"
 #include "messages/MClientReconnect.h"
 #include "messages/MClientRequest.h"
@@ -326,6 +327,10 @@ Message *decode_message(ceph_msg_header& header, ceph_msg_footer& footer,
     m = new MAuthReply;
     break;
 
+  case MSG_MON_GLOBAL_ID:
+    m = new MMonGlobalID;
+    break; 
+
     // clients
   case CEPH_MSG_CLIENT_MOUNT:
     m = new MClientMount;
index 7906ae1ed9c4ba2f618e695b070e8be6cc3723b1..bb4ea20f9ad9d8f614213e9a91affd190d5d06b3 100644 (file)
@@ -36,6 +36,8 @@
 #define MSG_GETPOOLSTATS           58
 #define MSG_GETPOOLSTATSREPLY      59
 
+#define MSG_MON_GLOBAL_ID          60
+
 #define MSG_POOLOP                 49
 #define MSG_POOLOPREPLY            48