/ - STICKY dir state and pin? make sure it's kept across import/export/fragment
/ - pull _bound maps out of Migrator; they are redundant (trust the subtree map!)
-** fix fetch/commit to avoid carrying a CDir* pointer around
-
- handle_resolve needs to infer splits/merges
+ - rejoin, too!
- auth journals and applies update in the request update pipeline
- dirfragtree is lazily consistent. no lock. bcast by primary when it updates.
- CDentry objects will be moved to the new frag(s)
- Server etc. must take care not to carry CDir pointers around; they're unstable!
+ - what about flushing the old dirfrag storage off disk...?
+
- journal epoch, or something similar
- reduce size of EMetaBlob by skipping context when inode was already journaled since the last
int c = rand() % s;
char src[80];
sprintf(src, "syn.0.0/dir.%d/dir.%d/file.%d", a, b, c);
- int fd = client->open(src, O_RDONLY);
+ //int fd =
+ client->open(src, O_RDONLY);
}
return;
}
};
-inline ostream& operator<<(ostream& out, frag_t& hb)
+inline ostream& operator<<(ostream& out, frag_t hb)
{
return out << hex << hb.value() << dec << "/" << hb.bits();
}
public:
// -- state --
static const int STATE_NEW = 1;
+ static const int STATE_FRAGMENTING = 2;
// -- pins --
static const int PIN_INODEPIN = 1; // linked inode is pinned
*/
+#include "include/types.h"
#include "CDir.h"
#include "CDentry.h"
string path;
dir.get_inode()->make_path(path);
out << "[dir " << dir.ino();
- if (!dir.frag.is_root()) out << "%" << dir.frag;
+ if (!dir.frag.is_root()) out << "_" << dir.frag;
out << " " << path << "/";
if (dir.is_auth()) {
out << " auth";
#include "config.h"
#undef dout
-#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << cache->mds->get_nodeid() << ".cache.dir(" << dirfrag() << ") "
+#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") "
//#define dout(x) if (x <= g_conf.debug || x <= g_conf.debug_mds) cout << g_clock.now() << " mds" << cache->mds->get_nodeid() << ".cache." << *this << " "
ostream& CDir::print_db_line_prefix(ostream& out)
{
- return out << g_clock.now() << " mds" << cache->mds->get_nodeid() << ".cache.dir(" << get_inode()->inode.ino << ") ";
+ return out << g_clock.now() << " mds" << cache->mds->get_nodeid() << ".cache.dir(" << this->dirfrag() << ") ";
}
void CDir::try_remove_unlinked_dn(CDentry *dn)
{
assert(dn->dir == this);
+ assert(dn->is_null());
+ assert(dn->is_dirty());
- if (dn->is_new() && dn->is_dirty() &&
- dn->get_num_ref() == 1) {
+ // no pins (besides dirty)?
+ if (dn->get_num_ref() != 1)
+ return;
+
+ // was the dn new? or is the dir complete (i.e. we don't need negatives)?
+ if (dn->is_new() || is_complete()) {
dout(10) << "try_remove_unlinked_dn " << *dn << " in " << *this << endl;
dn->mark_clean();
remove_dentry(dn);
void CDir::purge_stolen(list<Context*>& waiters)
{
+ // take waiters _before_ unfreeze...
+ take_waiting(WAIT_ANY, waiters);
+
+ assert(is_frozen_dir());
+ unfreeze_dir();
+
nnull = nitems = 0;
if (is_dirty()) mark_clean();
-
if (state_test(STATE_EXPORT)) put(PIN_EXPORT);
if (state_test(STATE_IMPORTBOUND)) put(PIN_IMPORTBOUND);
if (state_test(STATE_EXPORTBOUND)) put(PIN_EXPORTBOUND);
- if (state_test(STATE_FROZENDIR)) put(PIN_FROZEN);
if (auth_pins > 0) put(PIN_AUTHPIN);
- take_waiting(WAIT_ANY, waiters);
-
assert(get_num_ref() == 0);
}
void CDir::init_fragment_pins()
{
if (state_test(STATE_DIRTY)) get(PIN_DIRTY);
- if (state_test(STATE_FROZENDIR)) get(PIN_FROZEN);
if (state_test(STATE_EXPORT)) get(PIN_EXPORT);
if (state_test(STATE_EXPORTBOUND)) get(PIN_EXPORTBOUND);
if (state_test(STATE_IMPORTBOUND)) get(PIN_IMPORTBOUND);
f->version = version;
f->projected_version = projected_version;
f->replica_map = replica_map;
+ f->freeze_dir(0);
dout(10) << " subfrag " << *p << " " << *f << endl;
subfrags[n++] = f;
subs.push_back(f);
f->steal_dentry(dn);
}
- put(PIN_FRAGMENTING);
purge_stolen(waiters);
inode->close_dirfrag(frag); // selft deletion, watch out.
}
// merge state
state_set(dir->get_state() & MASK_STATE_FRAGMENT_KEPT);
- dir->put(PIN_FRAGMENTING);
dir->purge_stolen(waiters);
inode->close_dirfrag(dir->get_frag());
}
}
};
-void CDir::fetch(Context *c)
+void CDir::fetch(Context *c, bool ignore_authpinnability)
{
dout(10) << "fetch on " << *this << endl;
assert(is_auth());
assert(!is_complete());
+ if (!can_auth_pin() && !ignore_authpinnability) {
+ dout(7) << "fetch waiting for authpinnable" << endl;
+ add_waiter(WAIT_AUTHPINNABLE, c);
+ return;
+ }
+
if (c) add_waiter(WAIT_COMPLETE, c);
// already fetching?
return;
}
+ auth_pin();
state_set(CDir::STATE_FETCHING);
if (cache->mds->logger) cache->mds->logger->inc("fdir");
void CDir::_fetched(bufferlist &bl)
{
- dout(10) << "_fetched " << 0 << "~" << bl.length()
- << " on " << *this
+ dout(10) << "_fetched " << bl.length()
+ << " bytes for " << *this
<< endl;
- // give up?
- if (!is_auth() || is_frozen()) {
- dout(10) << "_fetched canceling (!auth or frozen)" << endl;
- //ondisk_bl.clear();
- //ondisk_size = 0;
-
- // kick waiters?
- state_clear(CDir::STATE_FETCHING);
- finish_waiting(WAIT_COMPLETE, -1);
- return;
- }
+ assert(is_auth());
+ assert(!is_frozen());
// decode.
int len = bl.length();
int off = 0;
- version_t got_version;
+ version_t got_version;
- bl.copy(off, sizeof(got_version), (char*)&got_version);
- off += sizeof(got_version);
+ ::_decode(got_version, bl, off);
dout(10) << "_fetched version " << got_version
<< ", " << len << " bytes"
/**
* commit
*
- * @param want min version i want committed
- * @param c callback for completion
+ * @param want - min version i want committed
+ * @param c - callback for completion
*/
void CDir::commit(version_t want, Context *c)
{
// preconditions
assert(want <= version || version == 0); // can't commit the future
- assert(committed_version < want); // the caller is stupid
+ assert(want > committed_version); // the caller is stupid
assert(is_auth());
assert(can_auth_pin());
if (cache->mds->logger) cache->mds->logger->inc("cdir");
- // encode dentries
+ // encode
bufferlist bl;
- bl.append((char*)&version, sizeof(version));
+
+ ::_encode(version, bl);
for (CDir_map_t::iterator it = items.begin();
it != items.end();
static const int PIN_DNWAITER = 1;
static const int PIN_CHILD = 2;
static const int PIN_FROZEN = 3;
- static const int PIN_FRAGMENTING = 4;
static const int PIN_EXPORT = 5;
static const int PIN_IMPORTING = 7;
static const int PIN_EXPORTING = 8;
case PIN_DNWAITER: return "dnwaiter";
case PIN_CHILD: return "child";
case PIN_FROZEN: return "frozen";
- case PIN_FRAGMENTING: return "fragmenting";
case PIN_EXPORT: return "export";
case PIN_EXPORTING: return "exporting";
case PIN_IMPORTING: return "importing";
static const unsigned MASK_STATE_FRAGMENT_KEPT =
(STATE_DIRTY |
STATE_COMPLETE |
- STATE_FROZENDIR |
STATE_EXPORT |
STATE_EXPORTBOUND |
STATE_IMPORTBOUND |
// -- fetch --
object_t get_ondisk_object() { return object_t(ino(), frag); }
- void fetch(Context *c);
+ void fetch(Context *c, bool ignore_authpinnability=false);
void _fetched(bufferlist &bl);
// -- commit --
if (auth_pins > 0)
return false;
- // if not subtree root, inode must not be frozen.
- if (!is_subtree_root() && inode->is_frozen())
+ // if not subtree root, inode must not be frozen (tree--frozen_dir is okay).
+ if (!is_subtree_root() && inode->is_frozen() && !inode->is_frozen_dir())
return false;
return true;
private:
int _type;
off_t _start_off,_end_off;
+
friend class MDLog;
public:
- LogEvent(int t) : _type(t), _start_off(0), _end_off(0) { }
+ LogEvent(int t) :
+ _type(t), _start_off(0), _end_off(0) { }
virtual ~LogEvent() { }
int get_type() { return _type; }
out << "event(" << _type << ")";
}
-
/*** live journal ***/
/* obsolete() - is this entry committed to primary store, such that
for (map<dirfrag_t, list<dirfrag_t> >::iterator pi = m->subtrees.begin();
pi != m->subtrees.end();
++pi) {
- CDir *im = get_dirfrag(pi->first);
- if (im) {
- adjust_bounded_subtree_auth(im, pi->second, from);
- try_subtree_merge(im);
- }
+ CInode *diri = get_inode(pi->first.ino);
+ if (!diri) continue;
+ diri->dirfragtree.force_to_leaf(pi->first.frag);
+ CDir *dir = diri->get_dirfrag(pi->first.frag);
+ if (!dir) continue;
+ adjust_bounded_subtree_auth(dir, pi->second, from);
+ try_subtree_merge(dir);
}
// am i a surviving ambiguous importer?
}
}
-
+class C_MDC_FragmentGo : public Context {
+ MDCache *mdcache;
+ CInode *diri;
+ list<CDir*> dirs;
+ frag_t basefrag;
+ int bits;
+public:
+ C_MDC_FragmentGo(MDCache *m, CInode *di, list<CDir*>& dls, frag_t bf, int b) :
+ mdcache(m), diri(di), dirs(dls), basefrag(bf), bits(b) { }
+ virtual void finish(int r) {
+ mdcache->fragment_go(diri, dirs, basefrag, bits);
+ }
+};
void MDCache::split_dir(CDir *dir, int bits)
{
return;
}
- dir->auth_pin();
- dir->state_set(CDir::STATE_FRAGMENTING);
- dir->get(CDir::PIN_FRAGMENTING);
-
- // make complete
list<CDir*> startfrags;
startfrags.push_back(dir);
-
+
+ dir->state_set(CDir::STATE_FRAGMENTING);
+
+ fragment_freeze(dir->get_inode(), startfrags, dir->get_frag(), bits);
fragment_mark_and_complete(dir->get_inode(), startfrags, dir->get_frag(), bits);
}
+/*
+ * initial the freeze, blocking with an auth_pin.
+ */
+void MDCache::fragment_freeze(CInode *diri, list<CDir*>& frags, frag_t basefrag, int bits)
+{
+ C_Gather *gather = new C_Gather(new C_MDC_FragmentGo(this, diri, frags, basefrag, bits));
+
+ for (list<CDir*>::iterator p = frags.begin();
+ p != frags.end();
+ ++p) {
+ CDir *dir = *p;
+ dir->auth_pin(); // this will block the freeze
+ dir->freeze_dir(gather->new_sub());
+ }
+}
+
class C_MDC_FragmentMarking : public Context {
MDCache *mdcache;
CInode *diri;
dout(10) << "fragment_mark_and_complete " << basefrag << " by " << bits
<< " on " << *diri << endl;
- int waiting = 0;
+ C_Gather *gather = 0;
+
for (list<CDir*>::iterator p = startfrags.begin();
p != startfrags.end();
++p) {
CDir *dir = *p;
- if (dir->state_test(CDir::STATE_DNPINNEDFRAG)) {
- dout(15) << " marked " << *dir << endl;
- } else if (dir->is_complete()) {
+
+ if (!dir->is_complete()) {
+ dout(15) << " fetching incomplete " << *dir << endl;
+ if (!gather) gather = new C_Gather(new C_MDC_FragmentMarking(this, diri, startfrags, basefrag, bits));
+ dir->fetch(gather->new_sub(),
+ true); // ignore authpinnability
+ }
+ else if (!dir->state_test(CDir::STATE_DNPINNEDFRAG)) {
dout(15) << " marking " << *dir << endl;
for (map<string,CDentry*>::iterator p = dir->items.begin();
p != dir->items.end();
- ++p)
+ ++p) {
p->second->get(CDentry::PIN_FRAGMENTING);
+ p->second->state_set(CDentry::STATE_FRAGMENTING);
+ }
dir->state_set(CDir::STATE_DNPINNEDFRAG);
- } else {
- dout(15) << " fetching incomplete " << *dir << endl;
- dir->fetch(new C_MDC_FragmentMarking(this, diri, startfrags, basefrag, bits));
- waiting++;
+ dir->auth_unpin(); // allow our freeze to complete
+ }
+ else {
+ dout(15) << " marked " << *dir << endl;
}
- }
-
- if (!waiting) {
- fragment_go(diri, startfrags, basefrag, bits);
}
}
frag_t basefrag;
int bits;
list<CDir*> resultfrags;
- version_t maxpv;
vector<version_t> pvs;
public:
C_MDC_FragmentLogged(MDCache *m, CInode *di, frag_t bf, int b,
- list<CDir*>& rf, version_t mpv, vector<version_t>& p) :
- mdcache(m), diri(di), basefrag(bf), bits(b), maxpv(mpv) {
+ list<CDir*>& rf, vector<version_t>& p) :
+ mdcache(m), diri(di), basefrag(bf), bits(b) {
resultfrags.swap(rf);
pvs.swap(p);
}
virtual void finish(int r) {
mdcache->fragment_logged(diri, basefrag, bits,
- resultfrags, maxpv, pvs);
+ resultfrags, pvs);
}
};
dir->state_set(CDir::STATE_FRAGMENTING);
- // add new dirfrag
+ // new dirfrag
+ pvs.push_back(dir->pre_dirty());
le->metablob.add_dir(dir, true);
- // add all the dentries, partitioned.
- pvs.push_back(dir->pre_dirty());
+ // all the dentries
for (map<string,CDentry*>::iterator p = dir->items.begin();
p != dir->items.end();
++p) {
- pvs.push_back(p->second->pre_dirty());
- le->metablob.add_dentry(p->second, true);
+ if (p->second->state_test(CDentry::STATE_FRAGMENTING)) {
+ pvs.push_back(p->second->pre_dirty());
+ le->metablob.add_dentry(p->second, true);
+ }
}
}
- version_t maxpv = 0;
- if (!pvs.empty()) maxpv = pvs.back();
// journal
mds->mdlog->submit_entry(le,
new C_MDC_FragmentLogged(this, diri, basefrag, bits,
- resultfrags, maxpv, pvs));
+ resultfrags, pvs));
// announcelist<CDir*>& resultfrags,
for (set<int>::iterator p = peers.begin();
void MDCache::fragment_logged(CInode *diri, frag_t basefrag, int bits,
list<CDir*>& resultfrags,
- version_t maxpv, vector<version_t>& pvs)
+ vector<version_t>& pvs)
{
dout(10) << "fragment_logged " << basefrag << " bits " << bits
<< " on " << *diri << endl;
p != dir->items.end();
++p) {
CDentry *dn = p->second;
- if (dn->version >= maxpv) continue; // skip it; created after the frag event
- dn->put(CDentry::PIN_FRAGMENTING);
- dn->mark_dirty(*pv);
- pv++;
+ if (dn->state_test(CDentry::STATE_FRAGMENTING)) {
+ dn->put(CDentry::PIN_FRAGMENTING);
+ dn->mark_dirty(*pv);
+ pv++;
+ }
}
+
+ dir->unfreeze_dir();
}
}
void split_dir(CDir *dir, int byn);
private:
+ void fragment_freeze(CInode *diri, list<CDir*>& startfrags,
+ frag_t basefrag, int bits);
void fragment_mark_and_complete(CInode *diri, list<CDir*>& startfrags,
frag_t basefrag, int bits);
void fragment_go(CInode *diri, list<CDir*>& startfrags,
frag_t basefrag, int bits);
void fragment_logged(CInode *diri, frag_t basefrag, int bits,
- list<CDir*>& resultfrags, version_t maxpv, vector<version_t>& pvs);
+ list<CDir*>& resultfrags, vector<version_t>& pvs);
+ friend class C_MDC_FragmentGo;
friend class C_MDC_FragmentMarking;
friend class C_MDC_FragmentLogged;
dirfrag_t(inodeno_t i, frag_t f) : ino(i), frag(f) { }
};
-inline ostream& operator<<(ostream& out, const dirfrag_t& df) {
- return out << df.ino << "#" << df.frag;
+inline ostream& operator<<(ostream& out, const dirfrag_t df) {
+ return out << df.ino << "_" << df.frag;
}
inline bool operator<(dirfrag_t l, dirfrag_t r) {
if (l.ino < r.ino) return true;