return mdr;
}
+MDRequest *MDCache::request_start_internal(int op)
+{
+ MDRequest *mdr = new MDRequest;
+ mdr->reqid.name = entity_name_t::MDS(mds->get_nodeid());
+ mdr->reqid.tid = mds->issue_tid();
+ mdr->internal_op = op;
+
+ assert(active_requests.count(mdr->reqid) == 0);
+ active_requests[mdr->reqid] = mdr;
+ dout(7) << "request_start_internal " << *mdr << " op " << op << dendl;
+ return mdr;
+}
+
MDRequest *MDCache::request_get(metareqid_t rid)
{
mds->server->dispatch_client_request(mdr);
} else if (mdr->slave_request) {
mds->server->dispatch_slave_request(mdr);
- } else
- assert(0);
+ } else {
+ switch (mdr->internal_op) {
+ case MDS_INTERNAL_OP_FRAGMENT:
+ dispatch_fragment(mdr);
+ break;
+
+ default:
+ assert(0);
+ }
+ }
}
-
void MDCache::request_forget_foreign_locks(MDRequest *mdr)
{
// xlocks
+// ===================================================================
+
+
+
// ===================================================================
// FRAGMENT
// yuck. we may have discovered the inode while it was being fragmented.
if (!diri->dirfragtree.is_leaf(basefrag))
diri->dirfragtree.force_to_leaf(basefrag);
- diri->dirfragtree.split(basefrag, bits);
- dout(10) << " new fragtree is " << diri->dirfragtree << dendl;
CDir *base = diri->get_or_open_dirfrag(this, basefrag);
+ diri->dirfragtree.split(basefrag, bits);
+ dout(10) << " new fragtree is " << diri->dirfragtree << dendl;
+
if (bits > 0) {
if (base) {
CDir *baseparent = base->get_parent_dir();
}
}
-class C_MDC_FragmentGo : public Context {
- MDCache *mdcache;
- CInode *diri;
- list<CDir*> dirs;
- frag_t basefrag;
- int bits;
-public:
- C_MDC_FragmentGo(MDCache *m, CInode *di, list<CDir*>& dls, frag_t bf, int b) :
- mdcache(m), diri(di), dirs(dls), basefrag(bf), bits(b) { }
- virtual void finish(int r) {
- mdcache->fragment_go(diri, dirs, basefrag, bits);
- }
-};
-
void MDCache::split_dir(CDir *dir, int bits)
{
dout(7) << "split_dir " << *dir << " bits " << bits << dendl;
return;
}
- // wrlock
- if (!mds->locker->scatter_wrlock_try(&dir->inode->dirfragtreelock, 0, false)) {
- dout(7) << "can't wrlock dirfragtree on " << *dir->inode << dendl;
- return;
- }
+ // register request
+ // this is primary so we can hold multiple locks, remote auth_pins, and all that
+ MDRequest *mdr = request_start_internal(MDS_INTERNAL_OP_FRAGMENT);
- list<CDir*> startfrags;
- startfrags.push_back(dir);
-
+ // describe the fragment mutation
+ mdr->more()->fragment_in = dir->inode;
+ mdr->more()->fragment_base = dir->dirfrag().frag;
+ mdr->more()->fragment_start.push_back(dir);
+ mdr->more()->fragment_bits = bits;
+
+ // mark start frag
+ //mdr->auth_pin(dir); // this will block the freeze, until mark_complete completes
+ //dir->auth_pin();
dir->state_set(CDir::STATE_FRAGMENTING);
-
- fragment_freeze(dir->get_inode(), startfrags, dir->get_frag(), bits);
- fragment_mark_and_complete(dir->get_inode(), startfrags, dir->get_frag(), bits);
+
+ dispatch_request(mdr);
}
-/*
- * initial the freeze, blocking with an auth_pin.
- *
- * some reason(s) we have to freeze:
- * - on merge, version/projected version are unified from all fragments;
- * concurrent pipelined updates in the directory will have divergent
- * versioning... and that's no good.
- */
-void MDCache::fragment_freeze(CInode *diri, list<CDir*>& frags, frag_t basefrag, int bits)
+class C_MDC_FragmentGo : public Context {
+ MDCache *mdcache;
+ MDRequest *mdr;
+public:
+ C_MDC_FragmentGo(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
+ virtual void finish(int r) {
+ mdcache->fragment_go(mdr);
+ }
+};
+
+void MDCache::dispatch_fragment(MDRequest *mdr)
{
- C_Gather *gather = new C_Gather(new C_MDC_FragmentGo(this, diri, frags, basefrag, bits));
-
- // freeze the dirs
- for (list<CDir*>::iterator p = frags.begin();
- p != frags.end();
+ CInode *diri = mdr->more()->fragment_in;
+ int bits = mdr->more()->fragment_bits;
+ dout(10) << "dispatch_fragment " << *mdr << " bits " << bits << " " << *diri << dendl;
+
+ // (try to re-) auth_pin start fragments (acquire_locks may have to drop auth_pins to avoid deadlock)
+ for (list<CDir*>::iterator p = mdr->more()->fragment_start.begin();
+ p != mdr->more()->fragment_start.end();
+ ++p) {
+ CDir *dir = *p;
+ if (!mdr->is_auth_pinned(dir) &&
+ (!dir->is_auth() || !dir->can_auth_pin())) {
+ dout(10) << " giving up, no longer auth+authpinnable on " << *dir << dendl;
+ for (list<CDir*>::iterator p = mdr->more()->fragment_start.begin();
+ p != mdr->more()->fragment_start.end();
+ ++p)
+ (*p)->state_clear(CDir::STATE_FRAGMENTING);
+ request_finish(mdr);
+ return;
+ }
+ mdr->auth_pin(dir);
+ }
+
+ // wrlock dirlock, dftlock
+ set<SimpleLock*> rdlocks, wrlocks, xlocks;
+ wrlocks.insert(&diri->dirfragtreelock);
+ wrlocks.insert(&diri->dirlock);
+ if (!mds->locker->acquire_locks(mdr, rdlocks, wrlocks, xlocks))
+ return;
+
+ /*
+ * initiate the freeze, blocking with an auth_pin.
+ *
+ * some reason(s) we have to freeze:
+ * - on merge, version/projected version are unified from all fragments;
+ * concurrent pipelined updates in the directory will have divergent
+ * versioning... and that's no good.
+ */
+ dout(10) << "dispatch_fragment freezing start frags" << dendl;
+ C_Gather *gather = new C_Gather(new C_MDC_FragmentGo(this, mdr));
+
+ for (list<CDir*>::iterator p = mdr->more()->fragment_start.begin();
+ p != mdr->more()->fragment_start.end();
++p) {
CDir *dir = *p;
- dir->auth_pin(); // this will block the freeze, until mark_and_complete
dir->freeze_dir();
assert(dir->is_freezing_dir());
dir->add_waiter(CDir::WAIT_FROZEN, gather->new_sub());
}
+
+ // initial mark+complete pass
+ fragment_mark_and_complete(mdr);
}
class C_MDC_FragmentMarking : public Context {
MDCache *mdcache;
- CInode *diri;
- list<CDir*> dirs;
- frag_t basefrag;
- int bits;
+ MDRequest *mdr;
public:
- C_MDC_FragmentMarking(MDCache *m, CInode *di, list<CDir*>& dls, frag_t bf, int b) :
- mdcache(m), diri(di), dirs(dls), basefrag(bf), bits(b) { }
+ C_MDC_FragmentMarking(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
virtual void finish(int r) {
- mdcache->fragment_mark_and_complete(diri, dirs, basefrag, bits);
+ mdcache->fragment_mark_and_complete(mdr);
}
};
-void MDCache::fragment_mark_and_complete(CInode *diri,
- list<CDir*>& startfrags,
- frag_t basefrag, int bits)
+void MDCache::fragment_mark_and_complete(MDRequest *mdr)
{
- dout(10) << "fragment_mark_and_complete " << basefrag << " by " << bits
+ CInode *diri = mdr->more()->fragment_in;
+ list<CDir*>& startfrags = mdr->more()->fragment_start;
+ frag_t basefrag = mdr->more()->fragment_base;
+ int bits = mdr->more()->fragment_bits;
+
+ dout(10) << "fragment_mark_and_complete " << *mdr << " " << basefrag << " by " << bits
<< " on " << *diri << dendl;
C_Gather *gather = 0;
if (!dir->is_complete()) {
dout(15) << " fetching incomplete " << *dir << dendl;
- if (!gather) gather = new C_Gather(new C_MDC_FragmentMarking(this, diri, startfrags, basefrag, bits));
+ if (!gather) gather = new C_Gather(new C_MDC_FragmentMarking(this, mdr));
dir->fetch(gather->new_sub(),
true); // ignore authpinnability
}
p->second->state_set(CDentry::STATE_FRAGMENTING);
}
dir->state_set(CDir::STATE_DNPINNEDFRAG);
- dir->auth_unpin(); // allow our freeze to complete
+ mdr->auth_unpin(dir); // allow our freeze to complete
+ //dir->auth_unpin();
}
else {
dout(15) << " marked " << *dir << dendl;
}
}
-
class C_MDC_FragmentStored : public Context {
MDCache *mdcache;
- CInode *diri;
- frag_t basefrag;
- int bits;
- list<CDir*> resultfrags;
+ MDRequest *mdr;
public:
- C_MDC_FragmentStored(MDCache *m, CInode *di, frag_t bf, int b,
- list<CDir*>& rf) :
- mdcache(m), diri(di), basefrag(bf), bits(b), resultfrags(rf) { }
+ C_MDC_FragmentStored(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
virtual void finish(int r) {
- mdcache->fragment_stored(diri, basefrag, bits, resultfrags);
+ mdcache->fragment_stored(mdr);
}
};
-void MDCache::fragment_go(CInode *diri, list<CDir*>& startfrags, frag_t basefrag, int bits)
+void MDCache::fragment_go(MDRequest *mdr)
{
- dout(10) << "fragment_go " << basefrag << " by " << bits
+ CInode *diri = mdr->more()->fragment_in;
+ frag_t basefrag = mdr->more()->fragment_base;
+ int bits = mdr->more()->fragment_bits;
+
+ dout(10) << "fragment_go " << *mdr << " " << basefrag << " by " << bits
<< " on " << *diri << dendl;
// refragment
- list<CDir*> resultfrags;
+ list<CDir*> &resultfrags = mdr->more()->fragment_result;
list<Context*> waiters;
adjust_dir_fragments(diri, basefrag, bits, resultfrags, waiters);
mds->queue_waiters(waiters);
- C_Gather *gather = new C_Gather(new C_MDC_FragmentStored(this, diri, basefrag, bits, resultfrags));
+ C_Gather *gather = new C_Gather(new C_MDC_FragmentStored(this, mdr));
// freeze, store resulting frags
for (list<CDir*>::iterator p = resultfrags.begin();
class C_MDC_FragmentLogged : public Context {
MDCache *mdcache;
- CInode *diri;
- frag_t basefrag;
- int bits;
- list<CDir*> resultfrags;
- Mutation *mut;
+ MDRequest *mdr;
public:
- C_MDC_FragmentLogged(MDCache *m, CInode *di, frag_t bf, int b,
- list<CDir*>& rf, Mutation *mu) :
- mdcache(m), diri(di), basefrag(bf), bits(b), mut(mu) {
- resultfrags.swap(rf);
- }
+ C_MDC_FragmentLogged(MDCache *m, MDRequest *r) : mdcache(m), mdr(r) {}
virtual void finish(int r) {
- mdcache->fragment_logged(diri, basefrag, bits,
- resultfrags, mut);
+ mdcache->fragment_logged(mdr);
}
};
-void MDCache::fragment_stored(CInode *diri, frag_t basefrag, int bits,
- list<CDir*>& resultfrags)
+void MDCache::fragment_stored(MDRequest *mdr)
{
- dout(10) << "fragment_stored " << basefrag << " by " << bits
+ CInode *diri = mdr->more()->fragment_in;
+ list<CDir*> &resultfrags = mdr->more()->fragment_result;
+ frag_t basefrag = mdr->more()->fragment_base;
+ int bits = mdr->more()->fragment_bits;
+
+ dout(10) << "fragment_stored " << *mdr << " " << basefrag << " by " << bits
<< " on " << *diri << dendl;
- Mutation *mut = new Mutation;
- mut->ls = mds->mdlog->get_current_segment();
+ mdr->ls = mds->mdlog->get_current_segment();
EFragment *le = new EFragment(mds->mdlog, diri->ino(), basefrag, bits);
le->metablob.add_dir_context(*resultfrags.begin());
+ // dft lock
mds->locker->mark_updated_scatterlock(&diri->dirfragtreelock);
- mut->ls->dirty_dirfrag_dirfragtree.push_back(&diri->xlist_dirty_dirfrag_dirfragtree);
- mut->add_updated_scatterlock(&diri->dirfragtreelock);
+ mdr->ls->dirty_dirfrag_dirfragtree.push_back(&diri->xlist_dirty_dirfrag_dirfragtree);
+ mdr->add_updated_scatterlock(&diri->dirfragtreelock);
+
+ // dirlock
+ mds->locker->mark_updated_scatterlock(&diri->dirlock);
+ mdr->ls->dirty_dirfrag_dir.push_back(&diri->xlist_dirty_dirfrag_dir);
+ mdr->add_updated_scatterlock(&diri->dirlock);
// journal new dirfrag fragstats for each new fragment.
- // mark complete. but not dirty.
for (list<CDir*>::iterator p = resultfrags.begin();
p != resultfrags.end();
p++) {
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
- le->metablob.add_dir(dir, true).mark_complete(); // mark complete
+ le->metablob.add_dir(dir, false);
}
mds->mdlog->submit_entry(le,
- new C_MDC_FragmentLogged(this, diri, basefrag, bits,
- resultfrags, mut));
+ new C_MDC_FragmentLogged(this, mdr));
}
-void MDCache::fragment_logged(CInode *diri, frag_t basefrag, int bits,
- list<CDir*>& resultfrags,
- Mutation *mut)
+void MDCache::fragment_logged(MDRequest *mdr)
{
- dout(10) << "fragment_logged " << basefrag << " bits " << bits
+ CInode *diri = mdr->more()->fragment_in;
+ list<CDir*> &resultfrags = mdr->more()->fragment_result;
+ frag_t basefrag = mdr->more()->fragment_base;
+ int bits = mdr->more()->fragment_bits;
+
+ dout(10) << "fragment_logged " << *mdr << " " << basefrag << " bits " << bits
<< " on " << *diri << dendl;
// tell peers
mds->send_message_mds(notify, p->first);
}
- mut->apply(); // mark scatterlocks, mainly.
+ mdr->apply(); // mark scatterlock
// unfreeze resulting frags
set<int> peers;
CDir *dir = *p;
dout(10) << " result frag " << *dir << dendl;
- // dirty, unpin, unfreeze
+ // unmark, unfreeze
dir->state_clear(CDir::STATE_FRAGMENTING);
for (CDir::map_t::iterator p = dir->items.begin();
dir->unfreeze_dir();
}
- mds->locker->scatter_wrlock_finish(&diri->dirfragtreelock, 0);
+ // done! clean up, drop locks, etc.
+ request_finish(mdr);
}
list<CDir*> projected_fnodes;
list<ScatterLock*> updated_scatterlocks;
- Mutation() : ls(0),
- done_locking(false), committing(false), aborted(false) {}
+ Mutation() :
+ ls(0),
+ slave_to_mds(-1),
+ done_locking(false), committing(false), aborted(false) {}
Mutation(metareqid_t ri, int slave_to=-1) :
reqid(ri),
ls(0),
}
}
void auth_unpin(MDSCacheObject *object) {
- assert(is_auth_pinned(object));
+ assert(auth_pins.count(object));
object->auth_unpin();
auth_pins.erase(object);
}
}
+enum {
+ MDS_INTERNAL_OP_FRAGMENT,
+};
+
/** active_request_t
* state we track for requests we are currently processing.
* mostly information about locks held, so that we can drop them all
// -- i am a slave request
MMDSSlaveRequest *slave_request; // slave request (if one is pending; implies slave == true)
+ // -- i am an internal op
+ int internal_op;
+
// break rarely-used fields into a separately allocated structure
// to save memory for most ops
Context *slave_commit;
bufferlist rollback_bl;
+ // internal ops
+ CInode *fragment_in;
+ frag_t fragment_base;
+ list<CDir*> fragment_start;
+ list<CDir*> fragment_result;
+ int fragment_bits;
+
More() :
src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
destdn_was_remote_inode(0), was_link_merge(false),
- slave_commit(0) { }
+ slave_commit(0),
+ fragment_in(0), fragment_bits(0) { }
} *_more;
MDRequest() :
session(0), client_request(0), ref(0),
slave_request(0),
+ internal_op(-1),
_more(0) {}
MDRequest(metareqid_t ri, MClientRequest *req) :
Mutation(ri),
session(0), client_request(req), ref(0),
slave_request(0),
+ internal_op(-1),
_more(0) {}
MDRequest(metareqid_t ri, int by) :
Mutation(ri, by),
session(0), client_request(0), ref(0),
slave_request(0),
+ internal_op(-1),
_more(0) {}
~MDRequest() {
delete _more;
if (is_slave()) out << " slave_to mds" << slave_to_mds;
if (client_request) out << " cr=" << client_request;
if (slave_request) out << " sr=" << slave_request;
+ if (internal_op == MDS_INTERNAL_OP_FRAGMENT) out << " fragment";
out << ")";
}
};
public:
MDRequest* request_start(MClientRequest *req);
MDRequest* request_start_slave(metareqid_t rid, int by);
+ MDRequest* request_start_internal(int op);
bool have_request(metareqid_t rid) {
return active_requests.count(rid);
}
void split_dir(CDir *dir, int byn);
private:
- void fragment_freeze(CInode *diri, list<CDir*>& startfrags, frag_t basefrag, int bits);
- void fragment_mark_and_complete(CInode *diri, list<CDir*>& startfrags, frag_t basefrag, int bits);
- void fragment_go(CInode *diri, list<CDir*>& startfrags, frag_t basefrag, int bits);
- void fragment_stored(CInode *diri, frag_t basefrag, int bits, list<CDir*>& resultfrags);
- void fragment_logged(CInode *diri, frag_t basefrag, int bits, list<CDir*>& resultfrags, Mutation *mut);
+ void dispatch_fragment(MDRequest *mdr);
+ void fragment_mark_and_complete(MDRequest *mdr);
+ void fragment_go(MDRequest *mdr);
+ void fragment_stored(MDRequest *mdr);
+ void fragment_logged(MDRequest *mdr);
friend class C_MDC_FragmentGo;
friend class C_MDC_FragmentMarking;
friend class C_MDC_FragmentStored;