From b3ffcce6d92b21a9dd313b7d0df78c6a4070a876 Mon Sep 17 00:00:00 2001 From: sageweil Date: Wed, 4 Apr 2007 20:07:32 +0000 Subject: [PATCH] * prelim work for EOpen journaling * cache rejoin rewrite * export dir prep cleanup * thread cleanup git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1336 29311d96-e01e-0410-9327-a35deaab8ce9 --- branches/sage/cephmds2/TODO | 10 + branches/sage/cephmds2/common/Thread.h | 46 ++- branches/sage/cephmds2/common/Timer.h | 4 - branches/sage/cephmds2/config.cc | 2 +- branches/sage/cephmds2/mds/CInode.cc | 34 +- branches/sage/cephmds2/mds/CInode.h | 24 +- branches/sage/cephmds2/mds/FileLock.h | 31 +- branches/sage/cephmds2/mds/Locker.cc | 1 - branches/sage/cephmds2/mds/LogEvent.cc | 46 ++- branches/sage/cephmds2/mds/LogEvent.h | 25 +- branches/sage/cephmds2/mds/MDCache.cc | 372 ++++++++++++++---- branches/sage/cephmds2/mds/MDCache.h | 16 +- branches/sage/cephmds2/mds/MDLog.cc | 3 +- branches/sage/cephmds2/mds/MDLog.h | 6 +- branches/sage/cephmds2/mds/Migrator.cc | 21 +- branches/sage/cephmds2/mds/Server.cc | 8 + branches/sage/cephmds2/mds/SimpleLock.h | 13 +- branches/sage/cephmds2/mds/events/EOpen.h | 48 +++ branches/sage/cephmds2/mds/events/EUnlink.h | 71 ---- branches/sage/cephmds2/mds/journal.cc | 105 ++--- .../sage/cephmds2/messages/MExportDirPrep.h | 24 +- .../sage/cephmds2/messages/MMDSCacheRejoin.h | 164 +++++++- branches/sage/cephmds2/msg/Message.cc | 4 +- branches/sage/cephmds2/msg/Message.h | 2 +- 24 files changed, 721 insertions(+), 359 deletions(-) create mode 100644 branches/sage/cephmds2/mds/events/EOpen.h delete mode 100644 branches/sage/cephmds2/mds/events/EUnlink.h diff --git a/branches/sage/cephmds2/TODO b/branches/sage/cephmds2/TODO index 8f705b5521d55..3ef48543d45bb 100644 --- a/branches/sage/cephmds2/TODO +++ b/branches/sage/cephmds2/TODO @@ -54,6 +54,16 @@ mds - handled by individual MDSCacheObject _finish()'s - properly recover lock state on rejoin... + - recovering mds rejoins replicas it pulled out of its journal + - replicas will tell it when they hold an xlock + - surviving mds rejoins replicas from a recovering mds + - will tell auth if it holds an xlock +- recovering open files + - need to journal EOpen + - track openness in hierarchy, so we can tell when to expire vs rejournal EOpen metablobs + - need to batch EOpen events when rejournaling to avoid looping + - recovery will either have inode (from EOpen), or will provide path+cap to reassert open state. + - path+cap window will require fetching and metadata from disk before doing the rejoin - clientmap request history - hmm, so we need completion codes, but only after recovery. diff --git a/branches/sage/cephmds2/common/Thread.h b/branches/sage/cephmds2/common/Thread.h index 8565ce9effd92..2d8a393498479 100644 --- a/branches/sage/cephmds2/common/Thread.h +++ b/branches/sage/cephmds2/common/Thread.h @@ -16,6 +16,7 @@ #define __THREAD_H #include +#include class Thread { private: @@ -25,9 +26,7 @@ class Thread { Thread() : thread_id(0) {} virtual ~Thread() {} - pthread_t &get_thread_id() { return thread_id; } - bool is_started() { return thread_id != 0; } - + protected: virtual void *entry() = 0; private: @@ -36,27 +35,42 @@ class Thread { } public: + pthread_t &get_thread_id() { return thread_id; } + bool is_started() { return thread_id != 0; } + bool am_self() { return (pthread_self() == thread_id); } + int create() { return pthread_create( &thread_id, NULL, _entry_func, (void*)this ); } - - bool am_self() { - return (pthread_self() == thread_id); - } - int join(void **prval = 0) { - assert(thread_id); - //if (thread_id == 0) return -1; // never started. - + if (thread_id == 0) { + cerr << "WARNING: join on thread that was never started" << endl; + //assert(0); + return -EINVAL; // never started. + } + int status = pthread_join(thread_id, prval); - if (status == 0) - thread_id = 0; - else { - cout << "join status = " << status << endl; - assert(0); + if (status != 0) { + switch (status) { + case -EINVAL: + cerr << "thread " << thread_id << " join status = EINVAL" << endl; + break; + case -ESRCH: + cerr << "thread " << thread_id << " join status = ESRCH" << endl; + assert(0); + break; + case -EDEADLK: + cerr << "thread " << thread_id << " join status = EDEADLK" << endl; + break; + default: + cerr << "thread " << thread_id << " join status = " << status << endl; + } + assert(0); // none of these should happen. } + thread_id = 0; return status; } + }; #endif diff --git a/branches/sage/cephmds2/common/Timer.h b/branches/sage/cephmds2/common/Timer.h index 88d9929ac5ae1..80470c3615737 100644 --- a/branches/sage/cephmds2/common/Timer.h +++ b/branches/sage/cephmds2/common/Timer.h @@ -53,15 +53,11 @@ class Timer { map< utime_t, set > scheduled; // time -> (context ...) hash_map< Context*, utime_t > event_times; // event -> time - // get time of the next event - //Context* get_next_scheduled(utime_t& when); - bool get_next_due(utime_t &when); void register_timer(); // make sure i get a callback void cancel_timer(); // make sure i get a callback - //pthread_t thread_id; bool thread_stop; Mutex lock; bool timed_sleep; diff --git a/branches/sage/cephmds2/config.cc b/branches/sage/cephmds2/config.cc index ddbc606b726c1..50937c5edc294 100644 --- a/branches/sage/cephmds2/config.cc +++ b/branches/sage/cephmds2/config.cc @@ -164,7 +164,7 @@ md_config_t g_conf = { mds_decay_halflife: 30, mds_beacon_interval: 5.0, - mds_beacon_grace: 10.0, + mds_beacon_grace: 15.0, mds_log: true, mds_log_max_len: MDS_CACHE_SIZE / 3, diff --git a/branches/sage/cephmds2/mds/CInode.cc b/branches/sage/cephmds2/mds/CInode.cc index 7496852637c48..a0ee7cb0251d9 100644 --- a/branches/sage/cephmds2/mds/CInode.cc +++ b/branches/sage/cephmds2/mds/CInode.cc @@ -94,33 +94,6 @@ void CInode::print(ostream& out) // ====== CInode ======= -CInode::CInode(MDCache *c, bool auth) : - authlock(this, LOCK_OTYPE_IAUTH, WAIT_AUTHLOCK_OFFSET), - linklock(this, LOCK_OTYPE_ILINK, WAIT_LINKLOCK_OFFSET), - dirfragtreelock(this, LOCK_OTYPE_IDIRFRAGTREE, WAIT_DIRFRAGTREELOCK_OFFSET), - filelock(this, LOCK_OTYPE_IFILE, WAIT_FILELOCK_OFFSET) -{ - mdcache = c; - - //num_parents = 0; - parent = NULL; - - auth_pins = 0; - nested_auth_pins = 0; - //num_request_pins = 0; - - state = 0; - - if (auth) state_set(STATE_AUTH); -} - -CInode::~CInode() { - for (map::iterator p = dirfrags.begin(); - p != dirfrags.end(); - ++p) - delete p->second; -} - // dirfrags @@ -135,13 +108,15 @@ frag_t CInode::pick_dirfrag(const string& dn) void CInode::get_dirfrags(list& ls) { + // all dirfrags for (map::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) ls.push_back(p->second); } void CInode::get_nested_dirfrags(list& ls) -{ // same subtree +{ + // dirfrags in same subtree for (map::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) @@ -149,7 +124,8 @@ void CInode::get_nested_dirfrags(list& ls) ls.push_back(p->second); } void CInode::get_subtree_dirfrags(list& ls) -{ // new subtree +{ + // dirfrags that are roots of new subtrees for (map::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) diff --git a/branches/sage/cephmds2/mds/CInode.h b/branches/sage/cephmds2/mds/CInode.h index 4f600f6cfb963..a1021d3bb81e1 100644 --- a/branches/sage/cephmds2/mds/CInode.h +++ b/branches/sage/cephmds2/mds/CInode.h @@ -55,7 +55,7 @@ class CInode : public MDSCacheObject { //static const int PIN_REPLICATED = 1; static const int PIN_DIR = 2; static const int PIN_PROXY = 5; // can't expire yet - static const int PIN_CAPS = 7; // local fh's + static const int PIN_CAPS = 7; // client caps static const int PIN_AUTHPIN = 8; static const int PIN_IMPORTING = -9; // importing static const int PIN_RENAMESRC = 11; // pinned on dest for foreign rename @@ -122,11 +122,12 @@ class CInode : public MDSCacheObject { string symlink; // symlink dest, if symlink fragtree_t dirfragtree; // dir frag tree, if any - frag_t pick_dirfrag(const string &dn); + off_t last_open_journaled; // log offset for the last journaled EOpen // -- cache infrastructure -- map dirfrags; // cached dir fragments + frag_t pick_dirfrag(const string &dn); CDir* get_dirfrag(frag_t fg) { if (dirfrags.count(fg)) return dirfrags[fg]; @@ -175,8 +176,23 @@ protected: public: // --------------------------- - CInode(MDCache *c, bool auth=true); - ~CInode(); + CInode(MDCache *c, bool auth=true) : + mdcache(c), + last_open_journaled(0), + parent(0), + replica_caps_wanted(0), + auth_pins(0), nested_auth_pins(0), + authlock(this, LOCK_OTYPE_IAUTH, WAIT_AUTHLOCK_OFFSET), + linklock(this, LOCK_OTYPE_ILINK, WAIT_LINKLOCK_OFFSET), + dirfragtreelock(this, LOCK_OTYPE_IDIRFRAGTREE, WAIT_DIRFRAGTREELOCK_OFFSET), + filelock(this, LOCK_OTYPE_IFILE, WAIT_FILELOCK_OFFSET) + { + state = 0; + if (auth) state_set(STATE_AUTH); + }; + ~CInode() { + close_dirfrags(); + } // -- accessors -- diff --git a/branches/sage/cephmds2/mds/FileLock.h b/branches/sage/cephmds2/mds/FileLock.h index 192f0b9b38f50..cef3384010c0a 100644 --- a/branches/sage/cephmds2/mds/FileLock.h +++ b/branches/sage/cephmds2/mds/FileLock.h @@ -28,22 +28,22 @@ using namespace std; // C = cache reads, R = read, W = write, A = append, B = buffer writes, L = lazyio // -----auth-------- ---replica------- -#define LOCK_SYNC_ 0 // AR R . / C R . . . L R . / C R . . . L stat() -#define LOCK_GSYNCL -11 // A . . / C ? . . . L loner -> sync (*) FIXME: let old loner keep R, somehow... -#define LOCK_GSYNCM -12 // A . . / . R . . . L +#define LOCK_SYNC_ 1 // AR R . / C R . . . L R . / C R . . . L stat() +#define LOCK_GSYNCL -12 // A . . / C ? . . . L loner -> sync (*) FIXME: let old loner keep R, somehow... +#define LOCK_GSYNCM -13 // A . . / . R . . . L -#define LOCK_LOCK_ 1 // AR R W / C . . . . . . . / C . . . . . truncate() -#define LOCK_GLOCKR_ -2 // AR R . / C . . . . . . . / C . . . . . -#define LOCK_GLOCKL -3 // A . . / . . . . . . loner -> lock -#define LOCK_GLOCKM -4 // A . . / . . . . . . +#define LOCK_LOCK_ 2 // AR R W / C . . . . . . . / C . . . . . truncate() +#define LOCK_GLOCKR_ -3 // AR R . / C . . . . . . . / C . . . . . +#define LOCK_GLOCKL -4 // A . . / . . . . . . loner -> lock +#define LOCK_GLOCKM -5 // A . . / . . . . . . -#define LOCK_MIXED 5 // AR . . / . R W A . L . . / . R . . . L -#define LOCK_GMIXEDR -6 // AR R . / . R . . . L . . / . R . . . L -#define LOCK_GMIXEDL -7 // A . . / . . . . . L loner -> mixed +#define LOCK_MIXED 6 // AR . . / . R W A . L . . / . R . . . L +#define LOCK_GMIXEDR -7 // AR R . / . R . . . L . . / . R . . . L +#define LOCK_GMIXEDL -8 // A . . / . . . . . L loner -> mixed -#define LOCK_LONER 8 // A . . / C R W A B L (lock) -#define LOCK_GLONERR -9 // A . . / . R . . . L -#define LOCK_GLONERM -10 // A . . / . R W A . L +#define LOCK_LONER 9 // A . . / C R W A B L (lock) +#define LOCK_GLONERR -10 // A . . / . R . . . L +#define LOCK_GLONERM -11 // A . . / . R W A . L // 4 stable @@ -207,8 +207,9 @@ class FileLock : public SimpleLock { inline ostream& operator<<(ostream& out, FileLock& l) { - out << "(" << get_lock_type_name(l.get_type()) - << " " << get_filelock_state_name(l.get_state()); + out << "("; + //out << get_lock_type_name(l.get_type()) << " "; + out << get_filelock_state_name(l.get_state()); if (!l.get_gather_set().empty()) out << " g=" << l.get_gather_set(); if (l.get_num_rdlock()) out << " r=" << l.get_num_rdlock(); diff --git a/branches/sage/cephmds2/mds/Locker.cc b/branches/sage/cephmds2/mds/Locker.cc index c6977e8a1a373..68d25a94078b6 100644 --- a/branches/sage/cephmds2/mds/Locker.cc +++ b/branches/sage/cephmds2/mds/Locker.cc @@ -26,7 +26,6 @@ #include "events/EString.h" #include "events/EUpdate.h" -#include "events/EUnlink.h" #include "msg/Messenger.h" diff --git a/branches/sage/cephmds2/mds/LogEvent.cc b/branches/sage/cephmds2/mds/LogEvent.cc index 43ac0ec964734..de0e6eeb3afb8 100644 --- a/branches/sage/cephmds2/mds/LogEvent.cc +++ b/branches/sage/cephmds2/mds/LogEvent.cc @@ -17,21 +17,25 @@ // events i know of #include "events/EString.h" -#include "events/EImportMap.h" -#include "events/EMetaBlob.h" -#include "events/EUpdate.h" -#include "events/ESlaveUpdate.h" -#include "events/EUnlink.h" + #include "events/EMount.h" #include "events/EClientMap.h" -#include "events/EAnchor.h" -#include "events/EAnchorClient.h" -#include "events/EAlloc.h" -#include "events/EPurgeFinish.h" +#include "events/EImportMap.h" #include "events/EExport.h" #include "events/EImportStart.h" #include "events/EImportFinish.h" +#include "events/EUpdate.h" +#include "events/ESlaveUpdate.h" +#include "events/EOpen.h" + +#include "events/EAlloc.h" +#include "events/EPurgeFinish.h" + +#include "events/EAnchor.h" +#include "events/EAnchorClient.h" + + LogEvent *LogEvent::decode(bufferlist& bl) { // parse type, length @@ -48,20 +52,24 @@ LogEvent *LogEvent::decode(bufferlist& bl) // create event LogEvent *le; switch (type) { - case EVENT_STRING: le = new EString(); break; + case EVENT_STRING: le = new EString; break; + + case EVENT_MOUNT: le = new EMount; break; + case EVENT_CLIENTMAP: le = new EClientMap; break; case EVENT_IMPORTMAP: le = new EImportMap; break; - case EVENT_UPDATE: le = new EUpdate; break; - case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break; - case EVENT_UNLINK: le = new EUnlink(); break; - case EVENT_PURGEFINISH: le = new EPurgeFinish(); break; - case EVENT_MOUNT: le = new EMount(); break; - case EVENT_CLIENTMAP: le = new EClientMap(); break; - case EVENT_ANCHOR: le = new EAnchor(); break; - case EVENT_ANCHORCLIENT: le = new EAnchorClient(); break; - case EVENT_ALLOC: le = new EAlloc(); break; case EVENT_EXPORT: le = new EExport; break; case EVENT_IMPORTSTART: le = new EImportStart; break; case EVENT_IMPORTFINISH: le = new EImportFinish; break; + + case EVENT_UPDATE: le = new EUpdate; break; + case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break; + case EVENT_OPEN: le = new EOpen; break; + + case EVENT_ALLOC: le = new EAlloc; break; + case EVENT_PURGEFINISH: le = new EPurgeFinish; break; + + case EVENT_ANCHOR: le = new EAnchor; break; + case EVENT_ANCHORCLIENT: le = new EAnchorClient; break; default: dout(1) << "uh oh, unknown log event type " << type << endl; assert(0); diff --git a/branches/sage/cephmds2/mds/LogEvent.h b/branches/sage/cephmds2/mds/LogEvent.h index 8138ab2008455..f4791dbc7ddca 100644 --- a/branches/sage/cephmds2/mds/LogEvent.h +++ b/branches/sage/cephmds2/mds/LogEvent.h @@ -16,29 +16,20 @@ #define EVENT_STRING 1 -#define EVENT_INODEUPDATE 2 -#define EVENT_DIRUPDATE 3 - -#define EVENT_IMPORTMAP 4 -#define EVENT_UPDATE 5 -#define EVENT_SLAVEUPDATE 6 - #define EVENT_MOUNT 7 #define EVENT_CLIENTMAP 8 +#define EVENT_IMPORTMAP 2 +#define EVENT_EXPORT 30 +#define EVENT_IMPORTSTART 31 +#define EVENT_IMPORTFINISH 32 -#define EVENT_ALLOC 10 -#define EVENT_MKNOD 11 -#define EVENT_MKDIR 12 -#define EVENT_LINK 13 +#define EVENT_UPDATE 3 +#define EVENT_SLAVEUPDATE 4 +#define EVENT_OPEN 5 -#define EVENT_UNLINK 20 -#define EVENT_RMDIR 21 +#define EVENT_ALLOC 10 #define EVENT_PURGEFINISH 22 -#define EVENT_EXPORT 30 -#define EVENT_IMPORTSTART 31 -#define EVENT_IMPORTFINISH 32 - #define EVENT_ANCHOR 40 #define EVENT_ANCHORCLIENT 41 diff --git a/branches/sage/cephmds2/mds/MDCache.cc b/branches/sage/cephmds2/mds/MDCache.cc index 41547e8f6dae7..044670253049b 100644 --- a/branches/sage/cephmds2/mds/MDCache.cc +++ b/branches/sage/cephmds2/mds/MDCache.cc @@ -41,14 +41,12 @@ #include "events/EUpdate.h" #include "events/ESlaveUpdate.h" #include "events/EString.h" -#include "events/EUnlink.h" #include "events/EPurgeFinish.h" #include "messages/MGenericMessage.h" #include "messages/MMDSImportMap.h" #include "messages/MMDSCacheRejoin.h" -#include "messages/MMDSCacheRejoinAck.h" #include "messages/MDiscover.h" #include "messages/MDiscoverReply.h" @@ -1303,7 +1301,7 @@ void MDCache::recalc_auth_bits() * rejoin phase! * we start out by sending rejoins to everyone in the recovery set. * - * if _were_ are rejoining, send for all regions in our cache. + * if we are rejoin, send for all regions in our cache. * if we are active|stopping, send only to nodes that are are rejoining. */ void MDCache::send_cache_rejoins() @@ -1320,12 +1318,12 @@ void MDCache::send_cache_rejoins() if (*p == mds->get_nodeid()) continue; // nothing to myself! if (mds->is_rejoin() || mds->mdsmap->is_rejoin(*p)) - rejoins[*p] = new MMDSCacheRejoin; + rejoins[*p] = new MMDSCacheRejoin(MMDSCacheRejoin::OP_REJOIN); } assert(!migrator->is_importing()); assert(!migrator->is_exporting()); - + // check all subtrees for (map >::iterator p = subtrees.begin(); p != subtrees.end(); @@ -1353,7 +1351,7 @@ void MDCache::send_cache_rejoins() } // nothing? - if (rejoins.empty()) { + if (mds->is_rejoin() && rejoins.empty()) { dout(10) << "nothing to rejoin, going active" << endl; mds->set_want_state(MDSMap::STATE_ACTIVE); } @@ -1363,8 +1361,12 @@ void MDCache::send_cache_rejoins() void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin) { dout(10) << "cache_rejoin_walk " << *dir << endl; - rejoin->add_dirfrag(dir->dirfrag()); + //if (mds->is_rejoin()) + rejoin->add_weak_dirfrag(dir->dirfrag()); + //else + //rejoin->add_strong_dirfrag(dir->dirfrag(), dir->get_replica_nonce()); + list nested; // finish this dir, then do nested items // walk dentries @@ -1372,13 +1374,43 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin) p != dir->items.end(); ++p) { // dentry - rejoin->add_dentry(dir->dirfrag(), p->first); + CDentry *dn = p->second; + if (mds->is_rejoin()) + rejoin->add_weak_dentry(dir->dirfrag(), p->first); + else { + rejoin->add_strong_dentry(dir->dirfrag(), p->first, + dn->get_replica_nonce(), + dn->lock.get_state()); + if (dn->lock.is_xlocked()) + rejoin->add_dentry_xlock(dir->dirfrag(), p->first, + dn->lock.get_xlocked_by()->reqid); + } // inode? - if (p->second->is_primary() && p->second->get_inode()) { - CInode *in = p->second->get_inode(); - rejoin->add_inode(in->ino(), - in->get_caps_wanted()); + if (dn->is_primary() && dn->get_inode()) { + CInode *in = dn->get_inode(); + if (mds->is_rejoin() && in->get_caps_wanted() == 0) + rejoin->add_weak_inode(in->ino()); + else { + rejoin->add_strong_inode(in->ino(), in->get_replica_nonce(), + in->get_caps_wanted(), + in->authlock.get_state(), + in->linklock.get_state(), + in->dirfragtreelock.get_state(), + in->filelock.get_state()); + if (in->authlock.is_xlocked()) + rejoin->add_inode_xlock(in->ino(), in->authlock.get_type(), + in->authlock.get_xlocked_by()->reqid); + if (in->linklock.is_xlocked()) + rejoin->add_inode_xlock(in->ino(), in->linklock.get_type(), + in->linklock.get_xlocked_by()->reqid); + if (in->dirfragtreelock.is_xlocked()) + rejoin->add_inode_xlock(in->ino(), in->dirfragtreelock.get_type(), + in->dirfragtreelock.get_xlocked_by()->reqid); + if (in->filelock.is_xlocked()) + rejoin->add_inode_xlock(in->ino(), in->filelock.get_type(), + in->filelock.get_xlocked_by()->reqid); + } // dirfrags in this subtree? list dfs; @@ -1401,7 +1433,6 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin) /* * i got a rejoin. - * * - reply with the lockstate * * if i am active|stopping, @@ -1409,30 +1440,59 @@ void MDCache::cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin) */ void MDCache::handle_cache_rejoin(MMDSCacheRejoin *m) { - dout(7) << "handle_cache_rejoin from " << m->get_source() << endl; + dout(7) << "handle_cache_rejoin " << *m << " from " << m->get_source() << endl; + + switch (m->op) { + case MMDSCacheRejoin::OP_REJOIN: + handle_cache_rejoin_rejoin(m); + break; + + case MMDSCacheRejoin::OP_ACK: + handle_cache_rejoin_ack(m); + break; + + case MMDSCacheRejoin::OP_MISSING: + handle_cache_rejoin_missing(m); + break; + + case MMDSCacheRejoin::OP_FULL: + handle_cache_rejoin_full(m); + break; + + default: + assert(0); + } + delete m; +} + +void MDCache::handle_cache_rejoin_rejoin(MMDSCacheRejoin *m) +{ int from = m->get_source().num(); - MMDSCacheRejoinAck *ack = new MMDSCacheRejoinAck; + // do immediate ack? + MMDSCacheRejoin *ack = 0; + MMDSCacheRejoin *missing = 0; if (mds->is_active() || mds->is_stopping()) { - dout(10) << "removing stale cache replicas" << endl; + dout(10) << "i am active. removing stale cache replicas" << endl; + // first, scour cache of replica references for (hash_map::iterator p = inode_map.begin(); p != inode_map.end(); ++p) { // inode CInode *in = p->second; - if (in->is_replica(from) && m->inodes.count(p->first) == 0) { + if (in->is_replica(from) && m->weak_inodes.count(p->first) == 0) { inode_remove_replica(in, from); dout(10) << " rem " << *in << endl; } - + // dentry if (in->parent) { CDentry *dn = in->parent; if (dn->is_replica(from) && - (m->dentries.count(dn->get_dir()->dirfrag()) == 0 || - m->dentries[dn->get_dir()->dirfrag()].count(dn->get_name()) == 0)) { + (m->weak_dentries.count(dn->get_dir()->dirfrag()) == 0 || + m->weak_dentries[dn->get_dir()->dirfrag()].count(dn->get_name()) == 0)) { dn->remove_replica(from); dout(10) << " rem " << *dn << endl; } @@ -1445,87 +1505,179 @@ void MDCache::handle_cache_rejoin(MMDSCacheRejoin *m) p != dfs.end(); ++p) { CDir *dir = *p; - if (dir->is_replica(from) && m->dirfrags.count(dir->dirfrag()) == 0) { + if (dir->is_replica(from) && m->weak_dirfrags.count(dir->dirfrag()) == 0) { dir->remove_replica(from); dout(10) << " rem " << *dir << endl; } } } - } else { - assert(mds->is_rejoin()); + + // do immediate ack. + ack = new MMDSCacheRejoin(MMDSCacheRejoin::OP_ACK); } - + // dirs - for (set::iterator p = m->dirfrags.begin(); - p != m->dirfrags.end(); + for (set::iterator p = m->weak_dirfrags.begin(); + p != m->weak_dirfrags.end(); ++p) { CDir *dir = get_dirfrag(*p); - assert(dir); - int nonce = dir->add_replica(from); - dout(10) << " has " << *dir << endl; - ack->add_dirfrag(*p, nonce); + if (dir) { + int nonce = dir->add_replica(from); + dout(10) << " have " << *dir << endl; + if (ack) + ack->add_strong_dirfrag(*p, nonce); - // dentries - for (set::iterator q = m->dentries[*p].begin(); - q != m->dentries[*p].end(); - ++q) { - CDentry *dn = dir->lookup(*q); - assert(dn); - int nonce = dn->add_replica(from); - dout(10) << " has " << *dn << endl; - ack->add_dentry(*p, *q, dn->lock.get_state(), nonce); + // dentries + for (set::iterator q = m->weak_dentries[*p].begin(); + q != m->weak_dentries[*p].end(); + ++q) { + CDentry *dn = dir->lookup(*q); + if (dn) { + int nonce = dn->add_replica(from); + dout(10) << " have " << *dn << endl; + ack->add_strong_dentry(*p, *q, dn->lock.get_state(), nonce); + } else { + dout(10) << " missing " << *p << " " << *q << endl; + if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING); + missing->add_weak_dentry(*p, *q); + } + if (ack) + ack->add_strong_dentry(*p, *q, nonce, dn->lock.get_state()); + } + } else { + dout(10) << " missing " << *p << endl; + if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING); + missing->add_weak_dirfrag(*p); + + // dentries + for (set::iterator q = m->weak_dentries[*p].begin(); + q != m->weak_dentries[*p].end(); + ++q) + missing->add_weak_dentry(*p, *q); } } // inodes - for (map::iterator p = m->inodes.begin(); - p != m->inodes.end(); + for (set::iterator p = m->weak_inodes.begin(); + p != m->weak_inodes.end(); ++p) { - CInode *in = get_inode(p->first); - assert(in); - int nonce = in->add_replica(from); - if (p->second) - in->mds_caps_wanted[from] = p->second; - else + CInode *in = get_inode(*p); + if (in) { + int nonce = in->add_replica(from); in->mds_caps_wanted.erase(from); - in->authlock.remove_gather(from); // just in case - in->linklock.remove_gather(from); // just in case - in->dirfragtreelock.remove_gather(from); // just in case - in->filelock.remove_gather(from); // just in case - dout(10) << " has " << *in << endl; - ack->add_inode(p->first, - in->authlock.get_replica_state(), - in->linklock.get_replica_state(), - in->dirfragtreelock.get_replica_state(), - in->filelock.get_replica_state(), - nonce); + in->authlock.remove_gather(from); // just in case + in->linklock.remove_gather(from); // just in case + in->dirfragtreelock.remove_gather(from); // just in case + in->filelock.remove_gather(from); // just in case + dout(10) << " have (weak) " << *in << endl; + if (ack) + ack->add_strong_inode(in->ino(), + nonce, + 0, + in->authlock.get_replica_state(), + in->linklock.get_replica_state(), + in->dirfragtreelock.get_replica_state(), + in->filelock.get_replica_state()); + } else { + dout(10) << " missing " << *p << endl; + if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING); + missing->add_weak_inode(*p); + } } - // send ack - mds->send_message_mds(ack, from, MDS_PORT_CACHE); + // strong inodes too? + for (map::iterator p = m->strong_inodes.begin(); + p != m->strong_inodes.end(); + ++p) { + CInode *in = get_inode(p->first); + if (in) { + dout(10) << " have (strong) " << *in << endl; + int nonce = in->add_replica(from); + if (p->second.caps_wanted) + in->mds_caps_wanted[from] = p->second.caps_wanted; + else + in->mds_caps_wanted.erase(from); + in->authlock.remove_gather(from); // just in case + in->linklock.remove_gather(from); // just in case + in->dirfragtreelock.remove_gather(from); // just in case + in->filelock.remove_gather(from); // just in case + dout(10) << " have (weak) " << *in << endl; + if (ack) + ack->add_strong_inode(in->ino(), + nonce, + 0, + in->authlock.get_replica_state(), + in->linklock.get_replica_state(), + in->dirfragtreelock.get_replica_state(), + in->filelock.get_replica_state()); + } else { + dout(10) << " missing " << p->first << endl; + if (!missing) missing = new MMDSCacheRejoin(MMDSCacheRejoin::OP_MISSING); + missing->add_weak_inode(p->first); + } + } - delete m; + // xlocks + for (list::iterator p = m->xlocked_inodes.begin(); + p != m->xlocked_inodes.end(); + ++p) { + CInode *in = get_inode(p->ino); + if (!in) continue; // already missing, from strong_inodes list above. + + dout(10) << " inode xlock by " << p->reqid << " on " << *in << endl; + assert(0); + //SimpleLock *lock = in->get_lock(p->locktype); + // .. FIXME IMPLEMENT ME .. + + + } + for (map >::iterator p = m->xlocked_dentries.begin(); + p != m->xlocked_dentries.end(); + ++p) { + CDir *dir = get_dirfrag(p->first); + if (!dir) continue; // already missing, from above. + for (map::iterator q = p->second.begin(); + q != p->second.end(); + ++q) { + CDentry *dn = dir->lookup(q->first); + if (!dn) continue; // already missing, from above. + dout(10) << " dn xlock by " << q->second << " on " << *dn << endl; + + // FIXME IMPLEMENT ME + assert(0); + } + } + + // send ack? + if (ack) + mds->send_message_mds(ack, from, MDS_PORT_CACHE); + else + want_rejoin_ack.insert(from); + + // send missing? + if (missing) + mds->send_message_mds(missing, from, MDS_PORT_CACHE); } -void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m) +void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoin *m) { dout(7) << "handle_cache_rejoin_ack from " << m->get_source() << endl; int from = m->get_source().num(); // dirs - for (list::iterator p = m->dirfrags.begin(); - p != m->dirfrags.end(); + for (map::iterator p = m->strong_dirfrags.begin(); + p != m->strong_dirfrags.end(); ++p) { - CDir *dir = get_dirfrag(p->dirfrag); + CDir *dir = get_dirfrag(p->first); assert(dir); - dir->set_replica_nonce(p->nonce); + dir->set_replica_nonce(p->second.nonce); dout(10) << " got " << *dir << endl; // dentries - for (map::iterator q = m->dentries[p->dirfrag].begin(); - q != m->dentries[p->dirfrag].end(); + for (map::iterator q = m->strong_dentries[p->first].begin(); + q != m->strong_dentries[p->first].end(); ++q) { CDentry *dn = dir->lookup(q->first); assert(dn); @@ -1536,25 +1688,25 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m) } // inodes - for (list::iterator p = m->inodes.begin(); - p != m->inodes.end(); + for (map::iterator p = m->strong_inodes.begin(); + p != m->strong_inodes.end(); ++p) { - CInode *in = get_inode(p->ino); + CInode *in = get_inode(p->first); assert(in); - in->set_replica_nonce(p->nonce); - in->authlock.set_state(p->authlock); - in->linklock.set_state(p->linklock); - in->dirfragtreelock.set_state(p->dirfragtreelock); - in->filelock.set_state(p->filelock); + in->set_replica_nonce(p->second.nonce); + in->authlock.set_state(p->second.authlock); + in->linklock.set_state(p->second.linklock); + in->dirfragtreelock.set_state(p->second.dirfragtreelock); + in->filelock.set_state(p->second.filelock); dout(10) << " got " << *in << endl; } - delete m; - // done? rejoin_ack_gather.erase(from); if (rejoin_ack_gather.empty()) { dout(7) << "all done, going active!" << endl; + send_cache_rejoin_acks(); + show_subtrees(); show_cache(); mds->set_want_state(MDSMap::STATE_ACTIVE); @@ -1565,6 +1717,68 @@ void MDCache::handle_cache_rejoin_ack(MMDSCacheRejoinAck *m) } +void MDCache::handle_cache_rejoin_missing(MMDSCacheRejoin *m) +{ + dout(7) << "handle_cache_rejoin_missing from " << m->get_source() << endl; + + MMDSCacheRejoin *full = new MMDSCacheRejoin(MMDSCacheRejoin::OP_FULL); + + // dirs + for (set::iterator p = m->weak_dirfrags.begin(); + p != m->weak_dirfrags.end(); + ++p) { + CDir *dir = get_dirfrag(*p); + assert(dir); + dout(10) << " sending " << *dir << endl; + + // dentries + for (set::iterator q = m->weak_dentries[*p].begin(); + q != m->weak_dentries[*p].end(); + ++q) { + CDentry *dn = dir->lookup(*q); + assert(dn); + dout(10) << " sending " << *dn << endl; + if (mds->is_rejoin()) + full->add_weak_dentry(*p, *q); + else + full->add_strong_dentry(*p, *q, dn->get_replica_nonce(), dn->lock.get_state()); + } + } + + // inodes + for (set::iterator p = m->weak_inodes.begin(); + p != m->weak_inodes.end(); + ++p) { + CInode *in = get_inode(*p); + assert(in); + + dout(10) << " sending " << *in << endl; + full->add_full_inode(in->inode, in->symlink, in->dirfragtree); + if (mds->is_rejoin()) + full->add_weak_inode(in->ino()); + else + full->add_strong_inode(in->ino(), + in->get_replica_nonce(), + in->get_caps_wanted(), + in->authlock.get_replica_state(), + in->linklock.get_replica_state(), + in->dirfragtreelock.get_replica_state(), + in->filelock.get_replica_state()); + } + + mds->send_message_mds(full, m->get_source().num(), MDS_PORT_CACHE); +} + +void MDCache::handle_cache_rejoin_full(MMDSCacheRejoin *m) +{ + assert(0); // write me +} + +void MDCache::send_cache_rejoin_acks() +{ + dout(7) << "send_cache_rejoin_acks to " << want_rejoin_ack << endl; + +} @@ -2398,9 +2612,11 @@ void MDCache::dispatch(Message *m) case MSG_MDS_CACHEREJOIN: handle_cache_rejoin((MMDSCacheRejoin*)m); break; + /* case MSG_MDS_CACHEREJOINACK: handle_cache_rejoin_ack((MMDSCacheRejoinAck*)m); break; + */ case MSG_MDS_DISCOVER: diff --git a/branches/sage/cephmds2/mds/MDCache.h b/branches/sage/cephmds2/mds/MDCache.h index fb62b48564089..052f380cd3816 100644 --- a/branches/sage/cephmds2/mds/MDCache.h +++ b/branches/sage/cephmds2/mds/MDCache.h @@ -243,21 +243,29 @@ public: // -- recovery -- protected: + set recovery_set; + // from EImportStart w/o EImportFinish during journal replay map > my_ambiguous_imports; // from MMDSImportMaps map > > other_ambiguous_imports; - set recovery_set; set wants_import_map; // nodes i need to send my import map to set got_import_map; // nodes i got import_maps from - set rejoin_ack_gather; // nodes i need a rejoin ack from void handle_import_map(MMDSImportMap *m); - void handle_cache_rejoin(MMDSCacheRejoin *m); - void handle_cache_rejoin_ack(MMDSCacheRejoinAck *m); void disambiguate_imports(); + + set rejoin_gather; // nodes from whom i need a rejoin + set rejoin_ack_gather; // nodes from whom i need a rejoin ack + set want_rejoin_ack; // nodes to whom i need to send a rejoin ack + void cache_rejoin_walk(CDir *dir, MMDSCacheRejoin *rejoin); + void handle_cache_rejoin(MMDSCacheRejoin *m); + void handle_cache_rejoin_rejoin(MMDSCacheRejoin *m); + void handle_cache_rejoin_ack(MMDSCacheRejoin *m); + void handle_cache_rejoin_missing(MMDSCacheRejoin *m); + void handle_cache_rejoin_full(MMDSCacheRejoin *m); void send_cache_rejoin_acks(); void recalc_auth_bits(); diff --git a/branches/sage/cephmds2/mds/MDLog.cc b/branches/sage/cephmds2/mds/MDLog.cc index f030d8b76c286..f577e29417e1e 100644 --- a/branches/sage/cephmds2/mds/MDLog.cc +++ b/branches/sage/cephmds2/mds/MDLog.cc @@ -129,8 +129,7 @@ off_t MDLog::get_write_pos() -void MDLog::submit_entry( LogEvent *le, - Context *c ) +void MDLog::submit_entry( LogEvent *le, Context *c ) { if (g_conf.mds_log) { dout(5) << "submit_entry " << journaler->get_write_pos() << " : " << *le << endl; diff --git a/branches/sage/cephmds2/mds/MDLog.h b/branches/sage/cephmds2/mds/MDLog.h index 4383fd6b63077..f06a2eea21427 100644 --- a/branches/sage/cephmds2/mds/MDLog.h +++ b/branches/sage/cephmds2/mds/MDLog.h @@ -102,9 +102,13 @@ class MDLog { friend class MDCache; void init_journaler(); + public: + void add_import_map_expire_waiter(Context *c) { + import_map_expire_waiters.push_back(c); + } + - public: // replay state map > pending_exports; diff --git a/branches/sage/cephmds2/mds/Migrator.cc b/branches/sage/cephmds2/mds/Migrator.cc index 450797605f284..1bcdda631abb1 100644 --- a/branches/sage/cephmds2/mds/Migrator.cc +++ b/branches/sage/cephmds2/mds/Migrator.cc @@ -552,10 +552,12 @@ void Migrator::export_frozen(CDir *dir) // include spanning tree for all nested exports. // these need to be on the destination _before_ the final export so that // dir_auth updates on any nested exports are properly absorbed. + // this includes inodes and dirfrags included in the subtree, but + // only the inodes at the bounds. set inodes_added; - // include base dir - prep->add_dir( new CDirDiscover(dir, dir->add_replica(dest)) ); + // include base dirfrag + prep->add_dirfrag( new CDirDiscover(dir, dir->add_replica(dest)) ); // check bounds for (set::iterator it = bounds.begin(); @@ -580,23 +582,20 @@ void Migrator::export_frozen(CDir *dir) // don't repeat ourselves if (inodes_added.count(cur->ino())) break; // did already! inodes_added.insert(cur->ino()); - - CDir *parent_dir = cur->get_parent_dir(); - // inode? + // inode assert(cur->inode->is_auth()); inode_trace.push_front(cur->inode); dout(7) << " will add " << *cur->inode << endl; - // include dir? - // note: don't replicate ambiguous auth items! they're - // frozen anyway. - if (cur->is_auth() && !cur->is_ambiguous_auth()) { - prep->add_dir( new CDirDiscover(cur, cur->add_replica(dest)) ); // yay! + // include the dirfrag? only if it's not the bounding subtree root. + if (cur != bound) { + assert(cur->is_auth()); + prep->add_dirfrag( new CDirDiscover(cur, cur->add_replica(dest)) ); // yay! dout(7) << " added " << *cur << endl; } - cur = parent_dir; + cur = cur->get_parent_dir(); } for (list::iterator it = inode_trace.begin(); diff --git a/branches/sage/cephmds2/mds/Server.cc b/branches/sage/cephmds2/mds/Server.cc index d8fb638c47ada..7a84a758376f9 100644 --- a/branches/sage/cephmds2/mds/Server.cc +++ b/branches/sage/cephmds2/mds/Server.cc @@ -35,6 +35,7 @@ #include "events/EString.h" #include "events/EUpdate.h" #include "events/EMount.h" +#include "events/EOpen.h" #include "include/filepath.h" #include "common/Timer.h" @@ -2568,6 +2569,13 @@ void Server::_do_open(MDRequest *mdr, CInode *cur) reply->set_file_caps_seq(cap->get_last_seq()); reply->set_file_data_version(fdv); reply_request(mdr, reply, cur); + + // journal? + if (cur->last_open_journaled == 0) { + cur->last_open_journaled = mdlog->get_write_pos(); + mdlog->submit_entry(new EOpen(cur)); + } + } diff --git a/branches/sage/cephmds2/mds/SimpleLock.h b/branches/sage/cephmds2/mds/SimpleLock.h index 3062809428b9d..cd27319e99c63 100644 --- a/branches/sage/cephmds2/mds/SimpleLock.h +++ b/branches/sage/cephmds2/mds/SimpleLock.h @@ -38,13 +38,15 @@ inline const char *get_lock_type_name(int t) { } // -- lock states -- +#define LOCK_UNDEF 0 // auth rep -#define LOCK_SYNC 0 // AR R . R . -#define LOCK_LOCK 1 // AR R W . . -#define LOCK_GLOCKR 2 // AR R . . . +#define LOCK_SYNC 1 // AR R . R . +#define LOCK_LOCK 2 // AR R W . . +#define LOCK_GLOCKR 3 // AR R . . . inline const char *get_simplelock_state_name(int n) { switch (n) { + case LOCK_UNDEF: return "undef"; case LOCK_SYNC: return "sync"; case LOCK_LOCK: return "lock"; case LOCK_GLOCKR: return "glockr"; @@ -242,8 +244,9 @@ public: inline ostream& operator<<(ostream& out, SimpleLock& l) { - out << "(" << get_lock_type_name(l.get_type()) - << " " << get_simplelock_state_name(l.get_state()); + out << "("; + //out << get_lock_type_name(l.get_type()) << " "; + out << get_simplelock_state_name(l.get_state()); if (!l.get_gather_set().empty()) out << " g=" << l.get_gather_set(); if (l.get_num_rdlock()) out << " r=" << l.get_num_rdlock(); diff --git a/branches/sage/cephmds2/mds/events/EOpen.h b/branches/sage/cephmds2/mds/events/EOpen.h new file mode 100644 index 0000000000000..729fea934c1c6 --- /dev/null +++ b/branches/sage/cephmds2/mds/events/EOpen.h @@ -0,0 +1,48 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#ifndef __MDS_EOPEN_H +#define __MDS_EOPEN_H + +#include "../LogEvent.h" +#include "EMetaBlob.h" + +class EOpen : public LogEvent { +public: + EMetaBlob metablob; + inodeno_t ino; + + EOpen() : LogEvent(EVENT_OPEN) { } + EOpen(CInode *in) : LogEvent(EVENT_OPEN), + ino(in->ino()) { + metablob.add_primary_dentry(in->get_parent_dn(), false); + } + void print(ostream& out) { + out << "open " << metablob; + } + + void encode_payload(bufferlist& bl) { + ::_encode(ino, bl); + metablob._encode(bl); + } + void decode_payload(bufferlist& bl, int& off) { + ::_decode(ino, bl, off); + metablob._decode(bl, off); + } + + bool has_expired(MDS *mds); + void expire(MDS *mds, Context *c); + void replay(MDS *mds); +}; + +#endif diff --git a/branches/sage/cephmds2/mds/events/EUnlink.h b/branches/sage/cephmds2/mds/events/EUnlink.h deleted file mode 100644 index 7d972488dab1b..0000000000000 --- a/branches/sage/cephmds2/mds/events/EUnlink.h +++ /dev/null @@ -1,71 +0,0 @@ -// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ - -#ifndef __EUNLINK_H -#define __EUNLINK_H - -#include -#include "config.h" -#include "include/types.h" - -#include "../LogEvent.h" -#include "EMetaBlob.h" - -#include "../CInode.h" -#include "../CDentry.h" -#include "../CDir.h" - -/// help rewrite me - -class EUnlink : public LogEvent { - protected: - version_t dirv; - string dname; - - public: - EMetaBlob metaglob; - - /* - EUnlink(CDir *dir, CDentry* dn, CInode *in) : - LogEvent(EVENT_UNLINK), - diritrace(dir->inode), - dirv(dir->get_version()), - dname(dn->get_name()), - inodetrace(in) {} - */ - EUnlink() : LogEvent(EVENT_UNLINK) { } - - virtual void encode_payload(bufferlist& bl) { - /* - diritrace.encode(bl); - bl.append((char*)&dirv, sizeof(dirv)); - ::_encode(dname, bl); - inodetrace.encode(bl); - */ - } - void decode_payload(bufferlist& bl, int& off) { - /* - diritrace.decode(bl,off); - bl.copy(off, sizeof(dirv), (char*)&dirv); - off += sizeof(dirv); - ::_decode(dname, bl, off); - inodetrace.decode(bl, off); - */ - } - - bool has_expired(MDS *mds); - void expire(MDS *mds, Context *c); - void replay(MDS *mds); -}; - -#endif diff --git a/branches/sage/cephmds2/mds/journal.cc b/branches/sage/cephmds2/mds/journal.cc index d1e8e0cd178f7..73e13dcab52d2 100644 --- a/branches/sage/cephmds2/mds/journal.cc +++ b/branches/sage/cephmds2/mds/journal.cc @@ -12,24 +12,26 @@ */ #include "events/EString.h" +#include "events/EImportMap.h" +#include "events/EMount.h" +#include "events/EClientMap.h" #include "events/EMetaBlob.h" -#include "events/EAlloc.h" -#include "events/EAnchor.h" -#include "events/EAnchorClient.h" + #include "events/EUpdate.h" #include "events/ESlaveUpdate.h" -#include "events/EImportMap.h" - -#include "events/EMount.h" -#include "events/EClientMap.h" +#include "events/EOpen.h" +#include "events/EAlloc.h" #include "events/EPurgeFinish.h" -#include "events/EUnlink.h" + #include "events/EExport.h" #include "events/EImportStart.h" #include "events/EImportFinish.h" +#include "events/EAnchor.h" +#include "events/EAnchorClient.h" + #include "MDS.h" #include "MDLog.h" #include "MDCache.h" @@ -633,6 +635,51 @@ void EUpdate::replay(MDS *mds) } +// ------------------------ +// EOpen + +bool EOpen::has_expired(MDS *mds) +{ + CInode *in = mds->mdcache->get_inode(ino); + if (!in) return true; + if (!in->is_any_caps()) return true; + if (in->last_open_journaled > get_start_off() || + in->last_open_journaled == 0) return true; + return false; +} + +void EOpen::expire(MDS *mds, Context *c) +{ + CInode *in = mds->mdcache->get_inode(ino); + assert(in); + + dout(10) << "EOpen.expire " << ino + << " last_open_journaled " << in->last_open_journaled << endl; + + // wait? + // FIXME this is stupid. + if (in->last_open_journaled == get_start_off()) { + //|| + //(get_start_off() < mds->mdlog->last_import_map && + //in->last_open_journaled < mds->mdlog->last_import_map)) { + dout(10) << "waiting." << endl; + // wait + mds->mdlog->add_import_map_expire_waiter(c); + } else { + // rejournal now. + dout(10) << "rejournaling" << endl; + in->last_open_journaled = mds->mdlog->get_write_pos(); + mds->mdlog->submit_entry(new EOpen(in)); + } +} + +void EOpen::replay(MDS *mds) +{ + dout(10) << "EOpen.replay " << ino << endl; + metablob.replay(mds); +} + + // ----------------------- // EUpdate @@ -717,48 +764,6 @@ void EImportMap::replay(MDS *mds) -// ----------------------- -// EUnlink - -bool EUnlink::has_expired(MDS *mds) -{ - /* - // dir - CInode *diri = mds->mdcache->get_inode( diritrace.back().inode.ino ); - CDir *dir = 0; - if (diri) dir = diri->dir; - - if (dir && dir->get_last_committed_version() < dirv) return false; - - if (!inodetrace.trace.empty()) { - // inode - CInode *in = mds->mdcache->get_inode( inodetrace.back().inode.ino ); - if (in && in->get_last_committed_version() < inodetrace.back().inode.version) - return false; - } - */ - return true; -} - -void EUnlink::expire(MDS *mds, Context *c) -{ - /* - CInode *diri = mds->mdcache->get_inode( diritrace.back().inode.ino ); - CDir *dir = diri->dir; - assert(dir); - - // okay! - dout(7) << "commiting dirty (from unlink) dir " << *dir << endl; - mds->mdstore->commit_dir(dir, dirv, c); - */ -} - -void EUnlink::replay(MDS *mds) -{ -} - - - // ----------------------- // EPurgeFinish diff --git a/branches/sage/cephmds2/messages/MExportDirPrep.h b/branches/sage/cephmds2/messages/MExportDirPrep.h index 9959089ae80b9..fce07df7958b7 100644 --- a/branches/sage/cephmds2/messages/MExportDirPrep.h +++ b/branches/sage/cephmds2/messages/MExportDirPrep.h @@ -34,7 +34,7 @@ class MExportDirPrep : public Message { map inode_dentry; map > frags_by_ino; - map dirs; + map dirfrags; set bystanders; @@ -53,11 +53,11 @@ class MExportDirPrep : public Message { string& get_dentry(inodeno_t ino) { return inode_dentry[ino]; } - bool have_dir(dirfrag_t df) { - return dirs.count(df); + bool have_dirfrag(dirfrag_t df) { + return dirfrags.count(df); } CDirDiscover* get_dirfrag_discover(dirfrag_t df) { - return dirs[df]; + return dirfrags[df]; } set &get_bystanders() { return bystanders; } @@ -76,8 +76,8 @@ class MExportDirPrep : public Message { iit != inodes.end(); iit++) delete *iit; - for (map::iterator dit = dirs.begin(); - dit != dirs.end(); + for (map::iterator dit = dirfrags.begin(); + dit != dirfrags.end(); dit++) delete dit->second; } @@ -96,8 +96,8 @@ class MExportDirPrep : public Message { inode_dirfrag[in->get_ino()] = df; inode_dentry[in->get_ino()] = dentry; } - void add_dir(CDirDiscover *dir) { - dirs[dir->get_dirfrag()] = dir; + void add_dirfrag(CDirDiscover *dir) { + dirfrags[dir->get_dirfrag()] = dir; frags_by_ino[dir->get_dirfrag().ino].push_back(dir->get_dirfrag().frag); } void add_bystander(int who) { @@ -143,7 +143,7 @@ class MExportDirPrep : public Message { for (int i=0; i_decode(payload, off); - dirs[dir->get_dirfrag()] = dir; + dirfrags[dir->get_dirfrag()] = dir; } ::_decode(bystanders, payload, off); @@ -174,10 +174,10 @@ class MExportDirPrep : public Message { } // dirs - int nd = dirs.size(); + int nd = dirfrags.size(); payload.append((char*)&nd, sizeof(int)); - for (map::iterator dit = dirs.begin(); - dit != dirs.end(); + for (map::iterator dit = dirfrags.begin(); + dit != dirfrags.end(); dit++) dit->second->_encode(payload); diff --git a/branches/sage/cephmds2/messages/MMDSCacheRejoin.h b/branches/sage/cephmds2/messages/MMDSCacheRejoin.h index f75900defd2f8..2d1d858958ce8 100644 --- a/branches/sage/cephmds2/messages/MMDSCacheRejoin.h +++ b/branches/sage/cephmds2/messages/MMDSCacheRejoin.h @@ -22,37 +22,167 @@ class MMDSCacheRejoin : public Message { public: - map inodes; // ino -> caps_wanted - set dirfrags; - map > dentries; // dir -> (dentries...) + static const int OP_REJOIN = 1; // replica -> auth, i exist. and maybe my lock state. + static const int OP_ACK = 3; // auth -> replica, here is your lock state. + static const int OP_MISSING = 4; // auth -> replica, i am missing these items + static const int OP_FULL = 5; // replica -> auth, here is the full object. + static const char *get_opname(int op) { + switch (op) { + case OP_REJOIN: return "rejoin"; + case OP_ACK: return "ack"; + case OP_MISSING: return "missing"; + case OP_FULL: return "full"; + default: assert(0); + } + } + + // -- types -- + struct inode_strong { + int caps_wanted; + int nonce; + int authlock; + int linklock; + int dirfragtreelock; + int filelock; + inode_strong() {} + inode_strong(int n, int cw=0, int a=0, int l=0, int dft=0, int f=0) : + caps_wanted(cw), + nonce(n), + authlock(a), linklock(l), dirfragtreelock(dft), filelock(f) { } + }; + struct inode_full { + inode_t inode; + string symlink; + fragtree_t dirfragtree; + inode_full() {} + inode_full(const inode_t& i, const string& s, const fragtree_t& f) : + inode(i), symlink(s), dirfragtree(f) {} + inode_full(bufferlist& bl, int& off) { + ::_decode(inode, bl, off); + ::_decode(symlink, bl, off); + ::_decode(dirfragtree, bl, off); + } + void _encode(bufferlist& bl) { + ::_encode(inode, bl); + ::_encode(symlink, bl); + ::_encode(dirfragtree, bl); + } + }; + struct inode_xlock { + inodeno_t ino; + int locktype; + metareqid_t reqid; + inode_xlock() {} + inode_xlock(inodeno_t i, int lt, const metareqid_t& ri) : + ino(i), locktype(lt), reqid(ri) {} + }; + + struct dirfrag_strong { + int nonce; + dirfrag_strong() {} + dirfrag_strong(int n) : nonce(n) {} + }; + struct dn_strong { + int nonce; + int lock; + dn_strong() {} + dn_strong(int n, int l) : nonce(n), lock(l) {} + }; + + // -- data -- + int op; + + set weak_inodes; + map strong_inodes; + list full_inodes; + list xlocked_inodes; + + set weak_dirfrags; + map strong_dirfrags; + + map > weak_dentries; + map > strong_dentries; + map > xlocked_dentries; MMDSCacheRejoin() : Message(MSG_MDS_CACHEREJOIN) {} + MMDSCacheRejoin(int o) : + Message(MSG_MDS_CACHEREJOIN), + op(o) {} char *get_type_name() { return "cache_rejoin"; } + void print(ostream& out) { + out << "cache_rejoin " << get_opname(op); + } - void add_dirfrag(dirfrag_t dirfrag) { - dirfrags.insert(dirfrag); + // -- builders -- + // inodes + void add_weak_inode(inodeno_t ino) { + weak_inodes.insert(ino); } - void add_dentry(dirfrag_t dirfrag, const string& dn) { - dentries[dirfrag].insert(dn); + void add_strong_inode(inodeno_t i, int n, int cw, int a, int l, int dft, int f) { + strong_inodes[i] = inode_strong(n, cw, a, l, dft, f); } - void add_inode(inodeno_t ino, int cw) { - inodes[ino] = cw; + void add_full_inode(inode_t &i, const string& s, const fragtree_t &f) { + full_inodes.push_back(inode_full(i, s, f)); + } + void add_inode_xlock(inodeno_t ino, int lt, const metareqid_t& ri) { + xlocked_inodes.push_back(inode_xlock(ino, lt, ri)); } + // dirfrags + void add_weak_dirfrag(dirfrag_t df) { + weak_dirfrags.insert(df); + } + void add_strong_dirfrag(dirfrag_t df, int n) { + strong_dirfrags[df] = dirfrag_strong(n); + } + + // dentries + void add_weak_dentry(dirfrag_t df, const string& dname) { + weak_dentries[df].insert(dname); + } + void add_strong_dentry(dirfrag_t df, const string& dname, int n, int ls) { + strong_dentries[df][dname] = dn_strong(n, ls); + } + void add_dentry_xlock(dirfrag_t df, const string& dname, const metareqid_t& ri) { + xlocked_dentries[df][dname] = ri; + } + + // -- encoding -- void encode_payload() { - ::_encode(inodes, payload); - ::_encode(dirfrags, payload); - for (set::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) - ::_encode(dentries[*p], payload); + ::_encode(weak_inodes, payload); + ::_encode(strong_inodes, payload); + + __uint32_t nfull = full_inodes.size(); + ::_encode(nfull, payload); + for (list::iterator p = full_inodes.begin(); p != full_inodes.end(); ++p) + p->_encode(payload); + + ::_encode(xlocked_inodes, payload); + ::_encode(weak_dirfrags, payload); + //::_encode(strong_dirfrags, payload); + ::_encode(weak_dentries, payload); + ::_encode(strong_dentries, payload); + ::_encode(xlocked_dentries, payload); } void decode_payload() { int off = 0; - ::_decode(inodes, payload, off); - ::_decode(dirfrags, payload, off); - for (set::iterator p = dirfrags.begin(); p != dirfrags.end(); ++p) - ::_decode(dentries[*p], payload, off); + ::_decode(weak_inodes, payload, off); + ::_decode(strong_inodes, payload, off); + + __uint32_t nfull; + ::_decode(nfull, payload, off); + for (unsigned i=0; i