mds/LogEvent.o\
mds/IdAllocator.o\
mds/ClientMap.o\
+ mds/SessionMap.o\
mds/MDLog.o
OSD_OBJS= \
#include "MDBalancer.h"
#include "IdAllocator.h"
#include "Migrator.h"
-//#include "Renamer.h"
#include "AnchorTable.h"
#include "AnchorClient.h"
// cons/des
MDS::MDS(int whoami, Messenger *m, MonMap *mm) :
timer(mds_lock),
- clientmap(this) {
+ clientmap(this), sessionmap(this) {
this->whoami = whoami;
void MDS::send_message_client(Message *m, int client)
{
- version_t seq = clientmap.inc_push_seq(client);
+ version_t seq = sessionmap.inc_push_seq(client);
dout(10) << "send_message_client client" << client << " seq " << seq << " " << *m << dendl;
- messenger->send_message(m, clientmap.get_inst(client));
+ messenger->send_message(m, sessionmap.get_inst(entity_name_t::CLIENT(client)));
}
void MDS::send_message_client(Message *m, entity_inst_t clientinst)
{
- version_t seq = clientmap.inc_push_seq(clientinst.name.num());
- dout(10) << "send_message_client client" << clientinst.name.num() << " seq " << seq << " " << *m << dendl;
+ version_t seq = sessionmap.inc_push_seq(clientinst.name.num());
+ dout(10) << "send_message_client " << clientinst.name << " seq " << seq << " " << *m << dendl;
messenger->send_message(m, clientinst);
}
dout(7) << "bcast_mds_map " << mdsmap->get_epoch() << dendl;
// share the map with mounted clients
- for (set<int>::const_iterator p = clientmap.get_session_set().begin();
- p != clientmap.get_session_set().end();
- ++p) {
- messenger->send_message(new MMDSMap(mdsmap),
- clientmap.get_inst(*p));
- }
+ set<entity_name_t> clients;
+ sessionmap.get_session_set(clients);
+ for (set<entity_name_t>::const_iterator p = clients.begin();
+ p != clients.end();
+ ++p)
+ messenger->send_message(new MMDSMap(mdsmap), sessionmap.get_inst(*p));
last_client_mdsmap_bcast = mdsmap->get_epoch();
}
idalloc->save(fin->new_sub());
// write empty clientmap
- clientmap.save(fin->new_sub());
+ sessionmap.save(fin->new_sub());
// fixme: fake out anchortable
if (mdsmap->get_anchortable() == whoami) {
dout(2) << "boot_start " << step << ": opening idalloc" << dendl;
idalloc->load(gather->new_sub());
- dout(2) << "boot_start " << step << ": opening clientmap" << dendl;
- clientmap.load(gather->new_sub());
+ dout(2) << "boot_start " << step << ": opening sessionmap" << dendl;
+ sessionmap.load(gather->new_sub());
if (mdsmap->get_anchortable() == whoami) {
dout(2) << "boot_start " << step << ": opening anchor table" << dendl;
#include "MDSMap.h"
#include "ClientMap.h"
+#include "SessionMap.h"
// -- client map --
ClientMap clientmap;
+ SessionMap sessionmap;
epoch_t last_client_mdsmap_bcast;
//void log_clientmap(Context *c);
dout(3) << "handle_client_session " << *m << " from " << m->get_source() << dendl;
int from = m->get_source().num();
bool open = false;
+ Session *session = mds->sessionmap.get_session(m->get_source());
switch (m->op) {
- case MClientSession::OP_REQUEST_OPEN;
+ case MClientSession::OP_REQUEST_OPEN:
open = true;
- if (mds->clientmap.have_session(from)) {
+ if (!session) {
dout(10) << "already open, dropping this req" << dendl;
delete m;
return;
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#include "MDS.h"
+#include "SessionMap.h"
+#include "osdc/Filer.h"
+
+#include "config.h"
+
+#define dout(x) if (x <= g_conf.debug_mds) *_dout << dbeginl << g_clock.now() << " mds" << mds->get_nodeid() << ".sessionmap "
+
+void SessionMap::init_inode()
+{
+ memset(&inode, 0, sizeof(inode));
+ inode.ino = MDS_INO_SESSIONMAP_OFFSET + mds->get_nodeid();
+ inode.layout = g_OSD_FileLayout;
+}
+
+
+// ----------------
+// LOAD
+
+class C_SM_Load : public Context {
+ SessionMap *sessionmap;
+public:
+ bufferlist bl;
+ C_SM_Load(SessionMap *cm) : sessionmap(cm) {}
+ void finish(int r) {
+ sessionmap->_load_finish(bl);
+ }
+};
+
+void SessionMap::load(Context *onload)
+{
+ dout(10) << "load" << dendl;
+
+ init_inode();
+
+ if (onload)
+ waiting_for_load.push_back(onload);
+
+ C_SM_Load *c = new C_SM_Load(this);
+ mds->filer->read(inode,
+ 0, inode.layout.fl_stripe_unit,
+ &c->bl,
+ c);
+
+}
+
+void SessionMap::_load_finish(bufferlist &bl)
+{
+ decode(bl);
+ dout(10) << "_load_finish v " << version
+ << ", " << session_map.size() << " sessions, "
+ << bl.length() << " bytes"
+ << dendl;
+ projected = committing = committed = version;
+ finish_contexts(waiting_for_load);
+}
+
+
+// ----------------
+// SAVE
+
+class C_SM_Save : public Context {
+ SessionMap *sessionmap;
+ version_t version;
+public:
+ C_SM_Save(SessionMap *cm, version_t v) : sessionmap(cm), version(v) {}
+ void finish(int r) {
+ sessionmap->_save_finish(version);
+ }
+};
+
+void SessionMap::save(Context *onsave, version_t needv)
+{
+ dout(10) << "save needv " << needv << ", v " << version << dendl;
+
+ if (needv && committing >= needv) {
+ assert(committing > committed);
+ commit_waiters[committing].push_back(onsave);
+ return;
+ }
+
+ commit_waiters[version].push_back(onsave);
+
+ bufferlist bl;
+
+ init_inode();
+ encode(bl);
+ committing = version;
+ mds->filer->write(inode,
+ 0, bl.length(), bl,
+ 0,
+ 0, new C_SM_Save(this, version));
+}
+
+void SessionMap::_save_finish(version_t v)
+{
+ dout(10) << "_save_finish v" << v << dendl;
+ committed = v;
+
+ finish_contexts(commit_waiters[v]);
+ commit_waiters.erase(v);
+}
+
+
+// -------------------
+
+void SessionMap::encode(bufferlist& bl)
+{
+
+}
+
+void SessionMap::decode(bufferlist& bl)
+{
+
+}
MDS *mds;
hash_map<entity_name_t, Session> session_map;
version_t version, projected, committing, committed;
+ map<version_t, list<Context*> > commit_waiters;
public:
SessionMap(MDS *m) : mds(m),
return &session_map[w];
return 0;
}
- entity_inst_t& get_inst(entity_name_t w) {
- assert(session_map.count(w));
- return session_map[w].inst;
- }
-
Session* add_session(entity_name_t w) {
return &session_map[w];
}
void remove_session(entity_name_t w) {
session_map.erase(w);
}
+
+ void get_session_set(set<entity_name_t>& s) {
+ for (hash_map<entity_name_t,Session>::iterator p = session_map.begin();
+ p != session_map.end();
+ p++)
+ s.insert(p->first);
+ }
+
+ // helpers
+ entity_inst_t& get_inst(entity_name_t w) {
+ assert(session_map.count(w));
+ return session_map[w].inst;
+ }
+ version_t inc_push_seq(int client) {
+ return get_session(entity_name_t::CLIENT(client))->inc_push_seq();
+ }
+ version_t get_push_seq(int client) {
+ return get_session(entity_name_t::CLIENT(client))->get_push_seq();
+ }
-
+ // -- loading, saving --
+ inode_t inode;
+ list<Context*> waiting_for_load;
+
+ void encode(bufferlist& bl);
+ void decode(bufferlist& bl);
+
+ void init_inode();
+ void load(Context *onload);
+ void _load_finish(bufferlist &bl);
+ void save(Context *onsave, version_t needv=0);
+ void _save_finish(version_t v);
+
};
#define MDS_INO_LOG_OFFSET (1*MAX_MDS)
#define MDS_INO_IDS_OFFSET (2*MAX_MDS)
#define MDS_INO_CLIENTMAP_OFFSET (3*MAX_MDS)
-#define MDS_INO_STRAY_OFFSET (4*MAX_MDS)
-#define MDS_INO_BASE (5*MAX_MDS)
+#define MDS_INO_SESSIONMAP_OFFSET (4*MAX_MDS)
+#define MDS_INO_STRAY_OFFSET (5*MAX_MDS)
+#define MDS_INO_BASE (6*MAX_MDS)
#define MDS_INO_STRAY(x) (MDS_INO_STRAY_OFFSET+((unsigned)x))
#define MDS_INO_IS_STRAY(i) ((i) >= MDS_INO_STRAY_OFFSET && (i) < MDS_INO_STRAY_OFFSET+MAX_MDS)