- ScatterLock or something? hrm.
- discover
-/ - hard link dentries
- open_remote_ino needs major work...
- FIXME how to journal root and stray inode content?
- in particular, i care about dirfragtree.. get it on rejoin?
- and dir sizes, if i add that... also on rejoin?
-- rejoin and replicas that are not in recovered node's cache... fetch storm?
-
- remote xlocks
- drop remote locks on request finish
- handled by individual MDSCacheObject _finish()'s
- replicas will tell it when they hold an xlock
- surviving mds rejoins replicas from a recovering mds
- will tell auth if it holds an xlock
+
- recovering open files
-/ - need to journal EOpen
-/ - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs
- - need to batch EOpen events when rejournaling to avoid looping
- - or something.........
- recovery will either have inode (from EOpen), or will provide path+cap to reassert open state.
- - path+cap window will require fetching and metadata from disk before doing the rejoin
-- client reconnect stage after resolve, but before rejoin.
-
-/- clientmap request history
-/ - journaled bit in MDRequest
-/ - complete request list in MClientMap
-/ - populated on replay
-/ - periodic client->mds MClientLastRetry -> trim request list
-
-
-/untested- truncate...
+ - path+cap window will require some fetching of metadata from disk before doing the rejoin
-- mds failure vs clients
- - idempotent client ops
- - EMetablob replay, expire logic
- journal+recovery
- local rename
-/ - fix dir renames vs subtrees
- how to notify replicas...
/ - stray purge
- stray reintegration
MClientReconnect *m = new MClientReconnect;
- for (hash_map<inodeno_t, Inode*>::iterator p = inode_map.begin();
- p != inode_map.end();
- p++) {
- if (p->second->caps.count(mds)) {
- dout(10) << " caps on " << p->first
- << " " << cap_string(p->second->caps[mds].caps)
- << " wants " << cap_string(p->second->file_caps_wanted())
- << endl;
- m->add_inode_caps(p->first,
- p->second->caps[mds].caps,
- p->second->caps[mds].seq,
- p->second->file_caps_wanted());
- string path;
- p->second->make_path(path);
- dout(10) << " path on " << p->first << " is " << path << endl;
- m->add_inode_path(p->first, path);
- }
- if (p->second->stale_caps.count(mds)) {
- dout(10) << " clearing stale caps on " << p->first << endl;
- p->second->stale_caps.erase(mds); // hrm, is this right?
- }
- }
+ if (mds_sessions.count(mds)) {
+ // i have an open session.
+ for (hash_map<inodeno_t, Inode*>::iterator p = inode_map.begin();
+ p != inode_map.end();
+ p++) {
+ if (p->second->caps.count(mds)) {
+ dout(10) << " caps on " << p->first
+ << " " << cap_string(p->second->caps[mds].caps)
+ << " wants " << cap_string(p->second->file_caps_wanted())
+ << endl;
+ m->add_inode_caps(p->first,
+ p->second->caps[mds].caps,
+ p->second->caps[mds].seq,
+ p->second->file_caps_wanted());
+ string path;
+ p->second->make_path(path);
+ dout(10) << " path on " << p->first << " is " << path << endl;
+ m->add_inode_path(p->first, path);
+ }
+ if (p->second->stale_caps.count(mds)) {
+ dout(10) << " clearing stale caps on " << p->first << endl;
+ p->second->stale_caps.erase(mds); // hrm, is this right?
+ }
+ }
+ } else {
+ dout(10) << " i had no session with this mds";
+ m->closed = true;
+ }
messenger->send_message(m, mdsmap->get_inst(mds), MDS_PORT_SERVER);
}
#include "SyntheticClient.h"
#include "include/filepath.h"
-#include "mds/MDS.h"
+#include "mds/mdstypes.h"
+#include "common/Logger.h"
#include <sys/types.h>
#include <sys/stat.h>
mds_log_before_reply: true,
mds_log_flush_on_shutdown: true,
mds_log_import_map_interval: 1024*1024, // frequency (in bytes) of EImportMap in log
+ mds_log_eopen_size: 100,
+
mds_bal_replicate_threshold: 2000,
mds_bal_unreplicate_threshold: 0,//500,
mds_bal_hash_rd: 10000,
bool mds_log_before_reply;
bool mds_log_flush_on_shutdown;
off_t mds_log_import_map_interval;
+ int mds_log_eopen_size;
float mds_bal_replicate_threshold;
float mds_bal_unreplicate_threshold;
// kick any waiters
if (ack_waiters.count(atid)) {
dout(15) << "kicking waiters on atid " << atid << endl;
- mds->queue_finished(ack_waiters[atid]);
+ mds->queue_waiters(ack_waiters[atid]);
ack_waiters.erase(atid);
}
}
list<Context*> finished;
take_waiting(mask, finished);
//finish_contexts(finished, result);
- cache->mds->queue_finished(finished);
+ cache->mds->queue_waiters(finished);
}
map<version_t, list<Context*> >::iterator n = p;
n++;
if (p->first > committed_version) break; // haven't committed this far yet.
- cache->mds->queue_finished(p->second);
+ cache->mds->queue_waiters(p->second);
waiting_for_commit.erase(p);
p = n;
}
if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) {
list<Context*> ls;
take_waiting(WAIT_SINGLEAUTH, ls);
- cache->mds->queue_finished(ls);
+ cache->mds->queue_waiters(ls);
}
}
static const int PIN_UNANCHORING = 13;
static const int PIN_OPENINGDIR = 14;
static const int PIN_REMOTEPARENT = 15;
+ static const int PIN_BATCHOPENJOURNAL = 16;
const char *pin_name(int p) {
switch (p) {
case PIN_UNANCHORING: return "unanchoring";
case PIN_OPENINGDIR: return "openingdir";
case PIN_REMOTEPARENT: return "remoteparent";
+ case PIN_BATCHOPENJOURNAL: return "batchopenjournal";
default: return generic_pin_name(p);
}
}
private:
// effects version
hash_map<int,entity_inst_t> client_inst;
- set<int> sessions;
+ set<int> sessions;
+ set<int> opening;
+ set<int> closing;
public:
bool empty() {
}
const set<int>& get_session_set() { return sessions; }
+ bool is_opening(int c) { return opening.count(c); }
+ void add_opening(int c) { opening.insert(c); }
+ bool is_closing(int c) { return closing.count(c); }
+ void add_closing(int c) { closing.insert(c); }
bool have_session(int client) {
return client_inst.count(client);
}
- void add_session(const entity_inst_t& inst) {
+ void open_session(const entity_inst_t& inst) {
+ opening.erase(inst.name.num());
client_inst[inst.name.num()] = inst;
sessions.insert(inst.name.num());
version++;
}
- void rem_session(int client) {
+ void close_session(int client) {
+ closing.erase(client);
sessions.erase(client);
client_inst.erase(client);
version++;
map<int, map<tid_t,Context*> >::iterator q = waiting_for_trim.find(client);
if (q != waiting_for_trim.end()) {
list<Context*> fls;
- while (q->second.begin()->first < mintid) {
+ while (!q->second.empty() &&
+ (mintid == 0 || q->second.begin()->first < mintid)) {
fls.push_back(q->second.begin()->second);
q->second.erase(q->second.begin());
}
// take waiters
list<Context*> waiters;
in->take_waiting(CInode::WAIT_DIR, waiters);
- mds->queue_finished(waiters);
+ mds->queue_waiters(waiters);
dout(10) << "kicking WAIT_DIR on " << *in << endl;
// remove from mds list
}
// queue them up.
- mds->queue_finished(waiters);
+ mds->queue_waiters(waiters);
}
void MDCache::set_recovery_set(set<int>& s)
trim(0);
dout(5) << "lru size now " << lru.lru_get_size() << endl;
+ // flush batching eopens, so that we can properly expire them.
+ mds->server->journal_opens(); // hrm, this is sort of a hack.
+
+ // flush what we can from the log
+ mds->mdlog->trim(0);
// SUBTREES
// send all imports back to 0.
// FIXME
dout(7) << "FIXME: i need to empty out stray dir contents..." << endl;
- // LOG
- mds->mdlog->trim(0);
-
// (wait for) flush log?
if (g_conf.mds_log_flush_on_shutdown) {
if (mds->mdlog->get_non_importmap_events()) {
// finish errors directly
finish_contexts(error, -ENOENT);
- mds->queue_finished(finished);
+ mds->queue_waiters(finished);
// done
delete m;
list<Context*> finished;
CDir *dir = add_replica_dir(in, m->straydir->get_dirfrag().frag, *m->straydir,
m->get_source().num(), finished);
- if (!finished.empty()) mds->queue_finished(finished);
+ if (!finished.empty()) mds->queue_waiters(finished);
// dentry
straydn = dir->add_dentry( m->straydn->get_dname(), 0, false );
// -- waiters --
list<Context*> finished_queue;
- void queue_finished(Context *c) {
+ void queue_waiter(Context *c) {
finished_queue.push_back(c);
}
- void queue_finished(list<Context*>& ls) {
+ void queue_waiters(list<Context*>& ls) {
finished_queue.splice( finished_queue.end(), ls );
}
mds->locker->dentry_anon_rdlock_trace_finish(trace);
// wake up any waiters
- mds->queue_finished(export_finish_waiters[dir]);
+ mds->queue_waiters(export_finish_waiters[dir]);
export_finish_waiters.erase(dir);
// send pending import_maps? (these need to go out when all exports have finished.)
export_notify_ack_waiting.erase(dir);
// queue finishers
- mds->queue_finished(export_finish_waiters[dir]);
+ mds->queue_waiters(export_finish_waiters[dir]);
export_finish_waiters.erase(dir);
// stats
class C_MDS_session_finish : public Context {
MDS *mds;
- MClientSession *m;
+ entity_inst_t client_inst;
bool open;
version_t cmapv;
public:
- C_MDS_session_finish(MDS *m, MClientSession *msg, bool s, version_t mv) :
- mds(m), m(msg), open(s), cmapv(mv) { }
+ C_MDS_session_finish(MDS *m, entity_inst_t ci, bool s, version_t mv) :
+ mds(m), client_inst(ci), open(s), cmapv(mv) { }
void finish(int r) {
assert(r == 0);
-
- // apply
- if (open)
- mds->clientmap.add_session(m->get_source_inst());
- else
- mds->clientmap.rem_session(m->get_source().num());
-
- assert(cmapv == mds->clientmap.get_version());
-
- // purge completed requests from clientmap?
- if (!open)
- mds->clientmap.trim_completed_requests(m->get_source().num(), 0);
-
- // reply
- mds->messenger->send_message(new MClientSession(m->op+1), m->get_source_inst());
- delete m;
+ mds->server->_session_logged(client_inst, open, cmapv);
}
};
void Server::handle_client_session(MClientSession *m)
{
dout(3) << "handle_client_session " << *m << " from " << m->get_source() << endl;
-
+ int from = m->get_source().num();
bool open = m->op == MClientSession::OP_OPEN;
-
+
+ if (open) {
+ if (mds->clientmap.is_opening(from)) {
+ dout(10) << "already opening, dropping this req" << endl;
+ delete m;
+ return;
+ }
+ mds->clientmap.add_opening(from);
+ } else {
+ if (mds->clientmap.is_closing(from)) {
+ dout(10) << "already closing, dropping this req" << endl;
+ delete m;
+ return;
+ }
+ mds->clientmap.add_closing(from);
+ }
+
// journal it
version_t cmapv = mds->clientmap.inc_projected();
mdlog->submit_entry(new ESession(m->get_source_inst(), open, cmapv),
- new C_MDS_session_finish(mds, m, open, cmapv));
+ new C_MDS_session_finish(mds, m->get_source_inst(), open, cmapv));
+ delete m;
+}
+
+void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cmapv)
+{
+ dout(10) << "_session_logged " << client_inst << " " << (open ? "open":"close")
+ << " " << cmapv
+ << endl;
+
+ // apply
+ int from = client_inst.name.num();
+ if (open) {
+ assert(mds->clientmap.is_opening(from));
+ mds->clientmap.open_session(client_inst);
+ } else {
+ assert(mds->clientmap.is_closing(from));
+ mds->clientmap.close_session(from);
+
+ // purge completed requests from clientmap
+ mds->clientmap.trim_completed_requests(from, 0);
+ }
+
+ assert(cmapv == mds->clientmap.get_version());
+
+ // reply
+ if (open)
+ mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN_ACK), client_inst);
+ else
+ mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK), client_inst);
}
dout(2) << "terminate_sessions" << endl;
// kill them off. clients will retry etc.
- while (!mds->clientmap.get_session_set().empty()) {
- int client = *mds->clientmap.get_session_set().begin();
- dout(10) << "terminating session for client" << client << endl;
-
- mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK),
- mds->clientmap.get_inst(client));
- mds->clientmap.rem_session(client);
-
- // trim requests
- mds->clientmap.trim_completed_requests(client, 0);
+ for (set<int>::const_iterator p = mds->clientmap.get_session_set().begin();
+ p != mds->clientmap.get_session_set().end();
+ ++p) {
+ if (mds->clientmap.is_closing(*p))
+ continue;
+ mds->clientmap.add_closing(*p);
+ version_t cmapv = mds->clientmap.inc_projected();
+ mdlog->submit_entry(new ESession(mds->clientmap.get_inst(*p), false, cmapv),
+ new C_MDS_session_finish(mds, mds->clientmap.get_inst(*p), false, cmapv));
}
-
- // FIXME hrm, should i journal this?
}
dout(7) << "handle_client_reconnect " << m->get_source() << endl;
int from = m->get_source().num();
- // caps
- for (map<inodeno_t, MClientReconnect::inode_caps_t>::iterator p = m->inode_caps.begin();
- p != m->inode_caps.end();
- ++p) {
- CInode *in = mdcache->get_inode(p->first);
- if (!in) {
- dout(0) << "missing " << p->first << ", fetching via " << m->inode_path[p->first] << endl;
- assert(0);
- continue;
- }
+ if (m->closed) {
+ dout(7) << " client had no session, removing from clientmap" << endl;
- dout(10) << " client cap " << cap_string(p->second.wanted)
- << " seq " << p->second.seq
- << " on " << *in << endl;
- Capability cap(p->second.wanted, p->second.seq);
- in->add_client_cap(from, cap);
+ mds->clientmap.add_closing(from);
+ version_t cmapv = mds->clientmap.inc_projected();
+ mdlog->submit_entry(new ESession(mds->clientmap.get_inst(from), false, cmapv),
+ new C_MDS_session_finish(mds, mds->clientmap.get_inst(from), false, cmapv));
- reconnected_open_files.insert(in);
+ } else {
+
+ // caps
+ for (map<inodeno_t, MClientReconnect::inode_caps_t>::iterator p = m->inode_caps.begin();
+ p != m->inode_caps.end();
+ ++p) {
+ CInode *in = mdcache->get_inode(p->first);
+ if (!in) {
+ dout(0) << "missing " << p->first << ", fetching via " << m->inode_path[p->first] << endl;
+ assert(0);
+ continue;
+ }
+
+ dout(10) << " client cap " << cap_string(p->second.wanted)
+ << " seq " << p->second.seq
+ << " on " << *in << endl;
+ Capability cap(p->second.wanted, p->second.seq);
+ in->add_client_cap(from, cap);
+
+ reconnected_open_files.insert(in);
+ }
}
// remove from gather set
C_MDS_unlink_local_finish *fin = new C_MDS_unlink_local_finish(mds, mdr, dn, straydn,
ipv, pi->ctime);
+ journal_opens(); // journal pending opens, just in case
+
// log + wait
mdlog->submit_entry(le);
mdlog->wait_for_sync(fin);
C_MDS_rename_local_finish *fin = new C_MDS_rename_local_finish(mds, mdr,
srcdn, destdn, straydn,
ipv, pi ? pi->ctime:utime_t());
+
+ journal_opens(); // journal pending opens, just in case
if (anchorfin) {
// doing anchor update prepare first
// journal?
if (cur->last_open_journaled == 0) {
- cur->last_open_journaled = mdlog->get_write_pos();
- mdlog->submit_entry(new EOpen(cur));
+ queue_journal_open(cur);
+ maybe_journal_opens();
}
}
+void Server::queue_journal_open(CInode *in)
+{
+ dout(10) << "queue_journal_open on " << *in << endl;
+
+ // pin so our pointer stays valid
+ in->get(CInode::PIN_BATCHOPENJOURNAL);
+
+ // queue it up for a bit
+ journal_open_queue.insert(in);
+}
+
+
+void Server::journal_opens()
+{
+ dout(10) << "journal_opens " << journal_open_queue.size() << " inodes" << endl;
+ if (journal_open_queue.empty()) return;
+
+ EOpen *le = 0;
+
+ // check queued inodes
+ for (set<CInode*>::iterator p = journal_open_queue.begin();
+ p != journal_open_queue.end();
+ ++p) {
+ (*p)->put(CInode::PIN_BATCHOPENJOURNAL);
+ if ((*p)->is_any_caps()) {
+ if (!le) le = new EOpen;
+ le->add_inode(*p);
+ (*p)->last_open_journaled = mds->mdlog->get_write_pos();
+ }
+ }
+ journal_open_queue.clear();
+
+ if (le) {
+ // journal
+ mds->mdlog->submit_entry(le);
+
+ // add waiters to journal entry
+ for (list<Context*>::iterator p = journal_open_waiters.begin();
+ p != journal_open_waiters.end();
+ ++p)
+ mds->mdlog->wait_for_sync(*p);
+ journal_open_waiters.clear();
+ } else {
+ // nothing worth journaling here, just kick the waiters.
+ mds->queue_waiters(journal_open_waiters);
+ }
+}
+
+
+
class C_MDS_open_truncate_purged : public Context {
MDS *mds;
set<CInode*> reconnected_open_files;
void handle_client_session(class MClientSession *m);
+ void _session_logged(entity_inst_t ci, bool open, version_t cmapv);
void reconnect_clients();
void handle_client_reconnect(class MClientReconnect *m);
void client_reconnect_failure(int from);
void handle_client_opent(MDRequest *mdr); // O_TRUNC variant.
void _do_open(MDRequest *mdr, CInode *ref);
+ set<CInode*> journal_open_queue; // to be journal
+ list<Context*> journal_open_waiters;
+ void queue_journal_open(CInode *in);
+ void add_journal_open_waiter(Context *c) {
+ journal_open_waiters.push_back(c);
+ }
+ void maybe_journal_opens() {
+ if (journal_open_queue.size() >= (unsigned)g_conf.mds_log_eopen_size)
+ journal_opens();
+ }
+ void journal_opens();
+
// namespace changes
void handle_client_mknod(MDRequest *mdr);
void handle_client_mkdir(MDRequest *mdr);
class EOpen : public LogEvent {
public:
EMetaBlob metablob;
- inodeno_t ino;
+ list<inodeno_t> inos;
EOpen() : LogEvent(EVENT_OPEN) { }
- EOpen(CInode *in) : LogEvent(EVENT_OPEN),
- ino(in->ino()) {
- metablob.add_primary_dentry(in->get_parent_dn(), false);
+ EOpen(CInode *in) : LogEvent(EVENT_OPEN) {
+ add_inode(in);
}
void print(ostream& out) {
- out << "EOpen " << ino << " " << metablob;
+ out << "EOpen " << metablob;
+ }
+
+ void add_inode(CInode *in) {
+ inos.push_back(in->ino());
+ metablob.add_primary_dentry(in->get_parent_dn(), false);
}
void encode_payload(bufferlist& bl) {
- ::_encode(ino, bl);
+ ::_encode(inos, bl);
metablob._encode(bl);
}
void decode_payload(bufferlist& bl, int& off) {
- ::_decode(ino, bl, off);
+ ::_decode(inos, bl, off);
metablob._decode(bl, off);
}
#include "MDS.h"
#include "MDLog.h"
#include "MDCache.h"
+#include "Server.h"
#include "Migrator.h"
#include "AnchorTable.h"
#include "AnchorClient.h"
{
dout(10) << "ESession.replay" << endl;
if (open)
- mds->clientmap.add_session(client_inst);
+ mds->clientmap.open_session(client_inst);
else
- mds->clientmap.rem_session(client_inst.name.num());
+ mds->clientmap.close_session(client_inst.name.num());
mds->clientmap.reset_projected(); // make it follow version.
}
bool EOpen::has_expired(MDS *mds)
{
- CInode *in = mds->mdcache->get_inode(ino);
- if (!in) return true;
- if (!in->is_any_caps()) return true;
- if (in->last_open_journaled > get_start_off() ||
- in->last_open_journaled == 0) return true;
- return false;
+ for (list<inodeno_t>::iterator p = inos.begin(); p != inos.end(); ++p) {
+ CInode *in = mds->mdcache->get_inode(*p);
+ if (in &&
+ in->is_any_caps() &&
+ !(in->last_open_journaled > get_start_off() ||
+ in->last_open_journaled == 0)) {
+ dout(10) << "EOpen.has_expired still refer to caps on " << *in << endl;
+ return false;
+ }
+ }
+ return true;
}
void EOpen::expire(MDS *mds, Context *c)
{
- CInode *in = mds->mdcache->get_inode(ino);
- assert(in);
-
- dout(10) << "EOpen.expire " << ino
- << " last_open_journaled " << in->last_open_journaled << endl;
+ dout(10) << "EOpen.expire " << endl;
- // wait?
- // FIXME this is stupid.
- if (in->last_open_journaled == get_start_off()) {
- //||
- //(get_start_off() < mds->mdlog->last_import_map &&
- //in->last_open_journaled < mds->mdlog->last_import_map)) {
- dout(10) << "waiting." << endl;
- // wait
- mds->mdlog->add_import_map_expire_waiter(c);
- } else {
- // rejournal now.
- dout(10) << "rejournaling" << endl;
- in->last_open_journaled = mds->mdlog->get_write_pos();
- mds->mdlog->submit_entry(new EOpen(in));
+ if (mds->mdlog->is_capped()) {
+ dout(0) << "uh oh, log is capped, but i have unexpired opens." << endl;
+ assert(0);
+ }
+
+ for (list<inodeno_t>::iterator p = inos.begin(); p != inos.end(); ++p) {
+ CInode *in = mds->mdcache->get_inode(*p);
+ if (!in) continue;
+ if (!in->is_any_caps()) continue;
+
+ dout(10) << "EOpen.expire " << in->ino()
+ << " last_open_journaled " << in->last_open_journaled << endl;
+
+ mds->server->queue_journal_open(in);
}
+ mds->server->add_journal_open_waiter(c);
+ mds->server->maybe_journal_opens();
}
void EOpen::replay(MDS *mds)
{
- dout(10) << "EOpen.replay " << ino << endl;
+ dout(10) << "EOpen.replay " << endl;
metablob.replay(mds);
}
map<inodeno_t, inode_caps_t> inode_caps;
map<inodeno_t, string> inode_path;
+ bool closed;
- MClientReconnect() : Message(MSG_CLIENT_RECONNECT) { }
+ MClientReconnect() : Message(MSG_CLIENT_RECONNECT),
+ closed(false) { }
char *get_type_name() { return "client_reconnect"; }
void print(ostream& out) {
}
void encode_payload() {
+ ::_encode(closed, payload);
::_encode(inode_caps, payload);
::_encode(inode_path, payload);
}
void decode_payload() {
int off = 0;
+ ::_decode(closed, payload, off);
::_decode(inode_caps, payload, off);
::_decode(inode_path, payload, off);
}
(client_map.count(from) &&
client_map[from] != m->get_source_addr())) {
from = num_clients++;
- dout(10) << "client_boot assigned client" << from << endl;
+ dout(10) << "client_mount assigned client" << from << endl;
}
client_map[from] = m->get_source_addr();
lock.Unlock();
dout(10) << "wait: done." << endl;
+ dout(1) << "shutdown complete." << endl;
}
// stop my dispatch thread
if (dispatch_thread.am_self()) {
- dout(1) << "shutdown i am dispatch, setting stop flag" << endl;
+ dout(10) << "shutdown i am dispatch, setting stop flag" << endl;
stop = true;
} else {
- dout(1) << "shutdown i am not dispatch, setting stop flag and joining thread." << endl;
+ dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << endl;
lock.Lock();
stop = true;
cond.Signal();