From a21552d78666a73a0572c8fe422c8973fae133cb Mon Sep 17 00:00:00 2001 From: sageweil Date: Thu, 5 Apr 2007 20:29:13 +0000 Subject: [PATCH] * session recovery cleanup * EOpen batching git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1341 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/TODO | 24 +-- branches/sage/cephmds2/client/Client.cc | 50 +++-- .../sage/cephmds2/client/SyntheticClient.cc | 3 +- branches/sage/cephmds2/config.cc | 2 + branches/sage/cephmds2/config.h | 1 + branches/sage/cephmds2/mds/AnchorClient.cc | 2 +- branches/sage/cephmds2/mds/CDir.cc | 6 +- branches/sage/cephmds2/mds/CInode.h | 2 + branches/sage/cephmds2/mds/ClientMap.h | 17 +- branches/sage/cephmds2/mds/MDCache.cc | 16 +- branches/sage/cephmds2/mds/MDS.h | 4 +- branches/sage/cephmds2/mds/Migrator.cc | 4 +- branches/sage/cephmds2/mds/Server.cc | 196 +++++++++++++----- branches/sage/cephmds2/mds/Server.h | 13 ++ branches/sage/cephmds2/mds/events/EOpen.h | 18 +- branches/sage/cephmds2/mds/journal.cc | 60 +++--- .../sage/cephmds2/messages/MClientReconnect.h | 6 +- branches/sage/cephmds2/mon/ClientMonitor.cc | 2 +- branches/sage/cephmds2/msg/SimpleMessenger.cc | 5 +- 19 files changed, 276 insertions(+), 155 deletions(-) diff --git a/branches/sage/cephmds2/TODO b/branches/sage/cephmds2/TODO index 04a5e01a813e0..00c87fe4cfa26 100644 --- a/branches/sage/cephmds2/TODO +++ b/branches/sage/cephmds2/TODO @@ -40,15 +40,12 @@ mds - ScatterLock or something? hrm. - discover -/ - hard link dentries - open_remote_ino needs major work... - FIXME how to journal root and stray inode content? - in particular, i care about dirfragtree.. get it on rejoin? - and dir sizes, if i add that... also on rejoin? -- rejoin and replicas that are not in recovered node's cache... fetch storm? - - remote xlocks - drop remote locks on request finish - handled by individual MDSCacheObject _finish()'s @@ -58,30 +55,13 @@ mds - replicas will tell it when they hold an xlock - surviving mds rejoins replicas from a recovering mds - will tell auth if it holds an xlock + - recovering open files -/ - need to journal EOpen -/ - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs - - need to batch EOpen events when rejournaling to avoid looping - - or something......... - recovery will either have inode (from EOpen), or will provide path+cap to reassert open state. - - path+cap window will require fetching and metadata from disk before doing the rejoin -- client reconnect stage after resolve, but before rejoin. - -/- clientmap request history -/ - journaled bit in MDRequest -/ - complete request list in MClientMap -/ - populated on replay -/ - periodic client->mds MClientLastRetry -> trim request list - - -/untested- truncate... + - path+cap window will require some fetching of metadata from disk before doing the rejoin -- mds failure vs clients - - idempotent client ops - - EMetablob replay, expire logic - journal+recovery - local rename -/ - fix dir renames vs subtrees - how to notify replicas... / - stray purge - stray reintegration diff --git a/branches/sage/cephmds2/client/Client.cc b/branches/sage/cephmds2/client/Client.cc index 53598bfcd548e..3cb83b84781ef 100644 --- a/branches/sage/cephmds2/client/Client.cc +++ b/branches/sage/cephmds2/client/Client.cc @@ -892,28 +892,34 @@ void Client::send_reconnect(int mds) MClientReconnect *m = new MClientReconnect; - for (hash_map::iterator p = inode_map.begin(); - p != inode_map.end(); - p++) { - if (p->second->caps.count(mds)) { - dout(10) << " caps on " << p->first - << " " << cap_string(p->second->caps[mds].caps) - << " wants " << cap_string(p->second->file_caps_wanted()) - << endl; - m->add_inode_caps(p->first, - p->second->caps[mds].caps, - p->second->caps[mds].seq, - p->second->file_caps_wanted()); - string path; - p->second->make_path(path); - dout(10) << " path on " << p->first << " is " << path << endl; - m->add_inode_path(p->first, path); - } - if (p->second->stale_caps.count(mds)) { - dout(10) << " clearing stale caps on " << p->first << endl; - p->second->stale_caps.erase(mds); // hrm, is this right? - } - } + if (mds_sessions.count(mds)) { + // i have an open session. + for (hash_map::iterator p = inode_map.begin(); + p != inode_map.end(); + p++) { + if (p->second->caps.count(mds)) { + dout(10) << " caps on " << p->first + << " " << cap_string(p->second->caps[mds].caps) + << " wants " << cap_string(p->second->file_caps_wanted()) + << endl; + m->add_inode_caps(p->first, + p->second->caps[mds].caps, + p->second->caps[mds].seq, + p->second->file_caps_wanted()); + string path; + p->second->make_path(path); + dout(10) << " path on " << p->first << " is " << path << endl; + m->add_inode_path(p->first, path); + } + if (p->second->stale_caps.count(mds)) { + dout(10) << " clearing stale caps on " << p->first << endl; + p->second->stale_caps.erase(mds); // hrm, is this right? + } + } + } else { + dout(10) << " i had no session with this mds"; + m->closed = true; + } messenger->send_message(m, mdsmap->get_inst(mds), MDS_PORT_SERVER); } diff --git a/branches/sage/cephmds2/client/SyntheticClient.cc b/branches/sage/cephmds2/client/SyntheticClient.cc index bb7d6540f7998..5564cf90a6d72 100644 --- a/branches/sage/cephmds2/client/SyntheticClient.cc +++ b/branches/sage/cephmds2/client/SyntheticClient.cc @@ -20,7 +20,8 @@ using namespace std; #include "SyntheticClient.h" #include "include/filepath.h" -#include "mds/MDS.h" +#include "mds/mdstypes.h" +#include "common/Logger.h" #include #include diff --git a/branches/sage/cephmds2/config.cc b/branches/sage/cephmds2/config.cc index 51394988bc8ed..379df1fb68859 100644 --- a/branches/sage/cephmds2/config.cc +++ b/branches/sage/cephmds2/config.cc @@ -176,6 +176,8 @@ md_config_t g_conf = { mds_log_before_reply: true, mds_log_flush_on_shutdown: true, mds_log_import_map_interval: 1024*1024, // frequency (in bytes) of EImportMap in log + mds_log_eopen_size: 100, + mds_bal_replicate_threshold: 2000, mds_bal_unreplicate_threshold: 0,//500, mds_bal_hash_rd: 10000, diff --git a/branches/sage/cephmds2/config.h b/branches/sage/cephmds2/config.h index 8d01d06b4ed0b..11bc3c2827048 100644 --- a/branches/sage/cephmds2/config.h +++ b/branches/sage/cephmds2/config.h @@ -168,6 +168,7 @@ struct md_config_t { bool mds_log_before_reply; bool mds_log_flush_on_shutdown; off_t mds_log_import_map_interval; + int mds_log_eopen_size; float mds_bal_replicate_threshold; float mds_bal_unreplicate_threshold; diff --git a/branches/sage/cephmds2/mds/AnchorClient.cc b/branches/sage/cephmds2/mds/AnchorClient.cc index 7b6eac2d8df1b..d7bfb655f06d8 100644 --- a/branches/sage/cephmds2/mds/AnchorClient.cc +++ b/branches/sage/cephmds2/mds/AnchorClient.cc @@ -195,7 +195,7 @@ void AnchorClient::handle_anchor_reply(class MAnchor *m) // kick any waiters if (ack_waiters.count(atid)) { dout(15) << "kicking waiters on atid " << atid << endl; - mds->queue_finished(ack_waiters[atid]); + mds->queue_waiters(ack_waiters[atid]); ack_waiters.erase(atid); } } diff --git a/branches/sage/cephmds2/mds/CDir.cc b/branches/sage/cephmds2/mds/CDir.cc index 6a754a5f50bd8..b13359fb916a1 100644 --- a/branches/sage/cephmds2/mds/CDir.cc +++ b/branches/sage/cephmds2/mds/CDir.cc @@ -452,7 +452,7 @@ void CDir::finish_waiting(int mask, int result) list finished; take_waiting(mask, finished); //finish_contexts(finished, result); - cache->mds->queue_finished(finished); + cache->mds->queue_waiters(finished); } @@ -944,7 +944,7 @@ void CDir::_committed(version_t v) map >::iterator n = p; n++; if (p->first > committed_version) break; // haven't committed this far yet. - cache->mds->queue_finished(p->second); + cache->mds->queue_waiters(p->second); waiting_for_commit.erase(p); p = n; } @@ -1048,7 +1048,7 @@ void CDir::set_dir_auth(pair a, bool iamauth) if (was_ambiguous && dir_auth.second == CDIR_AUTH_UNKNOWN) { list ls; take_waiting(WAIT_SINGLEAUTH, ls); - cache->mds->queue_finished(ls); + cache->mds->queue_waiters(ls); } } diff --git a/branches/sage/cephmds2/mds/CInode.h b/branches/sage/cephmds2/mds/CInode.h index a1021d3bb81e1..7ecc8ac292e41 100644 --- a/branches/sage/cephmds2/mds/CInode.h +++ b/branches/sage/cephmds2/mds/CInode.h @@ -63,6 +63,7 @@ class CInode : public MDSCacheObject { static const int PIN_UNANCHORING = 13; static const int PIN_OPENINGDIR = 14; static const int PIN_REMOTEPARENT = 15; + static const int PIN_BATCHOPENJOURNAL = 16; const char *pin_name(int p) { switch (p) { @@ -76,6 +77,7 @@ class CInode : public MDSCacheObject { case PIN_UNANCHORING: return "unanchoring"; case PIN_OPENINGDIR: return "openingdir"; case PIN_REMOTEPARENT: return "remoteparent"; + case PIN_BATCHOPENJOURNAL: return "batchopenjournal"; default: return generic_pin_name(p); } } diff --git a/branches/sage/cephmds2/mds/ClientMap.h b/branches/sage/cephmds2/mds/ClientMap.h index 065cd684a5dee..6100b38beaffc 100644 --- a/branches/sage/cephmds2/mds/ClientMap.h +++ b/branches/sage/cephmds2/mds/ClientMap.h @@ -64,7 +64,9 @@ public: private: // effects version hash_map client_inst; - set sessions; + set sessions; + set opening; + set closing; public: bool empty() { @@ -77,15 +79,21 @@ public: } const set& get_session_set() { return sessions; } + bool is_opening(int c) { return opening.count(c); } + void add_opening(int c) { opening.insert(c); } + bool is_closing(int c) { return closing.count(c); } + void add_closing(int c) { closing.insert(c); } bool have_session(int client) { return client_inst.count(client); } - void add_session(const entity_inst_t& inst) { + void open_session(const entity_inst_t& inst) { + opening.erase(inst.name.num()); client_inst[inst.name.num()] = inst; sessions.insert(inst.name.num()); version++; } - void rem_session(int client) { + void close_session(int client) { + closing.erase(client); sessions.erase(client); client_inst.erase(client); version++; @@ -117,7 +125,8 @@ public: map >::iterator q = waiting_for_trim.find(client); if (q != waiting_for_trim.end()) { list fls; - while (q->second.begin()->first < mintid) { + while (!q->second.empty() && + (mintid == 0 || q->second.begin()->first < mintid)) { fls.push_back(q->second.begin()->second); q->second.erase(q->second.begin()); } diff --git a/branches/sage/cephmds2/mds/MDCache.cc b/branches/sage/cephmds2/mds/MDCache.cc index b4b39b058a930..202a04564d3ec 100644 --- a/branches/sage/cephmds2/mds/MDCache.cc +++ b/branches/sage/cephmds2/mds/MDCache.cc @@ -987,7 +987,7 @@ void MDCache::handle_mds_failure(int who) // take waiters list waiters; in->take_waiting(CInode::WAIT_DIR, waiters); - mds->queue_finished(waiters); + mds->queue_waiters(waiters); dout(10) << "kicking WAIT_DIR on " << *in << endl; // remove from mds list @@ -1053,7 +1053,7 @@ void MDCache::handle_mds_recovery(int who) } // queue them up. - mds->queue_finished(waiters); + mds->queue_waiters(waiters); } void MDCache::set_recovery_set(set& s) @@ -2489,6 +2489,11 @@ bool MDCache::shutdown_pass() trim(0); dout(5) << "lru size now " << lru.lru_get_size() << endl; + // flush batching eopens, so that we can properly expire them. + mds->server->journal_opens(); // hrm, this is sort of a hack. + + // flush what we can from the log + mds->mdlog->trim(0); // SUBTREES // send all imports back to 0. @@ -2531,9 +2536,6 @@ bool MDCache::shutdown_pass() // FIXME dout(7) << "FIXME: i need to empty out stray dir contents..." << endl; - // LOG - mds->mdlog->trim(0); - // (wait for) flush log? if (g_conf.mds_log_flush_on_shutdown) { if (mds->mdlog->get_non_importmap_events()) { @@ -4077,7 +4079,7 @@ void MDCache::handle_discover_reply(MDiscoverReply *m) // finish errors directly finish_contexts(error, -ENOENT); - mds->queue_finished(finished); + mds->queue_waiters(finished); // done delete m; @@ -4304,7 +4306,7 @@ void MDCache::handle_dentry_unlink(MDentryUnlink *m) list finished; CDir *dir = add_replica_dir(in, m->straydir->get_dirfrag().frag, *m->straydir, m->get_source().num(), finished); - if (!finished.empty()) mds->queue_finished(finished); + if (!finished.empty()) mds->queue_waiters(finished); // dentry straydn = dir->add_dentry( m->straydn->get_dname(), 0, false ); diff --git a/branches/sage/cephmds2/mds/MDS.h b/branches/sage/cephmds2/mds/MDS.h index a1d6d8201c00c..d7645c322a518 100644 --- a/branches/sage/cephmds2/mds/MDS.h +++ b/branches/sage/cephmds2/mds/MDS.h @@ -138,10 +138,10 @@ class MDS : public Dispatcher { // -- waiters -- list finished_queue; - void queue_finished(Context *c) { + void queue_waiter(Context *c) { finished_queue.push_back(c); } - void queue_finished(list& ls) { + void queue_waiters(list& ls) { finished_queue.splice( finished_queue.end(), ls ); } diff --git a/branches/sage/cephmds2/mds/Migrator.cc b/branches/sage/cephmds2/mds/Migrator.cc index eebbaca6f8e33..9319b78dcd4bd 100644 --- a/branches/sage/cephmds2/mds/Migrator.cc +++ b/branches/sage/cephmds2/mds/Migrator.cc @@ -259,7 +259,7 @@ void Migrator::handle_mds_failure(int who) mds->locker->dentry_anon_rdlock_trace_finish(trace); // wake up any waiters - mds->queue_finished(export_finish_waiters[dir]); + mds->queue_waiters(export_finish_waiters[dir]); export_finish_waiters.erase(dir); // send pending import_maps? (these need to go out when all exports have finished.) @@ -1170,7 +1170,7 @@ void Migrator::export_finish(CDir *dir) export_notify_ack_waiting.erase(dir); // queue finishers - mds->queue_finished(export_finish_waiters[dir]); + mds->queue_waiters(export_finish_waiters[dir]); export_finish_waiters.erase(dir); // stats diff --git a/branches/sage/cephmds2/mds/Server.cc b/branches/sage/cephmds2/mds/Server.cc index 2dbde64253775..de5e88f494e84 100644 --- a/branches/sage/cephmds2/mds/Server.cc +++ b/branches/sage/cephmds2/mds/Server.cc @@ -91,30 +91,15 @@ void Server::dispatch(Message *m) class C_MDS_session_finish : public Context { MDS *mds; - MClientSession *m; + entity_inst_t client_inst; bool open; version_t cmapv; public: - C_MDS_session_finish(MDS *m, MClientSession *msg, bool s, version_t mv) : - mds(m), m(msg), open(s), cmapv(mv) { } + C_MDS_session_finish(MDS *m, entity_inst_t ci, bool s, version_t mv) : + mds(m), client_inst(ci), open(s), cmapv(mv) { } void finish(int r) { assert(r == 0); - - // apply - if (open) - mds->clientmap.add_session(m->get_source_inst()); - else - mds->clientmap.rem_session(m->get_source().num()); - - assert(cmapv == mds->clientmap.get_version()); - - // purge completed requests from clientmap? - if (!open) - mds->clientmap.trim_completed_requests(m->get_source().num(), 0); - - // reply - mds->messenger->send_message(new MClientSession(m->op+1), m->get_source_inst()); - delete m; + mds->server->_session_logged(client_inst, open, cmapv); } }; @@ -122,13 +107,58 @@ public: void Server::handle_client_session(MClientSession *m) { dout(3) << "handle_client_session " << *m << " from " << m->get_source() << endl; - + int from = m->get_source().num(); bool open = m->op == MClientSession::OP_OPEN; - + + if (open) { + if (mds->clientmap.is_opening(from)) { + dout(10) << "already opening, dropping this req" << endl; + delete m; + return; + } + mds->clientmap.add_opening(from); + } else { + if (mds->clientmap.is_closing(from)) { + dout(10) << "already closing, dropping this req" << endl; + delete m; + return; + } + mds->clientmap.add_closing(from); + } + // journal it version_t cmapv = mds->clientmap.inc_projected(); mdlog->submit_entry(new ESession(m->get_source_inst(), open, cmapv), - new C_MDS_session_finish(mds, m, open, cmapv)); + new C_MDS_session_finish(mds, m->get_source_inst(), open, cmapv)); + delete m; +} + +void Server::_session_logged(entity_inst_t client_inst, bool open, version_t cmapv) +{ + dout(10) << "_session_logged " << client_inst << " " << (open ? "open":"close") + << " " << cmapv + << endl; + + // apply + int from = client_inst.name.num(); + if (open) { + assert(mds->clientmap.is_opening(from)); + mds->clientmap.open_session(client_inst); + } else { + assert(mds->clientmap.is_closing(from)); + mds->clientmap.close_session(from); + + // purge completed requests from clientmap + mds->clientmap.trim_completed_requests(from, 0); + } + + assert(cmapv == mds->clientmap.get_version()); + + // reply + if (open) + mds->messenger->send_message(new MClientSession(MClientSession::OP_OPEN_ACK), client_inst); + else + mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK), client_inst); } @@ -137,19 +167,16 @@ void Server::terminate_sessions() dout(2) << "terminate_sessions" << endl; // kill them off. clients will retry etc. - while (!mds->clientmap.get_session_set().empty()) { - int client = *mds->clientmap.get_session_set().begin(); - dout(10) << "terminating session for client" << client << endl; - - mds->messenger->send_message(new MClientSession(MClientSession::OP_CLOSE_ACK), - mds->clientmap.get_inst(client)); - mds->clientmap.rem_session(client); - - // trim requests - mds->clientmap.trim_completed_requests(client, 0); + for (set::const_iterator p = mds->clientmap.get_session_set().begin(); + p != mds->clientmap.get_session_set().end(); + ++p) { + if (mds->clientmap.is_closing(*p)) + continue; + mds->clientmap.add_closing(*p); + version_t cmapv = mds->clientmap.inc_projected(); + mdlog->submit_entry(new ESession(mds->clientmap.get_inst(*p), false, cmapv), + new C_MDS_session_finish(mds, mds->clientmap.get_inst(*p), false, cmapv)); } - - // FIXME hrm, should i journal this? } @@ -178,24 +205,35 @@ void Server::handle_client_reconnect(MClientReconnect *m) dout(7) << "handle_client_reconnect " << m->get_source() << endl; int from = m->get_source().num(); - // caps - for (map::iterator p = m->inode_caps.begin(); - p != m->inode_caps.end(); - ++p) { - CInode *in = mdcache->get_inode(p->first); - if (!in) { - dout(0) << "missing " << p->first << ", fetching via " << m->inode_path[p->first] << endl; - assert(0); - continue; - } + if (m->closed) { + dout(7) << " client had no session, removing from clientmap" << endl; - dout(10) << " client cap " << cap_string(p->second.wanted) - << " seq " << p->second.seq - << " on " << *in << endl; - Capability cap(p->second.wanted, p->second.seq); - in->add_client_cap(from, cap); + mds->clientmap.add_closing(from); + version_t cmapv = mds->clientmap.inc_projected(); + mdlog->submit_entry(new ESession(mds->clientmap.get_inst(from), false, cmapv), + new C_MDS_session_finish(mds, mds->clientmap.get_inst(from), false, cmapv)); - reconnected_open_files.insert(in); + } else { + + // caps + for (map::iterator p = m->inode_caps.begin(); + p != m->inode_caps.end(); + ++p) { + CInode *in = mdcache->get_inode(p->first); + if (!in) { + dout(0) << "missing " << p->first << ", fetching via " << m->inode_path[p->first] << endl; + assert(0); + continue; + } + + dout(10) << " client cap " << cap_string(p->second.wanted) + << " seq " << p->second.seq + << " on " << *in << endl; + Capability cap(p->second.wanted, p->second.seq); + in->add_client_cap(from, cap); + + reconnected_open_files.insert(in); + } } // remove from gather set @@ -1727,6 +1765,8 @@ void Server::_unlink_local(MDRequest *mdr, CDentry *dn) C_MDS_unlink_local_finish *fin = new C_MDS_unlink_local_finish(mds, mdr, dn, straydn, ipv, pi->ctime); + journal_opens(); // journal pending opens, just in case + // log + wait mdlog->submit_entry(le); mdlog->wait_for_sync(fin); @@ -2224,6 +2264,8 @@ void Server::_rename_local(MDRequest *mdr, C_MDS_rename_local_finish *fin = new C_MDS_rename_local_finish(mds, mdr, srcdn, destdn, straydn, ipv, pi ? pi->ctime:utime_t()); + + journal_opens(); // journal pending opens, just in case if (anchorfin) { // doing anchor update prepare first @@ -2681,12 +2723,62 @@ void Server::_do_open(MDRequest *mdr, CInode *cur) // journal? if (cur->last_open_journaled == 0) { - cur->last_open_journaled = mdlog->get_write_pos(); - mdlog->submit_entry(new EOpen(cur)); + queue_journal_open(cur); + maybe_journal_opens(); } } +void Server::queue_journal_open(CInode *in) +{ + dout(10) << "queue_journal_open on " << *in << endl; + + // pin so our pointer stays valid + in->get(CInode::PIN_BATCHOPENJOURNAL); + + // queue it up for a bit + journal_open_queue.insert(in); +} + + +void Server::journal_opens() +{ + dout(10) << "journal_opens " << journal_open_queue.size() << " inodes" << endl; + if (journal_open_queue.empty()) return; + + EOpen *le = 0; + + // check queued inodes + for (set::iterator p = journal_open_queue.begin(); + p != journal_open_queue.end(); + ++p) { + (*p)->put(CInode::PIN_BATCHOPENJOURNAL); + if ((*p)->is_any_caps()) { + if (!le) le = new EOpen; + le->add_inode(*p); + (*p)->last_open_journaled = mds->mdlog->get_write_pos(); + } + } + journal_open_queue.clear(); + + if (le) { + // journal + mds->mdlog->submit_entry(le); + + // add waiters to journal entry + for (list::iterator p = journal_open_waiters.begin(); + p != journal_open_waiters.end(); + ++p) + mds->mdlog->wait_for_sync(*p); + journal_open_waiters.clear(); + } else { + // nothing worth journaling here, just kick the waiters. + mds->queue_waiters(journal_open_waiters); + } +} + + + class C_MDS_open_truncate_purged : public Context { MDS *mds; diff --git a/branches/sage/cephmds2/mds/Server.h b/branches/sage/cephmds2/mds/Server.h index 2cd8a073e64ff..24a7d19d13922 100644 --- a/branches/sage/cephmds2/mds/Server.h +++ b/branches/sage/cephmds2/mds/Server.h @@ -43,6 +43,7 @@ public: set reconnected_open_files; void handle_client_session(class MClientSession *m); + void _session_logged(entity_inst_t ci, bool open, version_t cmapv); void reconnect_clients(); void handle_client_reconnect(class MClientReconnect *m); void client_reconnect_failure(int from); @@ -84,6 +85,18 @@ public: void handle_client_opent(MDRequest *mdr); // O_TRUNC variant. void _do_open(MDRequest *mdr, CInode *ref); + set journal_open_queue; // to be journal + list journal_open_waiters; + void queue_journal_open(CInode *in); + void add_journal_open_waiter(Context *c) { + journal_open_waiters.push_back(c); + } + void maybe_journal_opens() { + if (journal_open_queue.size() >= (unsigned)g_conf.mds_log_eopen_size) + journal_opens(); + } + void journal_opens(); + // namespace changes void handle_client_mknod(MDRequest *mdr); void handle_client_mkdir(MDRequest *mdr); diff --git a/branches/sage/cephmds2/mds/events/EOpen.h b/branches/sage/cephmds2/mds/events/EOpen.h index 6af9dc61810d6..3ddfabf330f15 100644 --- a/branches/sage/cephmds2/mds/events/EOpen.h +++ b/branches/sage/cephmds2/mds/events/EOpen.h @@ -20,23 +20,27 @@ class EOpen : public LogEvent { public: EMetaBlob metablob; - inodeno_t ino; + list inos; EOpen() : LogEvent(EVENT_OPEN) { } - EOpen(CInode *in) : LogEvent(EVENT_OPEN), - ino(in->ino()) { - metablob.add_primary_dentry(in->get_parent_dn(), false); + EOpen(CInode *in) : LogEvent(EVENT_OPEN) { + add_inode(in); } void print(ostream& out) { - out << "EOpen " << ino << " " << metablob; + out << "EOpen " << metablob; + } + + void add_inode(CInode *in) { + inos.push_back(in->ino()); + metablob.add_primary_dentry(in->get_parent_dn(), false); } void encode_payload(bufferlist& bl) { - ::_encode(ino, bl); + ::_encode(inos, bl); metablob._encode(bl); } void decode_payload(bufferlist& bl, int& off) { - ::_decode(ino, bl, off); + ::_decode(inos, bl, off); metablob._decode(bl, off); } diff --git a/branches/sage/cephmds2/mds/journal.cc b/branches/sage/cephmds2/mds/journal.cc index 200e3168f0438..a298ee8cb8520 100644 --- a/branches/sage/cephmds2/mds/journal.cc +++ b/branches/sage/cephmds2/mds/journal.cc @@ -35,6 +35,7 @@ #include "MDS.h" #include "MDLog.h" #include "MDCache.h" +#include "Server.h" #include "Migrator.h" #include "AnchorTable.h" #include "AnchorClient.h" @@ -504,9 +505,9 @@ void ESession::replay(MDS *mds) { dout(10) << "ESession.replay" << endl; if (open) - mds->clientmap.add_session(client_inst); + mds->clientmap.open_session(client_inst); else - mds->clientmap.rem_session(client_inst.name.num()); + mds->clientmap.close_session(client_inst.name.num()); mds->clientmap.reset_projected(); // make it follow version. } @@ -669,42 +670,45 @@ void EUpdate::replay(MDS *mds) bool EOpen::has_expired(MDS *mds) { - CInode *in = mds->mdcache->get_inode(ino); - if (!in) return true; - if (!in->is_any_caps()) return true; - if (in->last_open_journaled > get_start_off() || - in->last_open_journaled == 0) return true; - return false; + for (list::iterator p = inos.begin(); p != inos.end(); ++p) { + CInode *in = mds->mdcache->get_inode(*p); + if (in && + in->is_any_caps() && + !(in->last_open_journaled > get_start_off() || + in->last_open_journaled == 0)) { + dout(10) << "EOpen.has_expired still refer to caps on " << *in << endl; + return false; + } + } + return true; } void EOpen::expire(MDS *mds, Context *c) { - CInode *in = mds->mdcache->get_inode(ino); - assert(in); - - dout(10) << "EOpen.expire " << ino - << " last_open_journaled " << in->last_open_journaled << endl; + dout(10) << "EOpen.expire " << endl; - // wait? - // FIXME this is stupid. - if (in->last_open_journaled == get_start_off()) { - //|| - //(get_start_off() < mds->mdlog->last_import_map && - //in->last_open_journaled < mds->mdlog->last_import_map)) { - dout(10) << "waiting." << endl; - // wait - mds->mdlog->add_import_map_expire_waiter(c); - } else { - // rejournal now. - dout(10) << "rejournaling" << endl; - in->last_open_journaled = mds->mdlog->get_write_pos(); - mds->mdlog->submit_entry(new EOpen(in)); + if (mds->mdlog->is_capped()) { + dout(0) << "uh oh, log is capped, but i have unexpired opens." << endl; + assert(0); + } + + for (list::iterator p = inos.begin(); p != inos.end(); ++p) { + CInode *in = mds->mdcache->get_inode(*p); + if (!in) continue; + if (!in->is_any_caps()) continue; + + dout(10) << "EOpen.expire " << in->ino() + << " last_open_journaled " << in->last_open_journaled << endl; + + mds->server->queue_journal_open(in); } + mds->server->add_journal_open_waiter(c); + mds->server->maybe_journal_opens(); } void EOpen::replay(MDS *mds) { - dout(10) << "EOpen.replay " << ino << endl; + dout(10) << "EOpen.replay " << endl; metablob.replay(mds); } diff --git a/branches/sage/cephmds2/messages/MClientReconnect.h b/branches/sage/cephmds2/messages/MClientReconnect.h index 0b612d62dfdc6..ef6a0dc96bcc3 100644 --- a/branches/sage/cephmds2/messages/MClientReconnect.h +++ b/branches/sage/cephmds2/messages/MClientReconnect.h @@ -29,8 +29,10 @@ public: map inode_caps; map inode_path; + bool closed; - MClientReconnect() : Message(MSG_CLIENT_RECONNECT) { } + MClientReconnect() : Message(MSG_CLIENT_RECONNECT), + closed(false) { } char *get_type_name() { return "client_reconnect"; } void print(ostream& out) { @@ -48,11 +50,13 @@ public: } void encode_payload() { + ::_encode(closed, payload); ::_encode(inode_caps, payload); ::_encode(inode_path, payload); } void decode_payload() { int off = 0; + ::_decode(closed, payload, off); ::_decode(inode_caps, payload, off); ::_decode(inode_path, payload, off); } diff --git a/branches/sage/cephmds2/mon/ClientMonitor.cc b/branches/sage/cephmds2/mon/ClientMonitor.cc index 2e3b5282364bb..055370c971c48 100644 --- a/branches/sage/cephmds2/mon/ClientMonitor.cc +++ b/branches/sage/cephmds2/mon/ClientMonitor.cc @@ -59,7 +59,7 @@ void ClientMonitor::handle_client_mount(MClientMount *m) (client_map.count(from) && client_map[from] != m->get_source_addr())) { from = num_clients++; - dout(10) << "client_boot assigned client" << from << endl; + dout(10) << "client_mount assigned client" << from << endl; } client_map[from] = m->get_source_addr(); diff --git a/branches/sage/cephmds2/msg/SimpleMessenger.cc b/branches/sage/cephmds2/msg/SimpleMessenger.cc index 1b15c85e2d3cc..ca702b62ed363 100644 --- a/branches/sage/cephmds2/msg/SimpleMessenger.cc +++ b/branches/sage/cephmds2/msg/SimpleMessenger.cc @@ -1016,6 +1016,7 @@ void Rank::wait() lock.Unlock(); dout(10) << "wait: done." << endl; + dout(1) << "shutdown complete." << endl; } @@ -1101,10 +1102,10 @@ int Rank::EntityMessenger::shutdown() // stop my dispatch thread if (dispatch_thread.am_self()) { - dout(1) << "shutdown i am dispatch, setting stop flag" << endl; + dout(10) << "shutdown i am dispatch, setting stop flag" << endl; stop = true; } else { - dout(1) << "shutdown i am not dispatch, setting stop flag and joining thread." << endl; + dout(10) << "shutdown i am not dispatch, setting stop flag and joining thread." << endl; lock.Lock(); stop = true; cond.Signal(); -- 2.39.5