- fix mds initial osdmap weirdness (which will currently screw up on standby -> almost anything)
- incremental mdsmaps
- client failure
+- dirfrag split
+ - make sure we are freezing _before_ we fetch to complete the dirfrag, else
+ we break commit()'s preconditions when it fetches an incomplete dir.
- EMetablob should return 'expired' if they have higher versions (and are thus described by a newer journal entry)
- dir version/committed/etc versus migration, log expires.
- will need to ditch 10s client metadata caching before this is useful
- implement truncate
- statfs?
-- btree directories (for efficient large directories)
+? btree directories (for efficient large directories)
- consistency points/snapshots
- fix MExportAck and others to use dir+dentry, not inode
}
}
+// No-op completion: finish() ignores its result code and does nothing.
+// Handy wherever a Context is required but there is no follow-up work.
+class C_NoopContext : public Context {
+public:
+  void finish(int /*r*/) {}
+};
+
+
/*
* C_Contexts - set of Contexts
*/
if (dn.get_num_ref()) {
out << " |";
- for(set<int>::iterator it = dn.get_ref_set().begin();
- it != dn.get_ref_set().end();
- it++)
- out << " " << CDentry::pin_name(*it);
+ dn.print_pin_set(out);
}
out << " " << &dn;
static const int PIN_DIRTY = 2; //
static const int PIN_PROXY = 3; //
static const int PIN_XLOCK = 4;
- static const char *pin_name(int p) {
+ const char *pin_name(int p) {
switch (p) {
case PIN_INODEPIN: return "inodepin";
case PIN_REPLICATED: return "replicated";
if (dir.get_num_ref()) {
out << " |";
- for(set<int>::iterator it = dir.get_ref_set().begin();
- it != dir.get_ref_set().end();
- it++)
- out << " " << CDir::pin_name(*it);
+ dir.print_pin_set(out);
}
out << " " << &dir;
committing_version = 0;
committed_version = 0;
- ref = 0;
-
// dir_auth
dir_auth = CDIR_AUTH_DEFAULT;
assert(in->is_dir());
if (auth)
state |= STATE_AUTH;
- /*
- if (in->dir_is_hashed()) {
- assert(0); // when does this happen?
- state |= STATE_HASHED;
- }
- */
auth_pins = 0;
nested_auth_pins = 0;
// -----------------------
// COMMIT
+/**
+ * commit
+ *
+ * Queue a completion callback for version 'want' and start committing.
+ * The caller must be auth, must be able to auth_pin, and must not ask
+ * for a version that is already committed or that does not exist yet.
+ *
+ * @param want  min version i want committed (0 means the current version)
+ * @param c     callback for completion; may be null, in which case a
+ *              no-op context is queued in its place
+ */
+void CDir::commit(version_t want, Context *c)
+{
+  dout(10) << "commit want " << want << " on " << *this << endl;
+  if (!want)
+    want = version;
+
+  // preconditions
+  assert(want <= version);            // can't commit the future
+  assert(committed_version < want);   // nothing to do otherwise; caller error
+  assert(is_auth());
+  assert(can_auth_pin());
+
+  // the first queued waiter takes the auth_pin.
+  const bool first_waiter = waiting_for_commit.empty();
+  if (first_waiter)
+    auth_pin();
+
+  // always queue something (a noop if the caller gave no callback), so
+  // that every commit() call is represented by a waiter.
+  Context *waiter = c;
+  if (!waiter)
+    waiter = new C_NoopContext;
+  waiting_for_commit[want].push_back(waiter);
+
+  // ok.
+  _commit(want);
+}
+
+
class C_Dir_RetryCommit : public Context {
CDir *dir;
version_t want;
public:
- C_Dir_RetryCommit(CDir *d, version_t v) : dir(d), want(v) { }
+ C_Dir_RetryCommit(CDir *d, version_t v) :
+ dir(d), want(v) { }  // remember which dir to retry and the version wanted
void finish(int r) {
- dir->commit(want, 0);
+ dir->_commit(want);  // retry via _commit() rather than commit(), which would queue another waiter
}
};
}
};
-/**
- * commit
- *
- * @param want min version i want committed
- * @param c callback for completion
- */
-void CDir::commit(version_t want, Context *c)
+void CDir::_commit(version_t want)
{
- dout(10) << "commit want " << want << " on " << *this << endl;
- if (want == 0) want = version;
-
- if (c) {
- assert(committed_version < want);
- waiting_for_commit[want].push_back(c);
- }
+ dout(10) << "_commit want " << want << " on " << *this << endl;
+
+ // we can't commit things in the future.
+ // (even the projected future.)
+ assert(want <= version);
+
+ // check pre+postconditions.
+ assert(is_auth());
- // not auth?
- if (!is_auth()) {
- dout(10) << "not auth. must have exported. kicking all waiters." << endl;
- for (map<version_t, list<Context*> >::iterator p = waiting_for_commit.begin();
- p != waiting_for_commit.end();
- ++p)
- cache->mds->queue_finished(p->second);
- waiting_for_commit.clear();
- return;
- }
-
// already committed?
if (committed_version >= want) {
dout(10) << "already committed " << committed_version << " >= " << want << endl;
return;
}
+ // already committing >= want?
if (committing_version >= want) {
dout(10) << "already committing " << committing_version << " >= " << want << endl;
- return;
- }
-
- // authpinnable?
- if (!can_auth_pin()) {
- dout(7) << "can't auth_pin, waiting" << endl;
- add_waiter(WAIT_AUTHPINNABLE,
- new C_Dir_RetryCommit(this, want));
+ assert(state_test(STATE_COMMITTING));
return;
}
return;
}
- // pin.
- auth_pin();
-
// commit.
- state_set(CDir::STATE_COMMITTING);
committing_version = version;
+
+ // mark committing (if not already)
+ if (!state_test(STATE_COMMITTING)) {
+ dout(10) << "marking committing" << endl;
+ state_set(STATE_COMMITTING);
+ }
if (cache->mds->logger) cache->mds->logger->inc("cdir");
assert(v <= committing_version);
committed_version = v;
+ // _all_ commits done?
+ if (committing_version == committed_version)
+ state_clear(CDir::STATE_COMMITTING);
+
// dir clean?
if (committed_version == version)
mark_clean();
- if (committing_version == committed_version)
- state_clear(CDir::STATE_COMMITTING);
-
// dentries clean?
for (CDir_map_t::iterator it = items.begin();
it != items.end(); ) {
}
}
- // unpin
- auth_unpin();
-
// finishers?
+ bool were_waiters = !waiting_for_commit.empty();
+
map<version_t, list<Context*> >::iterator p = waiting_for_commit.begin();
while (p != waiting_for_commit.end()) {
map<version_t, list<Context*> >::iterator n = p;
n++;
- if (p->first > committed_version) break; // haven't commit this far yet.
+ if (p->first > committed_version) break; // haven't committed this far yet.
cache->mds->queue_finished(p->second);
waiting_for_commit.erase(p);
p = n;
}
+
+ // unpin if we kicked the last waiter.
+ if (were_waiters &&
+ waiting_for_commit.empty())
+ auth_unpin();
}
static const int PIN_DIRTY = 15;
static const int PIN_REQUEST = 16;
static const int PIN_LOGGINGEXPORTFINISH = 17;
- static const char *pin_name(int p) {
+ const char *pin_name(int p) {
switch (p) {
case PIN_CHILD: return "child";
case PIN_OPENED: return "opened";
// context
MDCache *cache;
- // my inode
- CInode *inode;
-
- // my frag
- frag_t frag;
+ CInode *inode; // my inode
+ frag_t frag; // my frag
protected:
// contents
// state
version_t version;
version_t committing_version;
- version_t committed_version; // slight lie; we bump this on import.
+ version_t committed_version;
version_t projected_version;
// lock nesting, freeze
int nested_auth_pins;
int request_pins;
- // hashed dirs
- set<int> hashed_subset; // HASHING: subset of mds's that are hashed
- public:
- // for class MDS
- map<int, pair< list<class InodeStat*>, list<string> > > hashed_readdir;
- protected:
-
-
// waiters
multimap<int, Context*> waiting; // tag -> context
// -- accessors --
- inodeno_t ino() { return inode->ino(); }
+ inodeno_t ino() { return inode->ino(); } // deprecate me?
dirfrag_t dirfrag() { return dirfrag_t(inode->ino(), frag); }
CInode *get_inode() { return inode; }
map<version_t, list<Context*> > waiting_for_commit;
void commit_to(version_t want);
- void commit(version_t want,Context *c);
+ void commit(version_t want, Context *c);
+ void _commit(version_t want);
void _committed(version_t v);
void wait_for_commit(Context *c, version_t v=0);
// -- auth pins --
- bool can_auth_pin() { return !(is_frozen() || is_freezing()); }
+ bool can_auth_pin() { return is_auth() && !(is_frozen() || is_freezing()); }
int is_auth_pinned() { return auth_pins; }
int get_cum_auth_pins() { return auth_pins + nested_auth_pins; }
int get_auth_pins() { return auth_pins; }
long nitems; // actual real entries
long nden; // num dentries (including null ones)
version_t version;
+ version_t committed_version;
unsigned state;
meta_load_t popularity_justme;
meta_load_t popularity_curdom;
st.nitems = dir->nitems;
st.nden = dir->items.size();
st.version = dir->version;
+ st.committed_version = dir->committed_version;
st.state = dir->state;
st.dir_rep = dir->dir_rep;
//dir->nitems = st.nitems;
// set committed_version at old version
- dir->committing_version = dir->committed_version = st.version;
- dir->projected_version = dir->version = st.version; // this is bumped, below, if dirty
+ dir->committing_version = dir->committed_version = st.committed_version;
+ dir->projected_version = dir->version = st.version;
// twiddle state
if (dir->state & CDir::STATE_HASHED)
dir->get(CDir::PIN_OPENED);
if (dir->is_dirty()) {
dir->get(CDir::PIN_DIRTY);
-
- // bump dir version + 1 if dirty
- dir->projected_version = dir->version = st.version + 1;
}
}
if (in.get_num_ref()) {
out << " |";
- for(set<int>::iterator it = in.get_ref_set().begin();
- it != in.get_ref_set().end();
- it++)
- out << " " << CInode::pin_name(*it);
+ in.print_pin_set(out);
}
// hack: spit out crap on which clients have caps
// ====== CInode =======
-CInode::CInode(MDCache *c, bool auth) {
+CInode::CInode(MDCache *c, bool auth)
+{
mdcache = c;
- ref = 0;
-
//num_parents = 0;
parent = NULL;
auth_pins = 0;
nested_auth_pins = 0;
- num_request_pins = 0;
+ //num_request_pins = 0;
state = 0;
{
assert(is_dir());
if (1) { // old
- if (!dir)
+ if (!dir) {
+ assert(is_auth());
dir = new CDir(this, fg, mdcache, true);
+ }
return dir;
} else { // new
// have it?
static const int PIN_WAITER = 6; // waiter
static const int PIN_CAPS = 7; // local fh's
static const int PIN_AUTHPIN = 8;
- static const int PIN_IMPORTING = 9; // multipurpose, for importing
- static const int PIN_REQUEST = 10; // request is logging, finishing
+ static const int PIN_IMPORTING = -9; // importing
+ static const int PIN_REQUEST = -10; // request is logging, finishing
static const int PIN_RENAMESRC = 11; // pinned on dest for foreign rename
static const int PIN_ANCHORING = 12;
static const int PIN_OPENINGDIR = 13;
static const int PIN_REMOTEPARENT = 14;
static const int PIN_DENTRYLOCK = 15;
- static const char *pin_name(int p) {
+ const char *pin_name(int p) {
switch (p) {
case PIN_CACHED: return "cached";
case PIN_DIR: return "dir";
// distributed caching (old)
pair<int,int> dangling_auth; // explicit auth, when dangling.
- int num_request_pins;
+ //int num_request_pins;
// waiters
multimap<int, Context*> waiting;
linked to an active_request, so they're automatically cleaned
up when a request is finished. pin at will! */
void request_pin_get() {
- if (num_request_pins == 0) get(PIN_REQUEST);
- num_request_pins++;
+ //if (num_request_pins == 0)
+ get(PIN_REQUEST);  // taken on every call now; the num_request_pins counter is commented out
+ //num_request_pins++;
}
void request_pin_put() {
- num_request_pins--;
- if (num_request_pins == 0) put(PIN_REQUEST);
- assert(num_request_pins >= 0);
+ //num_request_pins--;
+ //if (num_request_pins == 0)
+ put(PIN_REQUEST);  // one put per call, mirroring the unconditional get in request_pin_get()
+ //assert(num_request_pins >= 0);
}
void bad_put(int by) {
void MDCache::adjust_bounded_subtree_auth(CDir *dir, set<CDir*>& bounds, pair<int,int> auth)
{
dout(7) << "adjust_bounded_subtree_auth " << dir->get_dir_auth() << " -> " << auth
- << " on " << *dir << endl;
+ << " on " << *dir
+ << " bounds " << bounds
+ << endl;
show_subtrees();
m->get_dir(i).update_dir(ndir);
// is this a dir_auth delegation boundary?
- if (m->get_source().num() != cur->authority().first)
+ if (m->get_source().num() != cur->authority().first ||
+ cur->ino() == 1)
adjust_subtree_auth(ndir, m->get_source().num());
dout(7) << "added " << *ndir << " nonce " << ndir->replica_nonce << endl;
void MDLog::_trimmed(LogEvent *le)
{
+ // successful trim?
+ if (!le->has_expired(mds)) {
+ dout(7) << "retrimming : " << le->get_start_off() << " : " << *le << endl;
+ le->expire(mds, new C_MDL_Trimmed(this, le));
+ return;
+ }
+
dout(7) << "trimmed : " << le->get_start_off() << " : " << *le << endl;
- assert(le->has_expired(mds));
if (trimming.begin()->first == le->_end_off) {
// we trimmed off the front!
list<CDir*> ls;
p->second->get_dirfrags(ls);
+ if (ls.empty()) continue; // must be an open dir.
CDir *dir = ls.front();
- if (!dir) continue; // must be a dir.
if (!dir->get_parent_dir()) continue; // must be linked.
if (!dir->is_auth()) continue; // must be auth.
}
// re-import the metadata
- list<dirfrag_t> imported_subdirs;
int num_imported_inodes = 0;
for (list<bufferlist>::iterator p = export_data[dir].begin();
decode_import_dir(*p,
export_peer[dir],
dir, // import root
- imported_subdirs,
0);
}
// open export dirs/bounds?
assert(import_bound_inos.count(dir->dirfrag()) == 0);
import_bound_inos[dir->dirfrag()].clear();
- for (list<dirfrag_t>::iterator it = m->get_exports().begin();
- it != m->get_exports().end();
+ for (list<dirfrag_t>::iterator it = m->get_bounds().begin();
+ it != m->get_bounds().end();
it++) {
dout(7) << " checking dir " << hex << *it << dec << endl;
CInode *in = cache->get_inode(it->ino);
}
- // verify we have all exports
+ // verify we have all bounds
int waiting_for = 0;
- for (list<dirfrag_t>::iterator it = m->get_exports().begin();
- it != m->get_exports().end();
+ for (list<dirfrag_t>::iterator it = m->get_bounds().begin();
+ it != m->get_bounds().end();
it++) {
dirfrag_t df = *it;
- CDir *dir = cache->get_dirfrag(df);
- if (dir) {
- if (!dir->state_test(CDir::STATE_IMPORTBOUND)) {
- dout(7) << " pinning import bound " << *dir << endl;
- dir->get(CDir::PIN_IMPORTBOUND);
- dir->state_set(CDir::STATE_IMPORTBOUND);
- import_bounds[dir].insert(dir);
+ CDir *bound = cache->get_dirfrag(df);
+ if (bound) {
+ if (!bound->state_test(CDir::STATE_IMPORTBOUND)) {
+ dout(7) << " pinning import bound " << *bound << endl;
+ bound->get(CDir::PIN_IMPORTBOUND);
+ bound->state_set(CDir::STATE_IMPORTBOUND);
+ import_bounds[dir].insert(bound);
} else {
dout(7) << " already pinned import bound " << *dir << endl;
}
Migrator *migrator;
CDir *dir;
int from;
- list<dirfrag_t> imported_subdirs;
- list<dirfrag_t> exports;
public:
- C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f,
- list<dirfrag_t>& is, list<dirfrag_t>& e) :
+ C_MDS_ImportDirLoggedStart(Migrator *m, CDir *d, int f) :
migrator(m), dir(d), from(f) {
- imported_subdirs.swap(is);
- exports.swap(e);
}
void finish(int r) {
- migrator->import_logged_start(dir, from, imported_subdirs, exports);
+ migrator->import_logged_start(dir, from);
}
};
cache->show_subtrees();
// start the journal entry
- EImportStart *le = new EImportStart(dir->dirfrag(), m->get_exports());
+ EImportStart *le = new EImportStart(dir->dirfrag(), m->get_bounds());
le->metablob.add_dir_context(dir);
// adjust auth (list us _first_)
cache->verify_subtree_bounds(dir, import_bounds[dir]);
// add this crap to my cache
- list<dirfrag_t> imported_subdirs;
int num_imported_inodes = 0;
for (list<bufferlist>::iterator p = m->get_dirstate().begin();
decode_import_dir(*p,
oldauth,
dir, // import root
- imported_subdirs,
le);
}
- dout(10) << " " << imported_subdirs.size() << " imported subdirs" << endl;
- dout(10) << " " << m->get_exports().size() << " imported nested exports" << endl;
+ dout(10) << " " << m->get_bounds().size() << " imported bounds" << endl;
// include bounds in EImportStart
for (set<CDir*>::iterator it = import_bounds[dir].begin();
// log it
mds->mdlog->submit_entry(le,
- new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num(),
- imported_subdirs, m->get_exports()));
+ new C_MDS_ImportDirLoggedStart(this, dir, m->get_source().num()));
// note state
import_state[dir->dirfrag()] = IMPORT_LOGGINGSTART;
}
-void Migrator::import_logged_start(CDir *dir, int from,
- list<dirfrag_t> &imported_subdirs,
- list<dirfrag_t> &exports)
+void Migrator::import_logged_start(CDir *dir, int from)
{
dout(7) << "import_logged " << *dir << endl;
int Migrator::decode_import_dir(bufferlist& bl,
int oldauth,
CDir *import_root,
- list<dirfrag_t>& imported_subdirs,
EImportStart *le)
{
int off = 0;
dout(7) << "decode_import_dir " << *dir << endl;
- // add to list
- if (dir != import_root)
- imported_subdirs.push_back(dir->dirfrag());
-
// assimilate state
dstate.update_dir( dir );
int decode_import_dir(bufferlist& bl,
int oldauth,
CDir *import_root,
- list<dirfrag_t>& imported_subdirs,
EImportStart *le);
/*
void got_hashed_replica(CDir *import,
void import_reverse_unfreeze(CDir *dir);
void import_reverse_unpin(CDir *dir);
void import_notify_abort(CDir *dir);
- void import_logged_start(CDir *dir, int from,
- list<dirfrag_t> &imported_subdirs,
- list<dirfrag_t> &exports);
+ void import_logged_start(CDir *dir, int from);
void handle_export_finish(MExportDirFinish *m);
public:
void import_finish(CDir *dir, bool now=false);
CDir* Server::try_open_dir(CInode *in, frag_t fg, MClientRequest *req)
{
- if (!in->get_dirfrag(fg) && in->is_frozen_dir()) {
- // doh!
- dout(10) << " dir inode is frozen, can't open dir, waiting " << *in << endl;
+ CDir *dir = in->get_dirfrag(fg);
+ if (dir)
+ return dir;  // already open: hand it back without looking at the frozen state
+
+ if (in->is_frozen_dir()) {  // not open: queue a retry of req on the parent dir's WAIT_UNFREEZE waiters
+ dout(10) << "try_open_dir: dir inode is frozen, waiting " << *in << endl;
assert(in->get_parent_dir());
in->get_parent_dir()->add_waiter(CDir::WAIT_UNFREEZE,
new C_MDS_RetryRequest(mds, req, in));
return 0;
}
-
+
return in->get_or_open_dirfrag(mds->mdcache, fg);
}
+/**
+ * try_open_auth_dir
+ *
+ * Get dirfrag 'fg' of 'diri', but only if we are auth for that dirfrag.
+ * If the dirfrag is not open it is opened here, which requires that we
+ * are auth for the inode and that the inode's dir is not frozen.
+ *
+ * @param diri  directory inode
+ * @param fg    frag to get/open
+ * @param req   client request; forwarded to the auth mds if we are not
+ *              auth, or queued for retry if the inode is frozen
+ * @return the open, auth dirfrag, or 0 if req was forwarded or is waiting
+ */
+CDir* Server::try_open_auth_dir(CInode *diri, frag_t fg, MClientRequest *req)
+{
+  CDir *dir = diri->get_dirfrag(fg);
+
+  if (!dir) {
+    // not open and inode not mine?
+    if (!diri->is_auth()) {
+      int inauth = diri->authority().first;
+      dout(7) << "try_open_auth_dir: not open, not inode auth, fw to mds" << inauth << endl;
+      mdcache->request_forward(req, inauth);
+      return 0;
+    }
+
+    // not open and inode frozen?
+    if (diri->is_frozen_dir()) {
+      dout(10) << "try_open_auth_dir: dir inode is frozen, waiting " << *diri << endl;
+      assert(diri->get_parent_dir());
+      diri->get_parent_dir()->add_waiter(CDir::WAIT_UNFREEZE,
+                                         new C_MDS_RetryRequest(mds, req, diri));
+      return 0;
+    }
+
+    // invent.  (we are the inode auth, per the check above.)
+    assert(diri->is_auth());
+    dir = diri->get_or_open_dirfrag(mds->mdcache, fg);
+  }
+  assert(dir);
+
+  // am i auth for the dirfrag?
+  if (!dir->is_auth()) {
+    int auth = dir->authority().first;
+    dout(7) << "try_open_auth_dir: not auth for " << *dir
+            << ", fw to mds" << auth << endl;
+    mdcache->request_forward(req, auth);
+    return 0;
+  }
+
+  return dir;
+}
return;
}
- // get the dir?
- if (!diri->get_dirfrag(fg) && !diri->is_auth()) {
- dout(10) << "not auth for " << fg << " or the inode " << *diri << ", fwd" << endl;
- mdcache->request_forward(req, diri->authority().first);
- return;
- }
-
- CDir *dir = try_open_dir(diri, fg, req);
+ CDir *dir = try_open_auth_dir(diri, fg, req);
if (!dir) return;
- assert(dir);
-
- if (!dir->is_auth()) {
- dout(10) << "not auth for " << *dir << ", fwd" << endl;
- mdcache->request_forward(req, dir->authority().first);
- return;
- }
// ok!
assert(dir->is_auth());
// which dirfrag?
frag_t fg = diri->pick_dirfrag(name);
- CDir *dir = diri->get_dirfrag(fg);
-
- // not open?
- if (!dir && !diri->is_auth()) {
- int dirauth = diri->authority().first;
- dout(7) << "validate_new_dentry_dir: don't know dir auth, not open, auth is i think mds" << dirauth << endl;
- mdcache->request_forward(req, dirauth);
- return false;
- }
-
- // not me?
- if (dir && !dir->is_auth()) {
- int auth = dir->authority().first;
- dout(7) << "validate_new_dentry_dir on " << req->get_path() << ", dentry " << *dir
- << " dn " << name
- << " not mine, fw to mds" << auth << endl;
- mdcache->request_forward(req, auth);
- return false;
- }
- // ok, let's open it then.
- assert(diri->is_auth());
- dir = try_open_dir(diri, fg, req);
+ CDir *dir = try_open_auth_dir(diri, fg, req);
if (!dir)
- return false;
-
+ return 0;
+
// dir auth pinnable?
if (!dir->can_auth_pin()) {
dout(7) << "validate_new_dentry_dir: dir " << *dir << " not pinnable, waiting" << endl;
return;
}
- // am i not open, not auth?
- frag_t fg = diri->pick_dirfrag(name);
- if (!diri->get_dirfrag(fg) && !diri->is_auth()) {
- int dirauth = diri->authority().first;
- dout(7) << "don't know dir auth, not open, auth is i think " << dirauth << endl;
- mdcache->request_forward(req, dirauth);
- return;
- }
-
- CDir *dir = try_open_dir(diri, fg, req);
+ // get the dir, if it's not frozen etc.
+ CDir *dir = validate_new_dentry_dir(req, diri, name);
if (!dir) return;
+ // ok, it's auth, and authpinnable.
- // does it exist?
+ // does the dentry exist?
CDentry *dn = dir->lookup(name);
if (!dn) {
if (!dir->is_complete()) {
return;
}
- CDir *srcdir = try_open_dir(srcdiri, srcfg, req);
+ CDir *srcdir = try_open_auth_dir(srcdiri, srcfg, req);
if (!srcdir) return;
dout(7) << "handle_client_rename srcdir is " << *srcdir << endl;
LogEvent *event2 = 0);
CDir *try_open_dir(CInode *in, frag_t fg, MClientRequest *req);
+ CDir* try_open_auth_dir(CInode *diri, frag_t, MClientRequest *req);
// clients
}
void print(ostream& out) const {
- out << "[metablob " << lump_order.front()
- << ", " << lump_map.size() << " dirs]";
+ if (lump_order.empty())  // no lumps: avoid calling front() on an empty lump_order
+ out << "[metablob empty]";
+ else
+ out << "[metablob " << lump_order.front()
+ << ", " << lump_map.size() << " dirs]";
}
bool has_expired(MDS *mds);
*
* - been safely committed to its dirslice.
*
- * - has been safely exported. note that !is_auth() && !is_proxy()
- * implies safely exported. if !is_auth() && is_proxy(), we need to
- * add a waiter for the export to complete.
+ * - has been safely exported. i.e., authority().first != us.
+ * in particular, auth of <us, them> is not enough, we need to
+ * wait for <them,-2>.
+ *
+ * note that this check is overly conservative, in that we'll
+ * try to flush the dir again if we reimport the subtree, even though
+ * later journal entries contain the same dirty data (from the import).
*
*/
bool EMetaBlob::has_expired(MDS *mds)
// FIXME: check the slice only
- if (dir->is_proxy()) {
- dout(10) << "EMetaBlob.has_expired am proxy, needed dirv " << lp->second.dirv
- << " for " << *dir << endl;
- return false; // we need to wait until the export flushes!
- }
- if (!dir->is_auth()) {
+ if (dir->authority().first != mds->get_nodeid()) {
dout(10) << "EMetaBlob.has_expired not auth, needed dirv " << lp->second.dirv
<< " for " << *dir << endl;
continue; // not our problem
}
+ if (dir->get_committed_version() >= lp->second.dirv) {
+ dout(10) << "EMetaBlob.has_expired have dirv " << lp->second.dirv
+ << " for " << *dir << endl;
+ continue; // yay
+ }
+
+ if (dir->auth_is_ambiguous()) {
+ dout(10) << "EMetaBlob.has_expired ambiguous auth on "
+ << *dir << endl;
+ return false; // not committed.
+ }
if (dir->get_committed_version() < lp->second.dirv) {
dout(10) << "EMetaBlob.has_expired need dirv " << lp->second.dirv
<< " for " << *dir << endl;
return false; // not committed.
- } else {
- dout(10) << "EMetaBlob.has_expired have dirv " << lp->second.dirv
- << " for " << *dir << endl;
}
+
+ assert(0); // i goofed the logic
}
return true; // all dirlumps expired.
}
+
void EMetaBlob::expire(MDS *mds, Context *c)
{
map<CDir*,version_t> commit; // dir -> version needed
// FIXME: check the slice only
+ if (dir->authority().first != mds->get_nodeid()) {
+ dout(10) << "EMetaBlob.expire not auth, needed dirv " << lp->second.dirv
+ << " for " << *dir << endl;
+ continue; // not our problem
+ }
+ if (dir->get_committed_version() >= lp->second.dirv) {
+ dout(10) << "EMetaBlob.expire have dirv " << lp->second.dirv
+ << " on " << *dir << endl;
+ continue; // yay
+ }
+
if (dir->auth_is_ambiguous()) {
// wait until export is acked (logged on remote) and committed (logged locally)
CDir *ex = mds->mdcache->get_subtree_root(dir);
waitfor_export.push_back(ex);
continue;
}
- if (!dir->is_auth()) {
- dout(10) << "EMetaBlob.expire not auth, needed dirv " << lp->second.dirv
- << " for " << *dir << endl;
- continue; // not our problem
- }
if (dir->get_committed_version() < lp->second.dirv) {
dout(10) << "EMetaBlob.expire need dirv " << lp->second.dirv
<< ", committing " << *dir << endl;
commit[dir] = MAX(commit[dir], lp->second.dirv);
ncommit++;
- } else {
- dout(10) << "EMetaBlob.expire have dirv " << lp->second.dirv
- << " on " << *dir << endl;
+ continue;
}
+
+ assert(0); // hrm
}
- // commit
- assert(!commit.empty());
+ // commit or wait for export
+ // FIXME: what if export aborts? need to retry!
+ assert(!commit.empty() || !waitfor_export.empty());
+ //C_Gather *gather = new C_Gather(new C_journal_RetryExpire(mds, this, c));
C_Gather *gather = new C_Gather(c);
for (map<CDir*,version_t>::iterator p = commit.begin();
p != commit.end();
- ++p)
- p->first->commit(p->second, gather->new_sub());
+ ++p) {
+ if (p->first->can_auth_pin())
+ p->first->commit(p->second, gather->new_sub());
+ else
+ // probably about to export|split|merge.
+ // just wait for it to unfreeze, then retry
+ p->first->add_waiter(CDir::WAIT_AUTHPINNABLE, gather->new_sub());
+ }
for (list<CDir*>::iterator p = waitfor_export.begin();
p != waitfor_export.end();
++p)
unsigned state; // state bits
int ref; // reference count
- set<int> ref_set;
+ multiset<int> ref_set;
map<int,int> replicas; // [auth] mds -> nonce
int replica_nonce; // [replica] defined on replica
// pins
int get_num_ref() { return ref; }
bool is_pinned_by(int by) { return ref_set.count(by); }
- set<int>& get_ref_set() { return ref_set; }
+ multiset<int>& get_ref_set() { return ref_set; }
+ virtual const char *pin_name(int by) = 0;
virtual void last_put() {}
virtual void bad_put(int by) {
- assert(ref_set.count(by) == 1);
+ assert(ref_set.count(by) > 0);  // ref_set is a multiset now, so a pin may be present more than once
assert(ref > 0);
}
void put(int by) {
- if (ref == 0 || ref_set.count(by) != 1) {
+ if (ref == 0 || ref_set.count(by) == 0) {
bad_put(by);
} else {
ref--;
- ref_set.erase(by);
+ ref_set.erase(ref_set.find(by));
assert(ref == (int)ref_set.size());
if (ref == 0)
last_put();
virtual void first_get() {}
virtual void bad_get(int by) {
- assert(ref_set.count(by) == 0);
+ assert(by < 0 || ref_set.count(by) == 0);  // negative pin ids are exempt from the no-duplicate check
assert(0);
}
void get(int by) {
- if (ref_set.count(by)) {
+ if (by >= 0 && ref_set.count(by)) {
bad_get(by);
} else {
if (ref == 0)
}
}
+ // Print the names of all pins currently held, separated by spaces.
+ // ref_set is a multiset, so equal pins are adjacent: each distinct pin
+ // is printed once, followed by "*<count>" if it is held more than once.
+ void print_pin_set(ostream& out) {
+   multiset<int>::iterator it = ref_set.begin();
+   while (it != ref_set.end()) {
+     out << " " << pin_name(*it);
+     int last = *it;
+     int c = 0;
+     // skip over (and count) the whole run of identical pins
+     do {
+       ++it;
+       ++c;
+     } while (it != ref_set.end() && *it == last);
+     if (c > 1)
+       out << "*" << c;
+   }
+ }
// --------------------------------------------
dirfrag_t dirfrag;
list<bufferlist> dirstate; // a bl for reach dir
- list<dirfrag_t> exports;
+ list<dirfrag_t> bounds;
public:
MExportDir() {}
dirfrag_t get_dirfrag() { return dirfrag; }
list<bufferlist>& get_dirstate() { return dirstate; }
- list<dirfrag_t>& get_exports() { return exports; }
+ list<dirfrag_t>& get_bounds() { return bounds; }
void add_dir(bufferlist& dir) {
dirstate.push_back(dir);
dirstate = ls;
}
void add_export(dirfrag_t df) {
- exports.push_back(df);
+ bounds.push_back(df);  // method keeps its old name, but the entry is stored in bounds
}
virtual void decode_payload() {
int off = 0;
payload.copy(off, sizeof(dirfrag), (char*)&dirfrag);
off += sizeof(dirfrag);
- ::_decode(exports, payload, off);
+ ::_decode(bounds, payload, off);  // bounds are decoded before dirstate, matching encode_payload()
::_decode(dirstate, payload, off);
}
virtual void encode_payload() {
payload.append((char*)&dirfrag, sizeof(dirfrag));
- ::_encode(exports, payload);
+ ::_encode(bounds, payload);  // same position in the payload that exports used to occupy
::_encode(dirstate, payload);
}
dentries are the links to each inode.
dirs map includes base dir (ino)
*/
- list<dirfrag_t> exports;
+ list<dirfrag_t> bounds;
list<CInodeDiscover*> inodes;
map<inodeno_t,dirfrag_t> inode_dirfrag;
public:
dirfrag_t get_dirfrag() { return dirfrag; }
- list<dirfrag_t>& get_exports() { return exports; }
+ list<dirfrag_t>& get_bounds() { return bounds; }
list<CInodeDiscover*>& get_inodes() { return inodes; }
list<frag_t>& get_inode_dirfrags(inodeno_t ino) {
return frags_by_ino[ino];
}
void add_export(dirfrag_t df) {
- exports.push_back( df );
+ bounds.push_back( df );  // method keeps its old name, but the entry is stored in bounds
}
void add_inode(dirfrag_t df, const string& dentry, CInodeDiscover *in) {
inodes.push_back(in);
payload.copy(off, sizeof(dirfrag), (char*)&dirfrag);
off += sizeof(dirfrag);
- ::_decode(exports, payload, off);
+ ::_decode(bounds, payload, off);
// inodes
int ni;
virtual void encode_payload() {
payload.append((char*)&dirfrag, sizeof(dirfrag));
- ::_encode(exports, payload);
+ ::_encode(bounds, payload);
// inodes
int ni = inodes.size();