LDINC = ar -rc
else
# For linux
-CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE
+CFLAGS = -pg -g -Wall -I. -D_FILE_OFFSET_BITS=64 -D_REENTRANT -D_THREAD_SAFE
LDINC = ld -i -o
endif
sage mds
+- the split/merge plan:
+ - dirfragtree is lazily consistent. no lock. bcast by primary when it updates.
+ - CDir is never request pinned
+ - add a CInode sticky_dir flag to somehow pin all cdirs on the fly.
+ - STICKY dir state and pin? make sure it's kept across import/export/fragment
+ - auth journals and applies update in the request update pipeline
+ - bcast to dir replicas
+ - inode auth will journal inode update separately/lazily
+ - also on handle_resolve(), if there is a mismatch.
+ - do i need a fragtrace_t something to tell me where the splits for a given frag occurred?
+ - or something like a fragtree_t simplify()?
+ - is there any reason to freeze the dir?
+ - CDentry objects will be moved to the new frag(s)
+ - Server etc. must take care not to carry CDir pointers around; they're unstable!
+
+
+- journal epoch, or something similar
+ - reduce size of EMetaBlob by skipping context when inode was already journaled since the last
+ SubtreeMap
+
+
- hmm, should we move ESubtreeMap out of the journal?
that would avoid all the icky weirdness in shutdown, with periodic logging, etc.
bl.append((char*)&version, sizeof(version));
bl.append((char*)&projected_version, sizeof(projected_version));
lock._encode(bl);
- ::_encode(replicas, bl);
+ ::_encode(replica_map, bl);
// twiddle
- clear_replicas();
+ clear_replica_map();
replica_nonce = EXPORT_NONCE;
state_clear(CDentry::STATE_AUTH);
if (is_dirty())
bl.copy(off, sizeof(projected_version), (char*)&projected_version);
off += sizeof(projected_version);
lock._decode(bl, off);
- ::_decode(replicas, bl, off);
+ ::_decode(replica_map, bl, off);
// twiddle
state = 0;
state_set(CDentry::STATE_AUTH);
if (nstate & STATE_DIRTY)
_mark_dirty();
- if (!replicas.empty())
+ if (!replica_map.empty())
get(PIN_REPLICATED);
add_replica(from, EXPORT_NONCE);
if (is_replica(to))
nitems--; // adjust dir size
}
+
+void CDir::steal_dentry(CDentry *dn)
+{
+ dout(15) << "steal_dentry " << *dn << endl;
+
+ items[dn->name] = dn;
+
+ nitems++;
+ if (dn->is_null())
+ nnull++;
+ if (dn->is_primary())
+ nested_auth_pins += dn->inode->auth_pins + dn->inode->nested_auth_pins;
+ if (dn->is_dirty())
+ num_dirty++;
+
+ dn->dir = this;
+}
+
+void CDir::purge_stolen(list<Context*>& waiters)
+{
+ if (!items.empty()) {
+ put(PIN_CHILD);
+ items.clear();
+ }
+
+ if (is_dirty()) mark_clean();
+
+ if (state_test(STATE_EXPORT)) put(PIN_EXPORT);
+ if (state_test(STATE_IMPORTBOUND)) put(PIN_IMPORTBOUND);
+ if (state_test(STATE_EXPORTBOUND)) put(PIN_EXPORTBOUND);
+
+ if (auth_pins > 0) put(PIN_AUTHPIN);
+
+ take_waiting(WAIT_ANY, waiters);
+
+ assert(get_num_ref() == 0);
+}
+
+
void CDir::remove_null_dentries() {
dout(12) << "remove_null_dentries " << *this << endl;
}
}
+void CDir::_freeze_tree(Context *c)
+{
+ dout(10) << "_freeze_tree " << *this << endl;
+
+ // there shouldn't be any conflicting auth_pins.
+ assert(is_freezeable_dir());
+
+ // twiddle state
+ state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
+ state_set(STATE_FROZENTREE);
+ get(PIN_FROZEN);
+
+ // auth_pin inode for duration of freeze, if we are not a subtree root.
+ if (is_auth() && !is_subtree_root())
+ inode->auth_pin();
+
+ // continue to frozen land
+ if (c) {
+ c->finish(0);
+ delete c;
+ }
+}
+
void CDir::freeze_tree_finish(Context *c)
{
// still freezing? (we may have been canceled)
_freeze_tree(c);
}
-void CDir::_freeze_tree(Context *c)
-{
- dout(10) << "_freeze_tree " << *this << endl;
-
- // there shouldn't be any conflicting auth_pins.
- assert(is_freezeable_dir());
-
- // twiddle state
- state_clear(STATE_FREEZINGTREE); // actually, this may get set again by next context?
- state_set(STATE_FROZENTREE);
-
- // auth_pin inode for duration of freeze, if we are not a subtree root.
- if (is_auth() && !is_subtree_root())
- inode->auth_pin();
-
- // continue to frozen land
- if (c) {
- c->finish(0);
- delete c;
- }
-}
-
void CDir::unfreeze_tree()
{
dout(10) << "unfreeze_tree " << *this << endl;
if (state_test(STATE_FROZENTREE)) {
// frozen. unfreeze.
state_clear(STATE_FROZENTREE);
+ put(PIN_FROZEN);
// unpin (may => FREEZEABLE) FIXME: is this order good?
if (is_auth() && !is_subtree_root())
{
dout(10) << "_freeze_dir " << *this << endl;
+ assert(is_freezeable_dir());
+
+ state_clear(STATE_FREEZINGDIR);
state_set(STATE_FROZENDIR);
+ get(PIN_FROZEN);
if (is_auth() && !is_subtree_root())
inode->auth_pin(); // auth_pin for duration of freeze
void CDir::freeze_dir_finish(Context *c)
{
+ // still freezing? (we may have been canceled)
+ if (!is_freezing()) {
+ dout(10) << "freeze_dir_finish no longer freezing, done on " << *this << endl;
+ c->finish(-1);
+ delete c;
+ return;
+ }
+
// freezeable now?
- if (is_freezeable_dir()) {
- // freeze now
- _freeze_dir(c);
- } else {
+ if (!is_freezeable_dir()) {
// wait again!
dout(10) << "freeze_dir_finish still waiting " << *this << endl;
state_set(STATE_FREEZINGDIR);
add_waiter(WAIT_FREEZEABLE, new C_MDS_FreezeDir(this, c));
+ return;
}
+
+ // freeze now
+ _freeze_dir(c);
}
void CDir::unfreeze_dir()
{
dout(10) << "unfreeze_dir " << *this << endl;
- state_clear(STATE_FROZENDIR);
-
- // unpin (may => FREEZEABLE) FIXME: is this order good?
- if (is_auth() && !is_subtree_root())
- inode->auth_unpin();
- // waiters?
- finish_waiting(WAIT_UNFREEZE);
+ if (state_test(STATE_FROZENDIR)) {
+ state_clear(STATE_FROZENDIR);
+ put(PIN_FROZEN);
+
+ // unpin (may => FREEZEABLE) FIXME: is this order good?
+ if (is_auth() && !is_subtree_root())
+ inode->auth_unpin();
+
+ // waiters?
+ finish_waiting(WAIT_UNFREEZE);
+ } else {
+ // still freezing. stop.
+ assert(state_test(STATE_FREEZINGDIR));
+ state_clear(STATE_FREEZINGDIR);
+
+ // cancel freeze waiters
+ finish_waiting(WAIT_UNFREEZE);
+ finish_waiting(WAIT_FREEZEABLE, -1);
+ }
}
typedef map<string, CDentry*> CDir_map_t;
-//extern int cdir_pins[CDIR_NUM_PINS];
-
class CDir : public MDSCacheObject {
public:
// -- pins --
- static const int PIN_DNWAITER = 1;
- static const int PIN_CHILD = 2;
- static const int PIN_EXPORT = 4;
- static const int PIN_AUTHPIN = 8;
- static const int PIN_IMPORTING = 9;
- static const int PIN_EXPORTING = 10;
- static const int PIN_IMPORTBOUND = 11;
- static const int PIN_EXPORTBOUND = 12;
- static const int PIN_LOGGINGEXPORTFINISH = 17;
+ static const int PIN_DNWAITER = 1;
+ static const int PIN_CHILD = 2;
+ static const int PIN_FROZEN = 3;
+ static const int PIN_FRAGMENTING = 4;
+ static const int PIN_EXPORT = 5;
+ static const int PIN_AUTHPIN = 6;
+ static const int PIN_IMPORTING = 7;
+ static const int PIN_EXPORTING = 8;
+ static const int PIN_IMPORTBOUND = 9;
+ static const int PIN_EXPORTBOUND = 10;
const char *pin_name(int p) {
switch (p) {
case PIN_DNWAITER: return "dnwaiter";
case PIN_CHILD: return "child";
+ case PIN_FROZEN: return "frozen";
+ case PIN_FRAGMENTING: return "fragmenting";
case PIN_EXPORT: return "export";
case PIN_EXPORTING: return "exporting";
case PIN_IMPORTING: return "importing";
case PIN_IMPORTBOUND: return "importbound";
case PIN_EXPORTBOUND: return "exportbound";
case PIN_AUTHPIN: return "authpin";
- case PIN_LOGGINGEXPORTFINISH: return "loggingexportfinish";
default: return generic_pin_name(p);
}
}
static const unsigned STATE_EXPORTBOUND = (1<<14);
static const unsigned STATE_EXPORTING = (1<<15);
static const unsigned STATE_IMPORTING = (1<<16);
+ static const unsigned STATE_FRAGMENTING = (1<<17);
// common states
static const unsigned STATE_CLEAN = 0;
// -- accessors --
inodeno_t ino() const { return inode->ino(); } // deprecate me?
+ frag_t get_frag() const { return frag; }
dirfrag_t dirfrag() const { return dirfrag_t(inode->ino(), frag); }
CInode *get_inode() { return inode; }
void link_inode( CDentry *dn, inodeno_t ino );
void link_inode( CDentry *dn, CInode *in );
void unlink_inode( CDentry *dn );
- private:
+private:
void link_inode_work( CDentry *dn, CInode *in );
void unlink_inode_work( CDentry *dn );
-
+ void steal_dentry(CDentry *dn); // from another dir. used by merge/split.
+ void purge_stolen(list<Context*>& waiters);
void remove_null_dentries();
// -- authority --
dir->popularity[MDS_POP_NESTED] -= st.popularity_curdom;
rep_by = dir->dir_rep_by;
- replicas = dir->replicas;
+ replicas = dir->replica_map;
}
dirfrag_t get_dirfrag() { return st.dirfrag; }
dir->replica_nonce = 0; // no longer defined
- if (!dir->replicas.empty())
- dout(0) << "replicas not empty non import, " << *dir << ", " << dir->replicas << endl;
+ if (!dir->replica_map.empty())
+ dout(0) << "replicas not empty non import, " << *dir << ", " << dir->replica_map << endl;
dir->dir_rep_by = rep_by;
- dir->replicas = replicas;
- dout(12) << "replicas in export is " << replicas << ", dir now " << dir->replicas << endl;
+ dir->replica_map = replicas;
+ dout(12) << "replicas in export is " << replicas << ", dir now " << dir->replica_map << endl;
if (!replicas.empty())
dir->get(CDir::PIN_REPLICATED);
if (dir->is_dirty()) {
}
+void CInode::fragment_dir(frag_t basefrag, int bits)
+{
+ dout(10) << "fragment_dir " << basefrag << " by " << bits << endl;
+
+ CDir *base = get_or_open_dirfrag(mdcache, basefrag);
+
+ list<frag_t> frags;
+ basefrag.split(bits, frags);
+
+ vector<CDir*> subfrags(1 << bits);
+
+ list<Context*> waiters;
+
+ if (bits > 0) {
+ // split.
+ // update fragtree
+ dirfragtree.split(basefrag, bits);
+
+ // create subfrag dirs
+ for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
+ CDir *f = new CDir(this, *p, mdcache, true);
+
+ // propogate flags
+ f->state_set(base->get_state() &
+ (CDir::STATE_DIRTY |
+ CDir::STATE_COMPLETE |
+ CDir::STATE_FROZENDIR));
+ f->set_version(base->get_version());
+
+ if (base->state_test(CDir::STATE_EXPORT)) {
+ f->state_set(CDir::STATE_EXPORT);
+ f->get(CDir::PIN_EXPORT);
+ }
+
+ // dup replica map
+ f->replica_map = base->replica_map;
+
+ dout(10) << " subfrag " << *p << " " << *f << endl;
+ subfrags.push_back(f);
+ add_dirfrag(f);
+ }
+ assert(subfrags.size() == frags.size());
+
+ // repartition dentries
+ while (!base->items.empty()) {
+ map<string,CDentry*>::iterator p = base->items.begin();
+
+ CDentry *dn = p->second;
+ frag_t frag = base->inode->pick_dirfrag(p->first);
+ int n = frag.value() >> basefrag.bits();
+ dout(15) << " subfrag " << frag << " n=" << n << " for " << p->first << endl;
+ CDir *f = dirfrags[n];
+
+ f->steal_dentry(dn);
+ }
+
+ // empty.
+ base->purge_stolen(waiters);
+ close_dirfrag(basefrag);
+ } else {
+ // merge.
+ dirfragtree.merge(basefrag, bits);
+
+ // enumerate subfrags
+ for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
+ CDir *dir = get_or_open_dirfrag(mdcache, *p);
+ dout(10) << " subfrag " << *p << " " << *dir << endl;
+
+ // steal dentries
+ while (!dir->items.empty())
+ base->steal_dentry(dir->items.begin()->second);
+
+ // merge replica map
+ for (map<int,int>::iterator p = dir->replica_map.begin();
+ p != dir->replica_map.end();
+ ++p)
+ base->replica_map[p->first] = MAX(base->replica_map[p->first], p->second);
+
+ dir->purge_stolen(waiters);
+ close_dirfrag(dir->dirfrag().frag);
+ }
+ }
+
+ mdcache->mds->queue_waiters(waiters);
+}
+
+
// pins
void close_dirfrags();
bool has_subtree_root_dirfrag();
+ void fragment_dir(frag_t base, int bits);
+
protected:
// parent dentries in cache
CDentry *parent; // primary link
dirfragtree = in->dirfragtree;
st.is_dirty = in->is_dirty();
- replicas = in->replicas;
+ replicas = in->replica_map;
in->authlock._encode(locks);
in->linklock._encode(locks);
if (st.is_dirty)
in->_mark_dirty();
- in->replicas = replicas;
+ in->replica_map = replicas;
if (!replicas.empty())
in->get(CInode::PIN_REPLICATED);
// dentry specific helpers
-// trace helpers
-
/** dentry_can_rdlock_trace
* see if we can _anonymously_ rdlock an entire trace.
* if not, and req is specified, wait and retry that message.
#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
+#include "events/EFragment.h"
#include "events/EUpdate.h"
#include "events/ESlaveUpdate.h"
case EVENT_EXPORT: le = new EExport; break;
case EVENT_IMPORTSTART: le = new EImportStart; break;
case EVENT_IMPORTFINISH: le = new EImportFinish; break;
+ case EVENT_FRAGMENT: le = new EFragment; break;
case EVENT_UPDATE: le = new EUpdate; break;
case EVENT_SLAVEUPDATE: le = new ESlaveUpdate; break;
#define EVENT_EXPORT 30
#define EVENT_IMPORTSTART 31
#define EVENT_IMPORTFINISH 32
+#define EVENT_FRAGMENT 33
#define EVENT_UPDATE 3
#define EVENT_SLAVEUPDATE 4
#define EVENT_OPEN 5
-#define EVENT_ALLOC 10
#define EVENT_PURGEFINISH 22
#define EVENT_ANCHOR 40
set<CInode*>::iterator p = rejoin_undef_inodes.begin();
while (p != rejoin_undef_inodes.end()) {
CInode *in = *p;
- in->clear_replicas();
+ in->clear_replica_map();
// close out dirfrags
if (in->is_dir()) {
p != dfls.end();
++p) {
CDir *dir = *p;
- dir->clear_replicas();
+ dir->clear_replica_map();
for (map<string,CDentry*>::iterator p = dir->items.begin();
p != dir->items.end();
++p) {
CDentry *dn = p->second;
- dn->clear_replicas();
+ dn->clear_replica_map();
dout(10) << " trimming " << *dn << endl;
dir->remove_dentry(dn);
CDentry *dn = in->get_parent_dn();
if (dn) {
- dn->clear_replicas();
+ dn->clear_replica_map();
dout(10) << " trimming " << *dn << endl;
dn->dir->remove_dentry(dn);
} else {
if (nonce == dir->get_replica_nonce(from)) {
// remove from our cached_by
dout(7) << " dir expire on " << *dir << " from mds" << from
- << " replicas was " << dir->replicas << endl;
+ << " replicas was " << dir->replica_map << endl;
dir->remove_replica(from);
}
else {
/*
- beautiful state diagram:
-
- STOPPED DNE FAILED
- / | \ / | |
- / | \________ _______/ | |
-| v v v v v
-| STARTING <--> STANDBY <--> CREATING REPLAY -> RECONNECT -> REJOIN
-| \ / /
-| \____ ____________/ /
- \ v v /
- \ ACTIVE <----------------------------------------/
+ beautiful state diagram:
+
+ STOPPED DNE FAILED
+ / | \ / | |
+ / | \________ _______/ | |
+| v v v v |
+| STARTING <--> STANDBY <--> CREATING |
+| \ / |
+| \____ ____________/ |
+ \ v v |
+ \ ACTIVE <-- REJOIN <-- RECONNECT <-- REPLAY
\ |
\ |
\ v
#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
+#include "events/EFragment.h"
#include "msg/Messenger.h"
if (in->is_dirty()) in->mark_clean();
// clear/unpin cached_by (we're no longer the authority)
- in->clear_replicas();
+ in->clear_replica_map();
// twiddle lock states for auth -> replica transition
in->authlock.export_twiddle();
dstate._encode( enc_dir );
// release open_by
- dir->clear_replicas();
+ dir->clear_replica_map();
// mark
assert(dir->is_auth());
}
// log export completion, then finish (unfreeze, trigger finish context, etc.)
- dir->get(CDir::PIN_LOGGINGEXPORTFINISH);
mds->mdlog->submit_entry(le,
new C_MDS_ExportFinishLogged(this, dir));
void Migrator::export_logged_finish(CDir *dir)
{
dout(7) << "export_logged_finish " << *dir << endl;
- dir->put(CDir::PIN_LOGGINGEXPORTFINISH);
cache->verify_subtree_bounds(dir, export_bounds[dir]);
// dir
assert(cur->is_auth());
cur->state_clear(CDir::STATE_AUTH);
- cur->clear_replicas();
+ cur->clear_replica_map();
if (cur->is_dirty())
cur->mark_clean();
// dentry
dn->state_clear(CDentry::STATE_AUTH);
- dn->clear_replicas();
+ dn->clear_replica_map();
if (dn->is_dirty())
dn->mark_clean();
if (dn->is_primary()) {
CInode *in = dn->get_inode();
in->state_clear(CDentry::STATE_AUTH);
- in->clear_replicas();
+ in->clear_replica_map();
if (in->is_dirty())
in->mark_clean();
in->authlock.clear_gather();
+
+
+
+
+
+
+
+
+
+
+
+// ===================================================================
+// FRAGMENT
+
+class C_MDC_FragmentFreeze : public Context {
+ Migrator *mig;
+ CDir *dir;
+ int bits;
+public:
+ C_MDC_FragmentFreeze(Migrator *m, CDir *d, int b) : mig(m), dir(d), bits(b) {}
+ virtual void finish(int r) {
+ if (r >= 0)
+ mig->fragment_frozen(dir, bits);
+ }
+};
+
+void Migrator::fragment_dir(CDir *dir, int bits)
+{
+ dout(7) << "fragment_dir " << *dir << " bits " << bits << endl;
+ assert(dir->is_auth());
+
+ if (mds->mdsmap->is_degraded()) {
+ dout(7) << "cluster degraded, no fragmenting for now" << endl;
+ return;
+ }
+
+ if (dir->inode->is_root()) {
+ dout(7) << "i won't fragment root" << endl;
+ //assert(0);
+ return;
+ }
+
+ if (dir->is_frozen() ||
+ dir->is_freezing()) {
+ dout(7) << " can't export, freezing|frozen. wait for other exports to finish first." << endl;
+ return;
+ }
+
+ if (dir->state_test(CDir::STATE_FRAGMENTING)) {
+ dout(7) << "already fragmenting" << endl;
+ return;
+ }
+
+ dir->state_set(CDir::STATE_FRAGMENTING);
+ dir->get(CDir::PIN_FRAGMENTING);
+
+ // first, freeze.
+ dir->freeze_dir(new C_MDC_FragmentFreeze(this, dir, bits));
+}
+
+class C_MDC_FragmentLogged : public Context {
+ Migrator *mig;
+ CDir *dir;
+ int bits;
+public:
+ C_MDC_FragmentLogged(Migrator *m, CDir *d, int b) : mig(m), dir(d), bits(b) {}
+ virtual void finish(int r) {
+ if (r >= 0)
+ mig->fragment_logged(dir, bits);
+ }
+};
+
+void Migrator::fragment_frozen(CDir *dir, int bits)
+{
+ dout(7) << "fragment_frozen " << *dir << " bits " << bits << endl;
+
+ // xlock
+ CInode *diri = dir->get_inode();
+
+ if (!diri->dirfragtreelock.is_stable()) {
+ dout(10) << "fragment_frozen waiting for stable" << endl;
+ diri->dirfragtreelock.add_waiter(SimpleLock::WAIT_STABLE,
+ new C_MDC_FragmentFreeze(this, dir, bits));
+ return;
+ }
+
+ //if (diri->dirfragtreelock.get_state() != LOCK_LOCK)
+ //mds->locker->simple_lock(&diri->dirfragtreelock);
+
+ if (diri->dirfragtreelock.get_state() != LOCK_LOCK) {
+ dout(10) << "fragment_frozen waiting for lock" << endl;
+ diri->dirfragtreelock.add_waiter(SimpleLock::WAIT_STABLE,
+ new C_MDC_FragmentFreeze(this, dir, bits));
+ }
+
+ // lock. do a manual xlock.
+ diri->dirfragtreelock.get_xlock((MDRequest*)1);
+
+ // journal it.
+ EFragment *le = new EFragment(dir->ino(), dir->get_frag(), bits);
+
+ // predirty and journal content
+ le->metablob.add_dir_context(dir);
+ for (map<string,CDentry*>::iterator p = dir->items.begin();
+ p != dir->items.end();
+ ++p) {
+ p->second->pre_dirty();
+ le->metablob.add_dentry(p->second, true);
+ }
+
+ // go
+ mds->mdlog->submit_entry(le);
+ mds->mdlog->wait_for_sync(new C_MDC_FragmentLogged(this, dir, bits));
+}
+
+void Migrator::fragment_logged(CDir *dir, int bits)
+{
+ dout(10) << "fragment_logged " << *dir << " bits " << bits << endl;
+
+ CInode *diri = dir->get_inode();
+ diri->fragment_dir(dir->get_frag(), bits);
+
+ // dirty everything
+
+
+ // create fragments
+
+ frag_t startfrag = dir->get_frag();
+ list<frag_t> frags;
+ startfrag.split(bits, frags);
+
+ vector<CDir*> dirfrags(1 << bits);
+ for (list<frag_t>::iterator p = frags.begin(); p != frags.end(); ++p) {
+ CDir *f = new CDir(diri, *p, cache, true);
+
+ // propogate flags
+ f->state_set(dir->get_state() &
+ (CDir::STATE_DIRTY |
+ CDir::STATE_COMPLETE |
+ CDir::STATE_FROZENDIR));
+ f->set_version(dir->get_version());
+ f->pre_dirty();
+
+ dout(10) << " new frag " << *p << " " << *f << endl;
+ dirfrags.push_back(f);
+ diri->add_dirfrag(f);
+ }
+ assert(dirfrags.size() == frags.size());
+
+ // update dirfragtree
+ dir->inode->dirfragtree.split(startfrag, bits);
+ dout(10) << "new inode dirfragtree is " << dir->inode->dirfragtree << endl;
+
+ // partition dentries
+ while (!dir->items.empty()) {
+ map<string,CDentry*>::iterator p = dir->items.begin();
+
+ CDentry *dn = p->second;
+ frag_t frag = dir->inode->pick_dirfrag(p->first);
+ int n = frag.value() >> startfrag.bits();
+ dout(15) << "frag " << frag << " n=" << n << " for " << p->first << endl;
+ CDir *f = dirfrags[n];
+
+ CDentry *newdn;
+ if (dn->is_primary()) {
+ CInode *in = dn->get_inode();
+ dir->unlink_inode(dn);
+ newdn = f->add_dentry(dn->name, in);
+ }
+ else if (dn->is_remote()) {
+ inodeno_t ino = dn->get_remote_ino();
+ newdn = f->add_dentry(dn->name, dn->get_remote_ino());
+ }
+ else if (dn->is_null()) {
+ newdn = f->add_dentry(dn->name);
+ }
+ else
+ assert(0);
+
+ dout(15) << " new dn " << *newdn << endl;
+
+ dir->remove_dentry(dn);
+ }
+
+
+
+
+
+ // remove old dir
+ diri->close_dirfrag(startfrag);
+
+
+}
class MExportDirNotifyAck;
class MExportDirFinish;
+class MFragmentDirNotify;
+
class EImportStart;
+
class Migrator {
private:
MDS *mds;
// bystander
void handle_export_notify(MExportDirNotify *m);
+
+ // -- fragmenting --
+ void fragment_dir(CDir *dir, int byn);
+ void fragment_frozen(CDir *dir, int byn);
+ friend class C_MDC_FragmentFreeze;
+ void fragment_logged(CDir *dir, int bits);
+ friend class C_MDC_FragmentLogged;
+
+ void handle_fragment_notify(MFragmentDirNotify *m);
};
rdlocks.insert(&srctrace[i]->lock);
xlocks.insert(&srcdn->lock);
wrlocks.insert(&srcdn->dir->inode->dirlock);
+ rdlocks.insert(&srcdn->dir->inode->dirfragtreelock); // rd lock on srci dirfragtree.
// rdlock destdir path, xlock dest dentry
for (int i=0; i<(int)desttrace.size(); i++)
--- /dev/null
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation. See file COPYING.
+ *
+ */
+
+#ifndef __MDS_EFRAGMENT_H
+#define __MDS_EFRAGMENT_H
+
+#include "../LogEvent.h"
+#include "EMetaBlob.h"
+
+class EFragment : public LogEvent {
+public:
+ inodeno_t ino;
+ frag_t basefrag;
+ int bits; // positive for split (from basefrag), negative for merge (to basefrag)
+ EMetaBlob metablob;
+
+ EFragment() : LogEvent(EVENT_FRAGMENT) { }
+ EFragment(inodeno_t i, frag_t bf, int b) :
+ LogEvent(EVENT_FRAGMENT),
+ ino(i), basefrag(bf), bits(b) { }
+ void print(ostream& out) {
+ out << "EFragment " << ino << " " << basefrag << " by " << bits << " " << metablob;
+ }
+
+ void encode_payload(bufferlist& bl) {
+ ::_encode(ino, bl);
+ ::_encode(basefrag, bl);
+ ::_encode(bits, bl);
+ metablob._encode(bl);
+ }
+ void decode_payload(bufferlist& bl, int& off) {
+ ::_decode(ino, bl, off);
+ ::_decode(basefrag, bl, off);
+ ::_decode(bits, bl, off);
+ metablob._decode(bl, off);
+ }
+
+ bool has_expired(MDS *mds);
+ void expire(MDS *mds, Context *c);
+ void replay(MDS *mds);
+};
+
+#endif
#include "events/EExport.h"
#include "events/EImportStart.h"
#include "events/EImportFinish.h"
+#include "events/EFragment.h"
#include "events/EAnchor.h"
#include "events/EAnchorClient.h"
+// -----------------------
+// EFragment
+
+bool EFragment::has_expired(MDS *mds)
+{
+ return metablob.has_expired(mds);
+}
+
+void EFragment::expire(MDS *mds, Context *c)
+{
+ metablob.expire(mds, c);
+}
+
+void EFragment::replay(MDS *mds)
+{
+ dout(10) << "EFragment.replay " << ino << " " << basefrag << " by " << bits << endl;
+
+ CInode *in = mds->mdcache->get_inode(ino);
+ assert(in);
+
+ in->fragment_dir(basefrag, bits);
+ metablob.replay(mds);
+}
+
+
// -----------------------
// EPurgeFinish
// --------------------------------------------
// replication
protected:
- map<int,int> replicas; // [auth] mds -> nonce
+ map<int,int> replica_map; // [auth] mds -> nonce
int replica_nonce; // [replica] defined on replica
public:
- bool is_replicated() { return !replicas.empty(); }
- bool is_replica(int mds) { return replicas.count(mds); }
- int num_replicas() { return replicas.size(); }
+ bool is_replicated() { return !replica_map.empty(); }
+ bool is_replica(int mds) { return replica_map.count(mds); }
+ int num_replicas() { return replica_map.size(); }
int add_replica(int mds) {
- if (replicas.count(mds))
- return ++replicas[mds]; // inc nonce
- if (replicas.empty())
+ if (replica_map.count(mds))
+ return ++replica_map[mds]; // inc nonce
+ if (replica_map.empty())
get(PIN_REPLICATED);
- return replicas[mds] = 1;
+ return replica_map[mds] = 1;
}
void add_replica(int mds, int nonce) {
- if (replicas.empty())
+ if (replica_map.empty())
get(PIN_REPLICATED);
- replicas[mds] = nonce;
+ replica_map[mds] = nonce;
}
int get_replica_nonce(int mds) {
- assert(replicas.count(mds));
- return replicas[mds];
+ assert(replica_map.count(mds));
+ return replica_map[mds];
}
void remove_replica(int mds) {
- assert(replicas.count(mds));
- replicas.erase(mds);
- if (replicas.empty())
+ assert(replica_map.count(mds));
+ replica_map.erase(mds);
+ if (replica_map.empty())
put(PIN_REPLICATED);
}
- void clear_replicas() {
- if (!replicas.empty())
+ void clear_replica_map() {
+ if (!replica_map.empty())
put(PIN_REPLICATED);
- replicas.clear();
+ replica_map.clear();
}
- map<int,int>::iterator replicas_begin() { return replicas.begin(); }
- map<int,int>::iterator replicas_end() { return replicas.end(); }
- const map<int,int>& get_replicas() { return replicas; }
+ map<int,int>::iterator replicas_begin() { return replica_map.begin(); }
+ map<int,int>::iterator replicas_end() { return replica_map.end(); }
+ const map<int,int>& get_replicas() { return replica_map; }
void list_replicas(set<int>& ls) {
- for (map<int,int>::const_iterator p = replicas.begin();
- p != replicas.end();
+ for (map<int,int>::const_iterator p = replica_map.begin();
+ p != replica_map.end();
++p)
ls.insert(p->first);
}