]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
clientmap replaced with sessionmap; untested
authorSage Weil <sage@newdream.net>
Mon, 31 Dec 2007 22:52:26 +0000 (14:52 -0800)
committerSage Weil <sage@newdream.net>
Mon, 31 Dec 2007 22:52:26 +0000 (14:52 -0800)
src/mds/LogSegment.h
src/mds/MDCache.cc
src/mds/MDCache.h
src/mds/MDS.cc
src/mds/MDS.h
src/mds/Migrator.cc
src/mds/Server.cc
src/mds/Server.h
src/mds/SessionMap.h
src/mds/journal.cc

index c4cf1d50897ff3ebdde33bb512cd87a80f0d2c61..4723960c79363ede49908ac1089b1da03965f480 100644 (file)
@@ -54,7 +54,7 @@ class LogSegment {
 
   // table version
   version_t allocv;
-  version_t clientmapv;
+  version_t sessionmapv;
   version_t anchortablev;
 
   // try to expire
@@ -62,7 +62,7 @@ class LogSegment {
 
   // cons
   LogSegment(off_t off) : offset(off), end(off), num_events(0),
-                         allocv(0), clientmapv(0), anchortablev(0) 
+                         allocv(0), sessionmapv(0), anchortablev(0) 
   { }
 };
 
index af4dfdec9ce03528fc6250a8b5a9f3ae5d5dbeee..bf5947df107f5c9a98c2c08d861143fdd0607305 100644 (file)
@@ -1252,7 +1252,7 @@ void MDCache::handle_resolve(MMDSResolve *m)
     for (list<metareqid_t>::iterator p = m->slave_requests.begin();
         p != m->slave_requests.end();
         ++p) {
-      if (mds->clientmap.have_completed_request(*p)) {
+      if (mds->sessionmap.have_completed_request(*p)) {
        // COMMIT
        dout(10) << " ambiguous slave request " << *p << " will COMMIT" << dendl;
        ack->add_commit(*p);
@@ -2664,7 +2664,9 @@ void MDCache::rejoin_import_cap(CInode *in, int client, inode_caps_reconnect_t&
                                              in->client_caps[client].wanted());
   
   reap->set_mds( frommds ); // reap from whom?
-  mds->messenger->send_message(reap, mds->clientmap.get_inst(client));
+  Session *session = mds->sessionmap.get_session(entity_name_t::CLIENT(client));
+  assert(session);
+  mds->messenger->send_message(reap, session->inst);
 }
 
 void MDCache::rejoin_send_acks()
index 35c324f150686970f5c4e8f29ed27395e431f2c0..59996c9733c6f67bc34500131a807124ea418101 100644 (file)
@@ -39,6 +39,7 @@ class Renamer;
 class Logger;
 
 class Message;
+class Session;
 
 class MMDSResolve;
 class MMDSResolveAck;
@@ -80,6 +81,7 @@ struct PVList {
  */
 struct MDRequest {
   metareqid_t reqid;
+  Session *session;
 
   // -- i am a client (master) request
   MClientRequest *client_request; // client request (if any)
@@ -116,6 +118,8 @@ struct MDRequest {
   bool committing;
   bool aborted;
 
+  // break rarely-used fields into a separately allocated structure 
+  // to save memory for most ops
   struct More {
     set<int> slaves;           // mds nodes that have slave requests to me (implies client_request)
     set<int> waiting_on_slave; // peers i'm waiting for slavereq replies from. 
@@ -148,19 +152,19 @@ struct MDRequest {
 
   // ---------------------------------------------------
   MDRequest() : 
-    client_request(0), ref(0), 
+    session(0), client_request(0), ref(0), 
     slave_request(0), slave_to_mds(-1), 
     ls(0),
     done_locking(false), committing(false), aborted(false),
     _more(0) {}
   MDRequest(metareqid_t ri, MClientRequest *req) : 
-    reqid(ri), client_request(req), ref(0), 
+    reqid(ri), session(0), client_request(req), ref(0), 
     slave_request(0), slave_to_mds(-1), 
     ls(0),
     done_locking(false), committing(false), aborted(false),
     _more(0) {}
   MDRequest(metareqid_t ri, int by) : 
-    reqid(ri), client_request(0), ref(0),
+    reqid(ri), session(0), client_request(0), ref(0),
     slave_request(0), slave_to_mds(by), 
     ls(0),
     done_locking(false), committing(false), aborted(false),
index 28ffc0d755ed9072f49a9bcf0c7cacc7e06cbb20..cabca2dd284a42072a093ea19b94027dea7bfd20 100644 (file)
@@ -71,7 +71,7 @@
 // cons/des
 MDS::MDS(int whoami, Messenger *m, MonMap *mm) : 
   timer(mds_lock), 
-  clientmap(this), sessionmap(this) {
+  sessionmap(this) {
 
   this->whoami = whoami;
 
@@ -281,7 +281,7 @@ void MDS::send_message_client(Message *m, int client)
 {
   version_t seq = sessionmap.inc_push_seq(client);
   dout(10) << "send_message_client client" << client << " seq " << seq << " " << *m << dendl;
-  messenger->send_message(m, sessionmap.get_inst(entity_name_t::CLIENT(client)));
+  messenger->send_message(m, sessionmap.get_session(entity_name_t::CLIENT(client))->inst);
 }
 
 void MDS::send_message_client(Message *m, entity_inst_t clientinst)
@@ -653,12 +653,12 @@ void MDS::bcast_mds_map()
   dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;
 
   // share the map with mounted clients
-  set<entity_name_t> clients;
-  sessionmap.get_session_set(clients);
-  for (set<entity_name_t>::const_iterator p = clients.begin();
+  set<Session*> clients;
+  sessionmap.get_client_session_set(clients);
+  for (set<Session*>::const_iterator p = clients.begin();
        p != clients.end();
        ++p) 
-    messenger->send_message(new MMDSMap(mdsmap), sessionmap.get_inst(*p));
+    messenger->send_message(new MMDSMap(mdsmap), (*p)->inst);
   last_client_mdsmap_bcast = mdsmap->get_epoch();
 }
 
@@ -754,7 +754,7 @@ void MDS::boot_create()
   idalloc->reset();
   idalloc->save(fin->new_sub());
 
-  // write empty clientmap
+  // write empty sessionmap
   sessionmap.save(fin->new_sub());
   
   // fixme: fake out anchortable
index f0f5316fc6eb650c15ac56b6fb7ac19210bff95f..5a69301e681b0bb6320ab2e3a956a608006148b4 100644 (file)
@@ -200,7 +200,6 @@ class MDS : public Dispatcher {
   void     reset_tick();
 
   // -- client map --
-  ClientMap    clientmap;
   SessionMap   sessionmap;
   epoch_t      last_client_mdsmap_bcast;
   //void log_clientmap(Context *c);
index 78cd7fcfcbcc0820a264ce836a260465258a87dd..5565d541bdf038a4d6d81c0a6b285aa7f5495dbc 100644 (file)
@@ -885,7 +885,7 @@ void Migrator::encode_export_inode_caps(CInode *in, bufferlist& bl,
   for (map<int, Capability>::iterator it = in->client_caps.begin();
        it != in->client_caps.end();
        it++) 
-    exported_client_map[it->first] = mds->clientmap.get_inst(it->first);
+    exported_client_map[it->first] = mds->sessionmap.get_inst(entity_name_t::CLIENT(it->first));
 }
 
 void Migrator::finish_export_inode_caps(CInode *in)
@@ -2276,7 +2276,7 @@ void Migrator::handle_export_caps(MExportCaps *ex)
    */
 
   C_M_LoggedImportCaps *finish = new C_M_LoggedImportCaps(this, in, ex->get_source().num());
-  ESessions *le = new ESessions(mds->clientmap.inc_projected());
+  ESessions *le = new ESessions(++mds->sessionmap.projected);
 
   // decode new caps
   bufferlist::iterator blp = ex->cap_bl.begin();
index 602f3d2662c967c6031830687d945687c7138b76..2e5ae550cc2079871d33bd374b2ffcbcf493e2bc 100644 (file)
@@ -123,15 +123,15 @@ void Server::dispatch(Message *m)
 
 class C_MDS_session_finish : public Context {
   MDS *mds;
-  entity_inst_t client_inst;
+  Session *session;
   bool open;
   version_t cmapv;
 public:
-  C_MDS_session_finish(MDS *m, entity_inst_t ci, bool s, version_t mv) :
-    mds(m), client_inst(ci), open(s), cmapv(mv) { }
+  C_MDS_session_finish(MDS *m, Session *se, bool s, version_t mv) :
+    mds(m), session(se), open(s), cmapv(mv) { }
   void finish(int r) {
     assert(r == 0);
-    mds->server->_session_logged(client_inst, open, cmapv);
+    mds->server->_session_logged(session, open, cmapv);
   }
 };
 
@@ -139,41 +139,38 @@ public:
 void Server::handle_client_session(MClientSession *m)
 {
   dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
-  int from = m->get_source().num();
   bool open = false;
   Session *session = mds->sessionmap.get_session(m->get_source());
+  assert(m->get_source().is_client()); // should _not_ come from an mds!
 
   switch (m->op) {
   case MClientSession::OP_REQUEST_OPEN:
     open = true;
-    if (!session) {
-      dout(10) << "already open, dropping this req" << dendl;
+    if (session && (session->is_opening() || session->is_open())) {
+      dout(10) << "already open|opening, dropping this req" << dendl;
       delete m;
       return;
     }
-    if (mds->clientmap.is_opening(from)) {
-      dout(10) << "already opening, dropping this req" << dendl;
-      delete m;
-      return;
-    }
-    mds->clientmap.add_opening(from);
+    assert(!session);  // ?
+    session = mds->sessionmap.get_or_add_session(m->get_source());
+    session->inst = m->get_source_inst();
+    session->state = Session::STATE_OPENING;
     break;
 
   case MClientSession::OP_REQUEST_CLOSE:
-    if (mds->clientmap.is_closing(from)) {
-      dout(10) << "already closing, dropping this req" << dendl;
+    if (!session || session->is_closing()) {
+      dout(10) << "already closing|dne, dropping this req" << dendl;
       delete m;
       return;
     }
-    if (m->seq < mds->clientmap.get_push_seq(from)) {
-      dout(10) << "old push seq " << m->seq << " < " << mds->clientmap.get_push_seq(from
+    if (m->seq < session->get_push_seq()) {
+      dout(10) << "old push seq " << m->seq << " < " << session->get_push_seq(
               << ", dropping" << dendl;
       delete m;
       return;
     }
-    assert(m->seq == mds->clientmap.get_push_seq(from));
-
-    mds->clientmap.add_closing(from);
+    assert(m->seq == session->get_push_seq());
+    session->state = Session::STATE_CLOSING;
     break;
 
   default:
@@ -181,64 +178,76 @@ void Server::handle_client_session(MClientSession *m)
   }
 
   // journal it
-  version_t cmapv = mds->clientmap.inc_projected();
-  dout(10) << " clientmap v " << mds->clientmap.get_version() << " pv " << cmapv << dendl;
-  mdlog->submit_entry(new ESession(m->get_source_inst(), open, cmapv),
-                     new C_MDS_session_finish(mds, m->get_source_inst(), open, cmapv));
+  version_t pv = ++mds->sessionmap.projected;
+  dout(10) << " sessionmap v " << mds->sessionmap.version << " pv " << pv << dendl;
+  mdlog->submit_entry(new ESession(m->get_source_inst(), open, pv),
+                     new C_MDS_session_finish(mds, session, open, pv));
   delete m;
 
   if (logger) logger->inc("hcsess");
 }
 
-void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cmapv)
+void Server::_session_logged(Session *session, bool open, version_t pv)
 {
-  dout(10) << "_session_logged " << client_inst << " " << (open ? "open":"close")
-          << " " << cmapv 
+  dout(10) << "_session_logged " << session->inst << " " << (open ? "open":"close")
+          << " " << pv 
           << dendl;
 
   // apply
-  int from = client_inst.name.num();
   if (open) {
-    assert(mds->clientmap.is_opening(from));
-    mds->clientmap.open_session(client_inst);
-  } else if (mds->clientmap.is_closing(from)) {
-    mds->clientmap.close_session(from);
-    
-    // purge completed requests from clientmap
-    mds->clientmap.trim_completed_requests(client_inst.name, 0);
+    assert(session->is_opening());
+    session->state = Session::STATE_OPEN;
+    mds->sessionmap.version++;
+  } else if (session->is_closing()) {
+    mds->sessionmap.remove_session(session);
+    mds->sessionmap.version++;
   } else {
     // close must have been canceled (by an import?) ...
     assert(!open);
-    mds->clientmap.noop();
+    mds->sessionmap.version++;  // noop
   }
   
-  assert(cmapv == mds->clientmap.get_version());
+  assert(pv == mds->sessionmap.version);
   
   // reply
   if (open) 
-    mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), client_inst);
+    mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), session->inst);
   else
-    mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), client_inst);
+    mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE), session->inst);
 }
 
 void Server::prepare_force_open_sessions(map<int,entity_inst_t>& cm)
 {
-  version_t cmapv = mds->clientmap.inc_projected();
-  dout(10) << "prepare_force_open_sessions " << cmapv 
+  /* 
+   * FIXME !
+   */
+  version_t pv = ++mds->sessionmap.projected;
+  dout(10) << "prepare_force_open_sessions " << pv 
           << " on " << cm.size() << " clients"
           << dendl;
   for (map<int,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
-    mds->clientmap.add_opening(p->first);
+    Session *session = mds->sessionmap.get_or_add_session(p->second.name);
+    if (session->state == Session::STATE_UNDEF ||
+       session->state == Session::STATE_CLOSING)
+      session->state = Session::STATE_OPENING;
   }
 }
 
 void Server::finish_force_open_sessions(map<int,entity_inst_t>& cm)
 {
-  version_t v = mds->clientmap.get_version();
-  dout(10) << "finish_force_open_sessions on " << cm.size() << " clients, v " << v << " -> " << (v+1) << dendl;
+  /*
+   * FIXME: need to carefully consider the race conditions between a
+   * client trying to close a session and an MDS doing an import
+   * trying to force open a session...  
+   */
+  dout(10) << "finish_force_open_sessions on " << cm.size() << " clients,"
+          << " v " << mds->sessionmap.version << " -> " << (mds->sessionmap.version+1) << dendl;
   for (map<int,entity_inst_t>::iterator p = cm.begin(); p != cm.end(); ++p) {
-    if (mds->clientmap.is_closing(p->first)) {
-      dout(15) << "force_open_sessions canceling close on " << p->second << dendl;
+    Session *session = mds->sessionmap.get_session(p->second.name);
+    assert(session);
+    /*
+    if (session->is_closing()) {
+      dout(15) << "force_open_sessions canceling close on " << session->inst << dendl;
       mds->clientmap.remove_closing(p->first);
       continue;
     }
@@ -246,12 +255,14 @@ void Server::finish_force_open_sessions(map<int,entity_inst_t>& cm)
       dout(15) << "force_open_sessions have session " << p->second << dendl;
       continue;
     }
-    
-    dout(10) << "force_open_sessions opening " << p->second << dendl;
-    mds->clientmap.open_session(p->second);
-    mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), p->second);
+    */
+    if (session->is_opening()) {
+      dout(10) << "force_open_sessions opening " << session->inst << dendl;
+      session->state = Session::STATE_OPEN;
+      mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN), session->inst);
+    }
   }
-  mds->clientmap.set_version(v+1);
+  mds->sessionmap.version++;
 }
 
 
@@ -260,35 +271,34 @@ void Server::terminate_sessions()
   dout(2) << "terminate_sessions" << dendl;
 
   // kill them off.  clients will retry etc.
-  for (set<int>::const_iterator p = mds->clientmap.get_session_set().begin();
-       p != mds->clientmap.get_session_set().end();
+  set<Session*> sessions;
+  mds->sessionmap.get_client_session_set(sessions);
+  for (set<Session*>::const_iterator p = sessions.begin();
+       p != sessions.end();
        ++p) {
-    if (mds->clientmap.is_closing(*p)) 
-      continue;
-    mds->clientmap.add_closing(*p);
-    version_t cmapv = mds->clientmap.inc_projected();
-    mdlog->submit_entry(new ESession(mds->clientmap.get_inst(*p), false, cmapv),
-                       new C_MDS_session_finish(mds, mds->clientmap.get_inst(*p), false, cmapv));
+    Session *session = *p;
+    if (session->is_closing()) continue;
+    session->state = Session::STATE_CLOSING;
+    version_t pv = ++mds->sessionmap.projected;
+    mdlog->submit_entry(new ESession(session->inst, false, pv),
+                       new C_MDS_session_finish(mds, session, false, pv));
   }
 }
 
 
 void Server::reconnect_clients()
 {
-  // reconnect with clients
-  if (mds->clientmap.get_session_set().empty()) {
+  mds->sessionmap.get_client_set(client_reconnect_gather);
+
+  if (client_reconnect_gather.empty()) {
     dout(7) << "reconnect_clients -- no sessions, doing nothing." << dendl;
     reconnect_gather_finish();
     return;
   }
   
   dout(7) << "reconnect_clients -- sending mdsmap to clients with sessions" << dendl;
-  
   mds->bcast_mds_map();  // send mdsmap to all client sessions
-
-  // init gather list
   reconnect_start = g_clock.now();
-  client_reconnect_gather = mds->clientmap.get_session_set();
   dout(1) << "reconnect_clients -- " << client_reconnect_gather.size() << " sessions" << dendl;
 }
 
@@ -296,15 +306,14 @@ void Server::handle_client_reconnect(MClientReconnect *m)
 {
   dout(7) << "handle_client_reconnect " << m->get_source() << dendl;
   int from = m->get_source().num();
+  Session *session = mds->sessionmap.get_session(m->get_source());
 
   if (m->closed) {
-    dout(7) << " client had no session, removing from clientmap" << dendl;
-
-    mds->clientmap.add_closing(from);
-    version_t cmapv = mds->clientmap.inc_projected();
-    mdlog->submit_entry(new ESession(mds->clientmap.get_inst(from), false, cmapv),
-                       new C_MDS_session_finish(mds, mds->clientmap.get_inst(from), false, cmapv));
-
+    dout(7) << " client had no session, removing from session map" << dendl;
+    assert(session);  // ?
+    version_t pv = ++mds->sessionmap.projected;
+    mdlog->submit_entry(new ESession(session->inst, false, pv),
+                       new C_MDS_session_finish(mds, session, false, pv));
   } else {
     
     // caps
@@ -439,9 +448,9 @@ void Server::reply_request(MDRequest *mdr, MClientReply *reply, CInode *tracei)
           << " (" << strerror(-reply->get_result())
           << ") " << *req << dendl;
 
-  // note result code in clientmap?
+  // note result code in session map?
   if (!req->is_idempotent())
-    mds->clientmap.add_completed_request(mdr->reqid);
+    mdr->session->add_completed_request(mdr->reqid.tid);
 
   /*
   if (tracei && !tracei->hack_accessed) {
@@ -512,8 +521,9 @@ void Server::handle_client_request(MClientRequest *req)
   }
 
   // active session?
+  Session *session = 0;
   if (req->get_client_inst().name.is_client() && 
-      !mds->clientmap.have_session(req->get_client_inst().name.num())) {
+      !(session = mds->sessionmap.get_session(req->get_client_inst().name))) {
     dout(5) << "no session for " << req->get_client_inst().name << ", dropping" << dendl;
     delete req;
     return;
@@ -530,7 +540,8 @@ void Server::handle_client_request(MClientRequest *req)
 
   // retry?
   if (req->get_retry_attempt()) {
-    if (mds->clientmap.have_completed_request(req->get_reqid())) {
+    assert(session);
+    if (session->have_completed_request(req->get_reqid().tid)) {
       dout(5) << "already completed " << req->get_reqid() << dendl;
       mds->messenger->send_message(new MClientReply(req, 0), req->get_client_inst());
       delete req;
@@ -540,13 +551,13 @@ void Server::handle_client_request(MClientRequest *req)
   // trim completed_request list
   if (req->get_oldest_client_tid() > 0) {
     dout(15) << " oldest_client_tid=" << req->get_oldest_client_tid() << dendl;
-    mds->clientmap.trim_completed_requests(req->get_client_inst().name, 
-                                          req->get_oldest_client_tid());
+    session->trim_completed_requests(req->get_oldest_client_tid());
   }
 
   // register + dispatch
   MDRequest *mdr = mdcache->request_start(req);
   if (!mdr) return;
+  mdr->session = session;
 
   if (ref) {
     dout(10) << "inode op on ref " << *ref << dendl;
index d2252f33df7bcaaee7ab74e93d5a6d8da2d1a96d..27d3651764a704b1802f73d735835f483e5800b5 100644 (file)
@@ -56,7 +56,7 @@ public:
   set<CInode*> reconnected_caps;
 
   void handle_client_session(class MClientSession *m);
-  void _session_logged(entity_inst_t ci, bool open, version_t cmapv);
+  void _session_logged(Session *session, bool open, version_t cmapv);
   void prepare_force_open_sessions(map<int,entity_inst_t> &cm);
   void finish_force_open_sessions(map<int,entity_inst_t> &cm);
   void terminate_sessions();
index c4aa9472459b451fbbb8071d14659f2e8c49eb1c..d3e8cb0dc1b9ad9463b1de1d8fa630fd6aeb2634 100644 (file)
@@ -28,20 +28,23 @@ using __gnu_cxx::hash_map;
 class CInode;
 
 class Session {
-  // -- state --
+  // -- state etc --
 public:
+  static const int STATE_UNDEF = 0;
   static const int STATE_OPENING = 1;
   static const int STATE_OPEN = 2;
   static const int STATE_CLOSING = 3;
   static const int STATE_STALE = 4;   // ?
   static const int STATE_RECONNECTING = 5;
 
-private:
   int state;
   utime_t last_alive;         // last alive
-public:
   entity_inst_t inst;
 
+  bool is_opening() { return state == STATE_OPENING; }
+  bool is_open() { return state == STATE_OPEN; }
+  bool is_closing() { return state == STATE_CLOSING; }
+
   // -- caps --
 private:
   version_t cap_push_seq;     // cap push seq #
@@ -75,19 +78,25 @@ public:
     }
     finish_contexts(fls);
   }
-  void add_trim_waiter(metareqid_t ri, Context *c) {
-    waiting_for_trim[ri.tid] = c;
+  void add_trim_waiter(tid_t tid, Context *c) {
+    waiting_for_trim[tid] = c;
   }
-  bool have_completed_request(metareqid_t ri) const {
-    return completed_requests.count(ri.tid);
+  bool have_completed_request(tid_t tid) const {
+    return completed_requests.count(tid);
   }
 
+
+  Session() : 
+    state(STATE_UNDEF), 
+    cap_push_seq(0) { }
 };
 
 class SessionMap {
 private:
   MDS *mds;
   hash_map<entity_name_t, Session> session_map;
+  
+public:  // i am lazy
   version_t version, projected, committing, committed;
   map<version_t, list<Context*> > commit_waiters;
 
@@ -96,25 +105,53 @@ public:
                       version(0), projected(0), committing(0), committed(0) 
   { }
     
+  // sessions
   bool empty() { return session_map.empty(); }
-  
   Session* get_session(entity_name_t w) {
     if (session_map.count(w))
       return &session_map[w];
     return 0;
   }
-  Session* add_session(entity_name_t w) {
+  Session* get_or_add_session(entity_name_t w) {
     return &session_map[w];
   }
-  void remove_session(entity_name_t w) {
-    session_map.erase(w);
+  Session* get_or_add_session(entity_inst_t i) {
+    Session *s = get_or_add_session(i.name);
+    s->inst = i;
+    return s;
+  }
+  void remove_session(Session *s) {
+    s->trim_completed_requests(0);
+    session_map.erase(s->inst.name);
   }
 
-  void get_session_set(set<entity_name_t>& s) {
+  void get_client_set(set<int>& s) {
     for (hash_map<entity_name_t,Session>::iterator p = session_map.begin();
         p != session_map.end();
         p++)
-      s.insert(p->first);
+      if (p->second.inst.name.is_client())
+       s.insert(p->second.inst.name.num());
+  }
+  void get_client_session_set(set<Session*>& s) {
+    for (hash_map<entity_name_t,Session>::iterator p = session_map.begin();
+        p != session_map.end();
+        p++)
+      if (p->second.inst.name.is_client())
+       s.insert(&p->second);
+  }
+
+  void open_sessions(map<int,entity_inst_t>& client_map) {
+    for (map<int,entity_inst_t>::iterator p = client_map.begin(); 
+        p != client_map.end(); 
+        ++p) {
+      Session *session = get_or_add_session(p->second);
+      if (session->state == Session::STATE_UNDEF ||
+         session->state == Session::STATE_CLOSING) {
+       session->inst = p->second;
+       session->state = Session::STATE_OPEN;
+      }
+    }
+    version++;
   }
 
   // helpers
@@ -128,7 +165,21 @@ public:
   version_t get_push_seq(int client) {
     return get_session(entity_name_t::CLIENT(client))->get_push_seq();
   }
-  
+  bool have_completed_request(metareqid_t rid) {
+    Session *session = get_session(rid.name);
+    return session && session->have_completed_request(rid.tid);
+  }
+  void add_completed_request(metareqid_t rid) {
+    Session *session = get_session(rid.name);
+    assert(session);
+    session->add_completed_request(rid.tid);
+  }
+  void trim_completed_requests(entity_name_t c, tid_t tid) {
+    Session *session = get_session(c);
+    assert(session);
+    session->trim_completed_requests(tid);
+  }
+
   // -- loading, saving --
   inode_t inode;
   list<Context*> waiting_for_load;
index 3b39679dcd61f0ecabad3cc5203e22621c6d1912..4e1b50b3a4cb547731853c204e088df64850df78 100644 (file)
@@ -158,14 +158,14 @@ C_Gather *LogSegment::try_to_expire(MDS *mds)
     mds->idalloc->save(gather->new_sub(), allocv);
   }
 
-  // clientmap
-  if (clientmapv > mds->clientmap.get_committed()) {
-    dout(10) << "try_to_expire saving clientmap, need " << clientmapv 
-             << ", committed is " << mds->clientmap.get_committed()
-             << " (" << mds->clientmap.get_committing() << ")"
+  // sessionmap
+  if (sessionmapv > mds->sessionmap.committed) {
+    dout(10) << "try_to_expire saving sessionmap, need " << sessionmapv 
+             << ", committed is " << mds->sessionmap.committed
+             << " (" << mds->sessionmap.committing << ")"
              << dendl;
     if (!gather) gather = new C_Gather;
-    mds->clientmap.save(gather->new_sub(), clientmapv);
+    mds->sessionmap.save(gather->new_sub(), sessionmapv);
   }
 
   // pending commit atids
@@ -362,7 +362,7 @@ bool EMetaBlob::has_expired(MDS *mds)
   for (list<metareqid_t>::iterator p = client_reqs.begin();
        p != client_reqs.end();
        ++p) {
-    if (mds->clientmap.have_completed_request(*p)) {
+    if (mds->sessionmap.have_completed_request(*p)) {
       dout(10) << "EMetaBlob.has_expired still have completed request " << *p
               << dendl;
       return false;
@@ -504,10 +504,10 @@ void EMetaBlob::expire(MDS *mds, Context *c)
   for (list<metareqid_t>::iterator p = client_reqs.begin();
        p != client_reqs.end();
        ++p) {
-    if (mds->clientmap.have_completed_request(*p)) {
+    if (mds->sessionmap.have_completed_request(*p)) {
       dout(10) << "EMetaBlob.expire waiting on completed request " << *p
               << dendl;
-      mds->clientmap.add_trim_waiter(*p, gather->new_sub());
+      mds->sessionmap.add_trim_waiter(*p, gather->new_sub());
     }
   }
 
@@ -740,7 +740,7 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg)
   for (list<metareqid_t>::iterator p = client_reqs.begin();
        p != client_reqs.end();
        ++p)
-    mds->clientmap.add_completed_request(*p);
+    mds->sessionmap.add_completed_request(*p);
 
 
   // update segment
@@ -752,49 +752,52 @@ void EMetaBlob::replay(MDS *mds, LogSegment *logseg)
 
 void ESession::update_segment()
 {
-  _segment->clientmapv = cmapv;
+  _segment->sessionmapv = cmapv;
 }
 
 void ESession::replay(MDS *mds)
 {
-  if (mds->clientmap.get_version() >= cmapv) {
-    dout(10) << "ESession.replay clientmap " << mds->clientmap.get_version() 
+  if (mds->sessionmap.version >= cmapv) {
+    dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version 
             << " >= " << cmapv << ", noop" << dendl;
 
     // hrm, this isn't very pretty.
     if (!open)
-      mds->clientmap.trim_completed_requests(client_inst.name, 0);
+      mds->sessionmap.trim_completed_requests(client_inst.name, 0);
 
   } else {
-    dout(10) << "ESession.replay clientmap " << mds->clientmap.get_version() 
+    dout(10) << "ESession.replay sessionmap " << mds->sessionmap.version
             << " < " << cmapv << dendl;
-    assert(mds->clientmap.get_version() + 1 == cmapv);
+    mds->sessionmap.projected = ++mds->sessionmap.version;
+    assert(mds->sessionmap.version == cmapv);
     if (open) {
-      mds->clientmap.open_session(client_inst);
+      Session *session = mds->sessionmap.get_or_add_session(client_inst.name);
+      session->inst = client_inst;
+      session->state = Session::STATE_OPEN;
     } else {
-      mds->clientmap.close_session(client_inst.name.num());
-      mds->clientmap.trim_completed_requests(client_inst.name, 0);
+      Session *session = mds->sessionmap.get_session(client_inst.name);
+      if (session)
+       mds->sessionmap.remove_session(session);
     }
-    mds->clientmap.reset_projected(); // make it follow version.
   }
 }
 
 void ESessions::update_segment()
 {
-  _segment->clientmapv = cmapv;
+  _segment->sessionmapv = cmapv;
 }
 
 void ESessions::replay(MDS *mds)
 {
-  if (mds->clientmap.get_version() >= cmapv) {
-    dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version() 
+  if (mds->sessionmap.version >= cmapv) {
+    dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version
             << " >= " << cmapv << ", noop" << dendl;
   } else {
-    dout(10) << "ESessions.replay clientmap " << mds->clientmap.get_version() 
+    dout(10) << "ESessions.replay sessionmap " << mds->sessionmap.version
             << " < " << cmapv << dendl;
-    mds->clientmap.open_sessions(client_map);
-    assert(mds->clientmap.get_version() == cmapv);
-    mds->clientmap.reset_projected(); // make it follow version.
+    mds->sessionmap.open_sessions(client_map);
+    assert(mds->sessionmap.version == cmapv);
+    mds->sessionmap.projected = mds->sessionmap.version;
   }
 }
 
@@ -1077,18 +1080,18 @@ void EImportStart::replay(MDS *mds)
   mds->mdcache->add_ambiguous_import(base, bounds);
 
   // open client sessions?
-  if (mds->clientmap.get_version() >= cmapv) {
-    dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version() 
+  if (mds->sessionmap.version >= cmapv) {
+    dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version 
             << " >= " << cmapv << ", noop" << dendl;
   } else {
-    dout(10) << "EImportStart.replay clientmap " << mds->clientmap.get_version() 
+    dout(10) << "EImportStart.replay sessionmap " << mds->sessionmap.version 
             << " < " << cmapv << dendl;
     map<int,entity_inst_t> cm;
     bufferlist::iterator blp = client_map.begin();
     ::_decode_simple(cm, blp);
-    mds->clientmap.open_sessions(cm);
-    assert(mds->clientmap.get_version() == cmapv);
-    mds->clientmap.reset_projected(); // make it follow version.
+    mds->sessionmap.open_sessions(cm);
+    assert(mds->sessionmap.version == cmapv);
+    mds->sessionmap.projected = mds->sessionmap.version;
   }
 }