#define dout_subsys ceph_subsys_mds
#undef dout_prefix
-#define dout_prefix *_dout << "mds." << mds->get_nodeid() << ".sessionmap "
+#define dout_prefix *_dout << "mds." << rank << ".sessionmap "
class SessionMapIOContext : public MDSIOContextBase
// -------------------
-void SessionMap::encode(bufferlist& bl) const
+void SessionMapStore::encode(bufferlist& bl) const
{
uint64_t pre = -1; // for 0.19 compatibility; we forgot an encoding prefix.
::encode(pre, bl);
ENCODE_FINISH(bl);
}
-void SessionMap::decode(bufferlist::iterator& p)
+/**
+ * Deserialize sessions, and update by_state index
+ */
+void SessionMap::decode(bufferlist::iterator &p)
+{
+ // Populate `sessions`
+ SessionMapStore::decode(p);
+
+ // Update `by_state`
+ for (ceph::unordered_map<entity_name_t, Session*>::iterator i = session_map.begin();
+ i != session_map.end(); ++i) {
+ Session *s = i->second;
+ if (by_state.count(s->get_state()) == 0)
+ by_state[s->get_state()] = new xlist<Session*>;
+ by_state[s->get_state()]->push_back(&s->item_session_list);
+ }
+}
+
+uint64_t SessionMap::set_state(Session *session, int s) {
+ if (session->state != s) {
+ session->set_state(s);
+ if (by_state.count(s) == 0)
+ by_state[s] = new xlist<Session*>;
+ by_state[s]->push_back(&session->item_session_list);
+ }
+ return session->get_state_seq();
+}
+
+void SessionMapStore::decode(bufferlist::iterator& p)
{
utime_t now = ceph_clock_now(g_ceph_context);
uint64_t pre;
::decode(inst.name, p);
Session *s = get_or_add_session(inst);
if (s->is_closed())
- set_state(s, Session::STATE_OPEN);
+ s->set_state(Session::STATE_OPEN);
s->decode(p);
}
} else {
session_map[s->info.inst.name] = s;
}
- set_state(s, Session::STATE_OPEN);
+ s->set_state(Session::STATE_OPEN);
s->last_cap_renew = now;
}
}
}
-void SessionMap::dump(Formatter *f) const
+void SessionMapStore::dump(Formatter *f) const
{
f->open_array_section("Sessions");
for (ceph::unordered_map<entity_name_t,Session*>::const_iterator p = session_map.begin();
f->close_section(); // Sessions
}
-void SessionMap::generate_test_instances(list<SessionMap*>& ls)
+void SessionMapStore::generate_test_instances(list<SessionMapStore*>& ls)
{
// pretty boring for now
- ls.push_back(new SessionMap(NULL));
+ ls.push_back(new SessionMapStore());
}
void SessionMap::wipe()
_update_human_name();
}
+
int importing_count;
friend class SessionMap;
-
// Human (friendly) name is soft state generated from client metadata
void _update_human_name();
std::string human_name;
public:
+ inline int get_state() const {return state;}
+ void set_state(int new_state)
+ {
+ if (state != new_state) {
+ state = new_state;
+ state_seq++;
+ }
+ }
void decode(bufferlist::iterator &p);
void set_client_metadata(std::map<std::string, std::string> const &meta);
std::string get_human_name() const {return human_name;}
class MDS;
-class SessionMap {
+/**
+ * Encapsulate the serialized state associated with SessionMap. Allows
+ * encode/decode outside of live MDS instance.
+ */
+class SessionMapStore {
public:
- MDS *mds;
-private:
ceph::unordered_map<entity_name_t, Session*> session_map;
+ version_t version;
+ mds_rank_t rank;
+
+ virtual void encode(bufferlist& bl) const;
+ virtual void decode(bufferlist::iterator& blp);
+ void dump(Formatter *f) const;
+
+ void set_rank(mds_rank_t r)
+ {
+ rank = r;
+ }
+
+ Session* get_or_add_session(const entity_inst_t& i) {
+ Session *s;
+ if (session_map.count(i.name)) {
+ s = session_map[i.name];
+ } else {
+ s = session_map[i.name] = new Session;
+ s->info.inst = i;
+ s->last_cap_renew = ceph_clock_now(g_ceph_context);
+ }
+
+ return s;
+ }
+
+ static void generate_test_instances(list<SessionMapStore*>& ls);
+
+ void reset_state()
+ {
+ session_map.clear();
+ }
+
+ SessionMapStore() : version(0), rank(MDS_RANK_NONE) {}
+ virtual ~SessionMapStore() {};
+};
+
+class SessionMap : public SessionMapStore {
public:
- map<int,xlist<Session*>* > by_state;
-
+ MDS *mds;
+
public: // i am lazy
- version_t version, projected, committing, committed;
+ version_t projected, committing, committed;
+ map<int,xlist<Session*>* > by_state;
+ uint64_t set_state(Session *session, int state);
map<version_t, list<MDSInternalContextBase*> > commit_waiters;
-public:
- SessionMap(MDS *m) : mds(m),
- version(0), projected(0), committing(0), committed(0)
+ SessionMap(MDS *m) : mds(m),
+ projected(0), committing(0), committed(0)
{ }
-
- //for the dencoder
- SessionMap() : mds(NULL), version(0), projected(0),
- committing(0), committed(0) {}
-
+
// sessions
+ void decode(bufferlist::iterator& blp);
bool empty() { return session_map.empty(); }
const ceph::unordered_map<entity_name_t, Session*> &get_sessions() const
{
return p->second;
}
}
- Session* get_or_add_session(const entity_inst_t& i) {
- Session *s;
- if (session_map.count(i.name)) {
- s = session_map[i.name];
- } else {
- s = session_map[i.name] = new Session;
- s->info.inst = i;
- s->last_cap_renew = ceph_clock_now(g_ceph_context);
- }
- return s;
- }
+
void add_session(Session *s);
void remove_session(Session *s);
void touch_session(Session *session);
return 0;
return by_state[state]->front();
}
- uint64_t set_state(Session *session, int s) {
- if (session->state != s) {
- session->state = s;
- session->state_seq++;
- if (by_state.count(s) == 0)
- by_state[s] = new xlist<Session*>;
- by_state[s]->push_back(&session->item_session_list);
- }
- return session->state_seq;
- }
+
void dump();
void get_client_set(set<client_t>& s) {
inodeno_t ino;
list<MDSInternalContextBase*> waiting_for_load;
- void encode(bufferlist& bl) const;
- void decode(bufferlist::iterator& blp);
- void dump(Formatter *f) const;
- static void generate_test_instances(list<SessionMap*>& ls);
-
object_t get_object_name();
void load(MDSInternalContextBase *onload);