#include "messages/MMonCommand.h"
#include "messages/MAuth.h"
#include "messages/MAuthReply.h"
+#include "messages/MMonGlobalID.h"
+#include "messages/MMonGlobalIDReply.h"
#include "include/str_list.h"
#include "common/Timer.h"
void AuthMonitor::check_rotate()
{
- KeyServerData::Incremental inc;
- inc.op = KeyServerData::AUTH_INC_SET_ROTATING;
- if (!mon->key_server.updated_rotating(inc.rotating_bl, last_rotating_ver))
+ KeyServerData::Incremental rot_inc;
+ rot_inc.op = KeyServerData::AUTH_INC_SET_ROTATING;
+ if (!mon->key_server.updated_rotating(rot_inc.rotating_bl, last_rotating_ver))
return;
dout(0) << "AuthMonitor::tick() updated rotating, now calling propose_pending" << dendl;
- pending_auth.push_back(inc);
+ push_cephx_inc(rot_inc);
propose_pending();
}
string k = g_conf.keys_file;
list<string> ls;
get_str_list(k, ls);
- int r;
+ int r = -1;
for (list<string>::iterator p = ls.begin(); p != ls.end(); p++)
if ((r = bl.read_file(g_conf.keys_file)) >= 0)
break;
string n = iter->first;
if (!n.empty()) {
dout(0) << "read key for entry: " << n << dendl;
- KeyServerData::Incremental inc;
- if (!inc.name.from_str(n)) {
+ KeyServerData::Incremental auth_inc;
+ if (!auth_inc.name.from_str(n)) {
dout(0) << "bad entity name " << n << dendl;
continue;
}
- inc.auth = iter->second;
- inc.op = KeyServerData::AUTH_INC_ADD;
- pending_auth.push_back(inc);
+ auth_inc.auth = iter->second;
+ auth_inc.op = KeyServerData::AUTH_INC_ADD;
+ push_cephx_inc(auth_inc);
}
}
}
}
}
- KeyServerData::Incremental inc;
- inc.op = KeyServerData::AUTH_INC_NOP;
+ max_global_id = MIN_GLOBAL_ID;
+
+ Incremental inc;
+ inc.inc_type = GLOBAL_ID;
+ inc.max_global_id = max_global_id;
pending_auth.push_back(inc);
+
+#if 0
+ KeyServerData::Incremental auth_inc;
+ auth_inc.op = KeyServerData::AUTH_INC_NOP;
+ push_cephx_inc(auth_inc);
+#endif
}
bool AuthMonitor::update_from_paxos()
if (v) {
dout(7) << "update_from_paxos startup: loading summary e" << v << dendl;
bufferlist::iterator p = latest.begin();
+ ::decode(max_global_id, p);
::decode(mon->key_server, p);
}
}
bufferlist::iterator p = bl.begin();
while (!p.end()) {
- KeyServerData::Incremental inc;
+ Incremental inc;
::decode(inc, p);
- mon->key_server.apply_data_incremental(inc);
+ switch (inc.inc_type) {
+ case GLOBAL_ID:
+ {
+ max_global_id = inc.max_global_id;
+ break;
+ }
+ case AUTH_DATA:
+ {
+ KeyServerData::Incremental auth_inc;
+ bufferlist::iterator iter = inc.auth_data.begin();
+ ::decode(auth_inc, iter);
+ mon->key_server.apply_data_incremental(auth_inc);
+ break;
+ }
+ }
}
keys_ver++;
mon->key_server.set_ver(keys_ver);
}
+ if (last_allocated_id == (uint64_t)-1) {
+ last_allocated_id = max_global_id;
+ }
+ dout(0) << "JJJ update_from_paxos() last_allocated_id=" << last_allocated_id << " max_global_id=" << max_global_id << dendl;
+
bufferlist bl;
+ ::encode(max_global_id, bl);
Mutex::Locker l(mon->key_server.get_lock());
::encode(mon->key_server, bl);
paxos->stash_latest(paxosv, bl);
return true;
}
+void AuthMonitor::increase_max_global_id()
+{
+#define GLOBAL_ID_DELTA 100
+ assert(mon->is_leader());
+
+ max_global_id += GLOBAL_ID_DELTA;
+ dout(0) << "JJJ increasing max_global_id to " << max_global_id << dendl;
+ Incremental inc;
+ inc.inc_type = GLOBAL_ID;
+ inc.max_global_id = max_global_id;
+ pending_auth.push_back(inc);
+ propose_pending();
+}
+
void AuthMonitor::init()
{
version_t paxosv = paxos->get_version();
void AuthMonitor::encode_pending(bufferlist &bl)
{
dout(10) << "encode_pending v " << (paxos->get_version() + 1) << dendl;
- for (vector<KeyServerData::Incremental>::iterator p = pending_auth.begin();
+ for (vector<Incremental>::iterator p = pending_auth.begin();
p != pending_auth.end();
p++)
p->encode(bl);
bool AuthMonitor::preprocess_query(PaxosServiceMessage *m)
{
- dout(10) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
+ dout(0) << "preprocess_query " << *m << " from " << m->get_orig_source_inst() << dendl;
switch (m->get_type()) {
case MSG_MON_COMMAND:
return preprocess_command((MMonCommand*)m);
case CEPH_MSG_AUTH:
return preprocess_auth((MAuth *)m);
+ case MSG_MON_GLOBAL_ID:
+ return false;
+
+
default:
assert(0);
delete m;
switch (m->get_type()) {
case MSG_MON_COMMAND:
return prepare_command((MMonCommand*)m);
+ case MSG_MON_GLOBAL_ID:
+ return prepare_global_id((MMonGlobalID*)m);
default:
assert(0);
delete m;
}
+void AuthMonitor::election_finished()
+{
+ last_allocated_id = -1;
+ dout(0) << "AuthMonitor::election_starting" << dendl;
+}
+
+uint64_t AuthMonitor::assign_global_id(MAuth *m)
+{
+ int total_mon = mon->monmap->size();
+ dout(0) << "JJJ AuthMonitor::assign_global_id m=" << *m << " mon=" << mon->whoami << "/" << total_mon << " last_allocated="
+ << last_allocated_id << " max_global_id=" << max_global_id << dendl;
+
+ uint64_t next_global_id = last_allocated_id + 1;
+
+ if (next_global_id < max_global_id) {
+ int reminder = next_global_id % total_mon;
+ if (reminder)
+ reminder = total_mon - reminder;
+ next_global_id += reminder + mon->whoami;
+ dout(0) << "JJJ next_global_id should be " << next_global_id << dendl;
+ }
+
+ while (next_global_id >= max_global_id) {
+ if (!mon->is_leader()) {
+ dout(0) << "JJJ not the leader, forwarding request to the leader" << dendl;
+ int leader = mon->get_leader();
+ MMonGlobalID *req = new MMonGlobalID();
+ req->old_max_id = max_global_id;
+ mon->messenger->send_message(req, mon->monmap->get_inst(leader));
+ paxos->wait_for_commit(new C_RetryMessage(this, m));
+ return 0;
+ } else {
+ dout(0) << "JJJ increasing max_global_id" << dendl;
+ increase_max_global_id();
+ }
+ }
+
+ last_allocated_id = next_global_id;
+
+ return next_global_id;
+}
+
bool AuthMonitor::preprocess_auth(MAuth *m)
{
EntityName entity_name;
if (!s->auth_handler) {
+ uint64_t global_id = assign_global_id(m);
+ if (!global_id)
+ goto done;
+
set<__u32> supported;
try {
if (m->cmd.size() > 1) {
if (m->cmd[1] == "add" && m->cmd.size() >= 2) {
string entity_name;
- KeyServerData::Incremental inc;
+ KeyServerData::Incremental auth_inc;
if (m->cmd.size() >= 3) {
entity_name = m->cmd[2];
- if (!inc.name.from_str(entity_name)) {
+ if (!auth_inc.name.from_str(entity_name)) {
ss << "bad entity name";
rs = -EINVAL;
goto done;
}
for (map<string, EntityAuth>::iterator miter = crypto_map.begin(); miter != crypto_map.end(); ++miter) {
- KeyServerData::Incremental inc;
+ KeyServerData::Incremental auth_inc;
dout(0) << "storing auth for " << entity_name << dendl;
if (miter->first.empty()) {
if (entity_name.empty())
continue;
- inc.name.from_str(entity_name);
+ auth_inc.name.from_str(entity_name);
} else {
string s = miter->first;
- inc.name.from_str(s);
+ auth_inc.name.from_str(s);
}
- inc.auth = miter->second;
- inc.op = KeyServerData::AUTH_INC_ADD;
- pending_auth.push_back(inc);
+ auth_inc.auth = miter->second;
+ auth_inc.op = KeyServerData::AUTH_INC_ADD;
+ push_cephx_inc(auth_inc);
}
ss << "updated";
getline(ss, rs);
return true;
} else if (m->cmd[1] == "del" && m->cmd.size() >= 3) {
string name = m->cmd[2];
- KeyServerData::Incremental inc;
- inc.name.from_str(name);
- if (!mon->key_server.contains(inc.name)) {
+ KeyServerData::Incremental auth_inc;
+ auth_inc.name.from_str(name);
+ if (!mon->key_server.contains(auth_inc.name)) {
ss << "couldn't find entry " << name;
rs = -ENOENT;
goto done;
}
- inc.op = KeyServerData::AUTH_INC_DEL;
- pending_auth.push_back(inc);
+ auth_inc.op = KeyServerData::AUTH_INC_DEL;
+ push_cephx_inc(auth_inc);
ss << "updated";
getline(ss, rs);
return false;
}
+bool AuthMonitor::prepare_global_id(MMonGlobalID *m)
+{
+ dout(0) << "JJJ AuthMonitor::prepare_global_id" << dendl;
+ increase_max_global_id();
+
+ return true;
+}
#include "PaxosService.h"
#include "mon/Monitor.h"
-#include "auth/cephx/CephxKeyServer.h"
+#include "messages/MMonGlobalID.h"
+#include "messages/MMonGlobalIDReply.h"
class MMonCommand;
class MAuth;
class MAuthMon;
+#define MIN_GLOBAL_ID 0x1000
+
class AuthMonitor : public PaxosService {
void auth_usage(stringstream& ss);
- vector<KeyServerData::Incremental> pending_auth;
+ enum IncType {
+ GLOBAL_ID,
+ AUTH_DATA,
+ };
+public:
+ struct Incremental {
+ IncType inc_type;
+ uint64_t max_global_id;
+ uint32_t auth_type;
+ bufferlist auth_data;
+
+ void encode(bufferlist& bl) const {
+ __u32 _type = (__u32)inc_type;
+ ::encode(_type, bl);
+ if (_type == GLOBAL_ID) {
+ ::encode(max_global_id, bl);
+ } else {
+ ::encode(auth_type, bl);
+ ::encode(auth_data, bl);
+ }
+ }
+ void decode(bufferlist::iterator& bl) {
+ __u32 _type;
+ ::decode(_type, bl);
+ inc_type = (IncType)_type;
+ assert(inc_type >= GLOBAL_ID && inc_type <= AUTH_DATA);
+ if (_type == GLOBAL_ID) {
+ ::decode(max_global_id, bl);
+ } else {
+ ::decode(auth_type, bl);
+ ::decode(auth_data, bl);
+ }
+ }
+ };
+
+private:
+ vector<Incremental> pending_auth;
version_t last_rotating_ver;
+ uint64_t max_global_id;
+ uint64_t last_allocated_id;
+
+ void push_cephx_inc(KeyServerData::Incremental& auth_inc) {
+ Incremental inc;
+ inc.inc_type = AUTH_DATA;
+ ::encode(auth_inc, inc.auth_data);
+ inc.auth_type = CEPH_AUTH_CEPHX;
+ pending_auth.push_back(inc);
+ }
void on_active();
+ void election_finished();
void create_initial(bufferlist& bl);
bool update_from_paxos();
void create_pending(); // prepare a new pending
+ bool prepare_global_id(MMonGlobalID *m);
+ void increase_max_global_id();
+ uint64_t assign_global_id(MAuth *m);
void encode_pending(bufferlist &bl); // propose pending update to peers
void committed();
bool preprocess_command(MMonCommand *m);
bool prepare_command(MMonCommand *m);
+ uint64_t assign_next_global_id();
+
void check_rotate();
public:
- AuthMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p), last_rotating_ver(0) {}
+ AuthMonitor(Monitor *mn, Paxos *p) : PaxosService(mn, p), last_rotating_ver(0), max_global_id(-1), last_allocated_id(-1) {}
void pre_auth(MAuth *m);
void tick(); // check state, take actions
void init();
};
+
+WRITE_CLASS_ENCODER(AuthMonitor::Incremental);
+
#endif