mds->server->dispatch_slave_request(mdr);
} else {
switch (mdr->internal_op) {
- case MDS_INTERNAL_OP_FRAGMENT:
- dispatch_fragment(mdr);
- break;
+
+ // ...
default:
assert(0);
}
}
+
+class C_MDC_FragmentFrozen : public Context {
+ MDCache *mdcache;
+ list<CDir*> dirs;
+ int by;
+public:
+ C_MDC_FragmentFrozen(MDCache *m, list<CDir*> d, int b) : mdcache(m), dirs(d), by(b) {}
+ virtual void finish(int r) {
+ mdcache->fragment_frozen(dirs, by);
+ }
+};
+
void MDCache::split_dir(CDir *dir, int bits)
{
dout(7) << "split_dir " << *dir << " bits " << bits << dendl;
dout(7) << "i won't split anything in stray" << dendl;
return;
}
- if (dir->state_test(CDir::STATE_FRAGMENTING)) {
- dout(7) << "already fragmenting" << dendl;
+ if (dir->is_frozen() ||
+ dir->is_freezing()) {
+ dout(7) << " can't fragment, freezing|frozen. wait for other exports to finish first." << dendl;
return;
}
- if (!dir->can_auth_pin()) {
- dout(7) << "not authpinnable on " << *dir << dendl;
+ if (dir->state_test(CDir::STATE_FRAGMENTING)) {
+ dout(7) << "already fragmenting" << dendl;
return;
}
- // register request
- // this is primary so we can hold multiple locks, remote auth_pins, and all that
- MDRequest *mdr = request_start_internal(MDS_INTERNAL_OP_FRAGMENT);
-
- // describe the fragment mutation
- mdr->more()->fragment_in = dir->inode;
- mdr->more()->fragment_base = dir->dirfrag().frag;
- mdr->more()->fragment_start.push_back(dir);
- mdr->more()->fragment_bits = bits;
+ // ok.
+ list<CDir*> dirs;
+ dirs.push_back(dir);
- // mark start frag
- //mdr->auth_pin(dir); // this will block the freeze, until mark_complete completes
- //dir->auth_pin();
- dir->state_set(CDir::STATE_FRAGMENTING);
+ C_Gather *gather = new C_Gather(new C_MDC_FragmentFrozen(this, dirs, bits));
+ fragment_freeze_dirs(dirs, gather);
- dispatch_request(mdr);
+ // initial mark+complete pass
+ fragment_mark_and_complete(dirs);
}
-class C_MDC_FragmentGo : public Context {
- MDCache *mdcache;
- MDRequest *mdr;
-public:
- C_MDC_FragmentGo(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
- virtual void finish(int r) {
- mdcache->fragment_go(mdr);
- }
-};
-
-void MDCache::dispatch_fragment(MDRequest *mdr)
+void MDCache::fragment_freeze_dirs(list<CDir*>& dirs, C_Gather *gather)
{
- CInode *diri = mdr->more()->fragment_in;
- int bits = mdr->more()->fragment_bits;
- dout(10) << "dispatch_fragment " << *mdr << " bits " << bits << " " << *diri << dendl;
-
- // (try to re-) auth_pin start fragments (acquire_locks may have to drop auth_pins to avoid deadlock)
- for (list<CDir*>::iterator p = mdr->more()->fragment_start.begin();
- p != mdr->more()->fragment_start.end();
- ++p) {
- CDir *dir = *p;
- if (!mdr->is_auth_pinned(dir) &&
- (!dir->is_auth() || !dir->can_auth_pin())) {
- dout(10) << " giving up, no longer auth+authpinnable on " << *dir << dendl;
- for (list<CDir*>::iterator p = mdr->more()->fragment_start.begin();
- p != mdr->more()->fragment_start.end();
- ++p)
- (*p)->state_clear(CDir::STATE_FRAGMENTING);
- request_finish(mdr);
- return;
- }
- mdr->auth_pin(dir);
- }
-
- // wrlock filelock, dftlock
- set<SimpleLock*> rdlocks, wrlocks, xlocks;
- wrlocks.insert(&diri->dirfragtreelock);
- wrlocks.insert(&diri->filelock);
- if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
- return;
-
- /*
- * initiate the freeze, blocking with an auth_pin.
- *
- * some reason(s) we have to freeze:
- * - on merge, version/projected version are unified from all fragments;
- * concurrent pipelined updates in the directory will have divergent
- * versioning... and that's no good.
- */
- dout(10) << "dispatch_fragment freezing start frags" << dendl;
- C_Gather *gather = new C_Gather(new C_MDC_FragmentGo(this, mdr));
-
- for (list<CDir*>::iterator p = mdr->more()->fragment_start.begin();
- p != mdr->more()->fragment_start.end();
- ++p) {
+ for (list<CDir*>::iterator p = dirs.begin(); p != dirs.end(); p++) {
CDir *dir = *p;
+ dir->auth_pin(dir); // until we mark and complete them
+ dir->state_set(CDir::STATE_FRAGMENTING);
dir->freeze_dir();
assert(dir->is_freezing_dir());
dir->add_waiter(CDir::WAIT_FROZEN, gather->new_sub());
}
+}
- // initial mark+complete pass
- fragment_mark_and_complete(mdr);
+/*
+void MDCache::fragment_unfreeze_dirs(list<CDir*>& dirs)
+{
+ for (list<CDir*>::iterator p = dirs.begin(); p != dirs.end(); p++) {
+ CDir *dir = *p;
+ dir->state_clear(CDir::STATE_FRAGMENTING);
+ dir->unfreeze_dir();
+ }
}
+*/
class C_MDC_FragmentMarking : public Context {
MDCache *mdcache;
- MDRequest *mdr;
+ list<CDir*>& dirs;
public:
- C_MDC_FragmentMarking(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
+ C_MDC_FragmentMarking(MDCache *m, list<CDir*>& d) : mdcache(m), dirs(d) {}
virtual void finish(int r) {
- mdcache->fragment_mark_and_complete(mdr);
+ mdcache->fragment_mark_and_complete(dirs);
}
};
-void MDCache::fragment_mark_and_complete(MDRequest *mdr)
+void MDCache::fragment_mark_and_complete(list<CDir*>& dirs)
{
- CInode *diri = mdr->more()->fragment_in;
- list<CDir*>& startfrags = mdr->more()->fragment_start;
- frag_t basefrag = mdr->more()->fragment_base;
- int bits = mdr->more()->fragment_bits;
+ CInode *diri = dirs.front()->get_inode();
+ dout(10) << "fragment_mark_and_complete " << dirs << " on " << *diri << dendl;
- dout(10) << "fragment_mark_and_complete " << *mdr << " " << basefrag << " by " << bits
- << " on " << *diri << dendl;
-
C_Gather *gather = 0;
- for (list<CDir*>::iterator p = startfrags.begin();
- p != startfrags.end();
+ for (list<CDir*>::iterator p = dirs.begin();
+ p != dirs.end();
++p) {
CDir *dir = *p;
if (!dir->is_complete()) {
dout(15) << " fetching incomplete " << *dir << dendl;
- if (!gather) gather = new C_Gather(new C_MDC_FragmentMarking(this, mdr));
+ if (!gather)
+ gather = new C_Gather(new C_MDC_FragmentMarking(this, dirs));
dir->fetch(gather->new_sub(),
true); // ignore authpinnability
}
p->second->state_set(CDentry::STATE_FRAGMENTING);
}
dir->state_set(CDir::STATE_DNPINNEDFRAG);
- mdr->auth_unpin(dir); // allow our freeze to complete
- //dir->auth_unpin();
+ dir->auth_unpin(dir);
}
else {
- dout(15) << " marked " << *dir << dendl;
+ dout(15) << " already marked " << *dir << dendl;
}
}
}
+
class C_MDC_FragmentStored : public Context {
MDCache *mdcache;
- MDRequest *mdr;
+ list<CDir*> dirs;
+ frag_t basefrag;
+ int bits;
public:
- C_MDC_FragmentStored(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
+ C_MDC_FragmentStored(MDCache *m, list<CDir*>& d, frag_t bf, int b) :
+ mdcache(m), dirs(d), basefrag(bf), bits(b) {}
virtual void finish(int r) {
- mdcache->fragment_stored(mdr);
+ mdcache->fragment_stored(dirs, basefrag, bits);
}
};
-void MDCache::fragment_go(MDRequest *mdr)
+void MDCache::fragment_frozen(list<CDir*>& dirs, int bits)
{
- CInode *diri = mdr->more()->fragment_in;
- frag_t basefrag = mdr->more()->fragment_base;
- int bits = mdr->more()->fragment_bits;
+ CInode *diri = dirs.front()->get_inode();
- dout(10) << "fragment_go " << *mdr << " " << basefrag << " by " << bits
+ frag_t basefrag;
+ if (bits > 0) {
+ assert(dirs.size() == 1);
+ basefrag = dirs.front()->get_frag();
+ } else {
+ basefrag = dirs.front()->get_frag();
+ //basefrag.parent_by_something(bits);
+ assert(0);
+ }
+
+ dout(10) << "fragment_frozen " << dirs << " " << basefrag << " by " << bits
<< " on " << *diri << dendl;
// refragment
- list<CDir*> &resultfrags = mdr->more()->fragment_result;
+ list<CDir*> resultfrags;
list<Context*> waiters;
adjust_dir_fragments(diri, basefrag, bits, resultfrags, waiters, false);
mds->queue_waiters(waiters);
- C_Gather *gather = new C_Gather(new C_MDC_FragmentStored(this, mdr));
+ C_Gather *gather = new C_Gather(new C_MDC_FragmentStored(this, resultfrags, basefrag, bits));
// freeze, store resulting frags
for (list<CDir*>::iterator p = resultfrags.begin();
class C_MDC_FragmentLogged : public Context {
MDCache *mdcache;
- MDRequest *mdr;
+ Mutation *mut;
+ list<CDir*> resultfrags;
+ frag_t basefrag;
+ int bits;
public:
- C_MDC_FragmentLogged(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
+ C_MDC_FragmentLogged(MDCache *m, Mutation *mu, list<CDir*>& r, frag_t bf, int bi) :
+ mdcache(m), mut(mu), resultfrags(r), basefrag(bf), bits(bi) {}
virtual void finish(int r) {
- mdcache->fragment_logged(mdr);
+ mdcache->fragment_logged(mut, resultfrags, basefrag, bits);
}
};
-void MDCache::fragment_stored(MDRequest *mdr)
+void MDCache::fragment_stored(list<CDir*>& resultfrags, frag_t basefrag, int bits)
{
- CInode *diri = mdr->more()->fragment_in;
- list<CDir*> &resultfrags = mdr->more()->fragment_result;
- frag_t basefrag = mdr->more()->fragment_base;
- int bits = mdr->more()->fragment_bits;
-
- dout(10) << "fragment_stored " << *mdr << " " << basefrag << " by " << bits
+ CInode *diri = resultfrags.front()->get_inode();
+ dout(10) << "fragment_stored " << resultfrags << " " << basefrag << " by " << bits
<< " on " << *diri << dendl;
- mdr->ls = mds->mdlog->get_current_segment();
+ Mutation *mut = new Mutation;
+
+ mut->ls = mds->mdlog->get_current_segment();
EFragment *le = new EFragment(mds->mdlog, diri->ino(), basefrag, bits);
mds->mdlog->start_entry(le);
// dft lock
mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock);
- mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree);
- mdr->add_updated_lock(&diri->dirfragtreelock);
+ mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->item_dirty_dirfrag_dirfragtree);
+ mut->add_updated_lock(&diri->dirfragtreelock);
+ /*
// filelock
mds->locker->mark_updated_scatterlock(&diri->filelock);
- mdr->ls->dirty_dirfrag_dir.push_back(&diri->item_dirty_dirfrag_dir);
- mdr->add_updated_lock(&diri->filelock);
+ mut->ls->dirty_dirfrag_dir.push_back(&diri->item_dirty_dirfrag_dir);
+ mut->add_updated_lock(&diri->filelock);
// dirlock
mds->locker->mark_updated_scatterlock(&diri->nestlock);
- mdr->ls->dirty_dirfrag_nest.push_back(&diri->item_dirty_dirfrag_nest);
- mdr->add_updated_lock(&diri->nestlock);
+ mut->ls->dirty_dirfrag_nest.push_back(&diri->item_dirty_dirfrag_nest);
+ mut->add_updated_lock(&diri->nestlock);
+ */
// journal new dirfrag fragstats for each new fragment.
for (list<CDir*>::iterator p = resultfrags.begin();
}
mds->mdlog->submit_entry(le,
- new C_MDC_FragmentLogged(this, mdr));
+ new C_MDC_FragmentLogged(this, mut, resultfrags, basefrag, bits));
mds->mdlog->flush();
}
-void MDCache::fragment_logged(MDRequest *mdr)
+void MDCache::fragment_logged(Mutation *mut, list<CDir*>& resultfrags, frag_t basefrag, int bits)
{
- CInode *diri = mdr->more()->fragment_in;
- list<CDir*> &resultfrags = mdr->more()->fragment_result;
- frag_t basefrag = mdr->more()->fragment_base;
- int bits = mdr->more()->fragment_bits;
+ CInode *diri = resultfrags.front()->get_inode();
- dout(10) << "fragment_logged " << *mdr << " " << basefrag << " bits " << bits
+ dout(10) << "fragment_logged " << resultfrags << " " << basefrag << " bits " << bits
<< " on " << *diri << dendl;
// tell peers
// freshly replicate new basedir to peer on merge
CDir *base = resultfrags.front();
replicate_dir(base, p->first, notify->basebl);
+ assert(0); // audit this
}
mds->send_message_mds(notify, p->first);
}
-
- mdr->apply(); // mark scatterlock
+
+ mut->apply(); // mark scatterlock
+ mds->locker->drop_locks(mut);
+ mut->cleanup();
+ delete mut;
// unfreeze resulting frags
- set<int> peers;
for (list<CDir*>::iterator p = resultfrags.begin();
p != resultfrags.end();
p++) {
dir->unfreeze_dir();
}
-
- // done! clean up, drop locks, etc.
- request_finish(mdr);
}