}
// modifiers
+ void swap(fragtree_t& other) {
+ _splits.swap(other._splits);
+ }
void split(frag_t hb, int b) {
assert(_splits.count(hb) == 0);
_splits[hb] = b;
return;
}
- // add to our buffer
- size_t ondisk_size;
- assert(bl.length() > sizeof(ondisk_size));
- bl.copy(0, sizeof(ondisk_size), (char*)&ondisk_size);
- off_t have = bl.length() - sizeof(ondisk_size);
- dout(10) << "ondisk_size " << ondisk_size << ", have " << have << endl;
- assert(have == ondisk_size);
-
// decode.
- int off = sizeof(ondisk_size);
-
- __uint32_t num_dn;
+ int len = bl.length();
+ int off = 0;
version_t got_version;
- bl.copy(off, sizeof(num_dn), (char*)&num_dn);
- off += sizeof(num_dn);
bl.copy(off, sizeof(got_version), (char*)&got_version);
off += sizeof(got_version);
- dout(10) << "_fetched " << num_dn << " dn, got_version " << got_version
- << ", " << ondisk_size << " bytes"
+ dout(10) << "_fetched version " << got_version
+ << ", " << len << " bytes"
<< endl;
- while (num_dn--) {
- // dentry
+ while (off < len) {
+ // marker
+ char type = bl[off];
+ ++off;
+
+ // dname
string dname;
::_decode(dname, bl, off);
- dout(24) << "parse filename '" << dname << "'" << endl;
+ dout(24) << "_fetched parsed marker '" << type << "' dname '" << dname << "'" << endl;
CDentry *dn = lookup(dname); // existing dentry?
-
- char type = bl[off];
- ++off;
+
if (type == 'L') {
// hard link
inodeno_t ino;
if (dn) {
if (dn->get_inode() == 0) {
- dout(12) << "readdir had NEG dentry " << *dn << endl;
+ dout(12) << "_fetched had NEG dentry " << *dn << endl;
} else {
- dout(12) << "readdir had dentry " << *dn << endl;
+ dout(12) << "_fetched had dentry " << *dn << endl;
}
} else {
// (remote) link
CInode *in = cache->get_inode(ino); // we may or may not have it.
if (in) {
dn->link_remote(in);
- dout(12) << "readdir got remote link " << ino << " which we have " << *in << endl;
+ dout(12) << "_fetched got remote link " << ino << " which we have " << *in << endl;
} else {
- dout(12) << "readdir got remote link " << ino << " (dont' have it)" << endl;
+ dout(12) << "_fetched got remote link " << ino << " (dont' have it)" << endl;
}
}
}
string symlink;
if (inode.is_symlink())
::_decode(symlink, bl, off);
+
+ fragtree_t fragtree;
+ fragtree._decode(bl,off);
if (dn) {
if (dn->get_inode() == 0) {
- dout(12) << "readdir had NEG dentry " << *dn << endl;
+ dout(12) << "_fetched had NEG dentry " << *dn << endl;
} else {
- dout(12) << "readdir had dentry " << *dn << endl;
+ dout(12) << "_fetched had dentry " << *dn << endl;
}
} else {
// add inode
CInode *in = 0;
if (cache->have_inode(inode.ino)) {
in = cache->get_inode(inode.ino);
- dout(12) << "readdir got (but i already had) " << *in
+ dout(12) << "_fetched got (but i already had) " << *in
<< " mode " << in->inode.mode
<< " mtime " << in->inode.mtime << endl;
assert(0); // this shouldn't happen!!
if (in->is_symlink())
in->symlink = symlink;
+ // dirfragtree
+ in->dirfragtree.swap(fragtree);
+
// add
cache->add_inode( in );
// link
add_dentry( dname, in );
- dout(12) << "readdir got " << *in << " mode " << in->inode.mode << " mtime " << in->inode.mtime << endl;
+ dout(12) << "_fetched got " << *in << " mode " << in->inode.mode << " mtime " << in->inode.mtime << endl;
}
}
} else {
dn &&
dn->get_version() <= got_version &&
dn->is_dirty()) {
- dout(10) << "readdir had underwater dentry " << *dn << ", marking clean" << endl;
+ dout(10) << "_fetched had underwater dentry " << *dn << ", marking clean" << endl;
dn->mark_clean();
if (dn->get_inode()) {
assert(dn->get_inode()->get_version() <= got_version);
- dout(10) << "readdir had underwater inode " << *dn->get_inode() << ", marking clean" << endl;
+ dout(10) << "_fetched had underwater inode " << *dn->get_inode() << ", marking clean" << endl;
dn->get_inode()->mark_clean();
}
}
}
+ assert(off == len);
// take the loaded version?
// only if we are a fresh CDir* with no prior state.
if (cache->mds->logger) cache->mds->logger->inc("cdir");
// encode dentries
- bufferlist dnbl;
- __uint32_t num_dn = 0;
-
+ bufferlist bl;
+ bl.append((char*)&version, sizeof(version));
+
for (CDir_map_t::iterator it = items.begin();
it != items.end();
it++) {
// primary or remote?
if (dn->is_remote()) {
inodeno_t ino = dn->get_remote_ino();
- dout(14) << " pos " << dnbl.length() << " dn '" << it->first << "' remote ino " << ino << endl;
+ dout(14) << " pos " << bl.length() << " dn '" << it->first << "' remote ino " << ino << endl;
- // name, marker, ino
- dnbl.append( it->first.c_str(), it->first.length() + 1);
- dnbl.append( "L", 1 ); // remote link
- dnbl.append((char*)&ino, sizeof(ino));
+ // marker, name, ino
+ bl.append( "L", 1 ); // remote link
+ bl.append( it->first.c_str(), it->first.length() + 1);
+ bl.append((char*)&ino, sizeof(ino));
} else {
// primary link
CInode *in = dn->get_inode();
assert(in);
- dout(14) << " pos " << dnbl.length() << " dn '" << it->first << "' inode " << *in << endl;
+ dout(14) << " pos " << bl.length() << " dn '" << it->first << "' inode " << *in << endl;
- // name, marker, inode, [symlink string]
- dnbl.append( it->first.c_str(), it->first.length() + 1);
- dnbl.append( "I", 1 ); // inode
- dnbl.append( (char*) &in->inode, sizeof(inode_t));
+ // marker, name, inode, [symlink string]
+ bl.append( "I", 1 ); // inode
+ bl.append( it->first.c_str(), it->first.length() + 1);
+ bl.append( (char*) &in->inode, sizeof(inode_t));
if (in->is_symlink()) {
// include symlink destination!
dout(18) << " inlcuding symlink ptr " << in->symlink << endl;
- dnbl.append( (char*) in->symlink.c_str(), in->symlink.length() + 1);
+ bl.append( (char*) in->symlink.c_str(), in->symlink.length() + 1);
}
+
+ in->dirfragtree._encode(bl);
}
- num_dn++;
}
- // wrap it up
- bufferlist bl;
- size_t size;
- size = dnbl.length() + sizeof(num_dn) + sizeof(version);
- bl.append((char*)&size, sizeof(size));
- bl.append((char*)&num_dn, sizeof(num_dn));
- bl.append((char*)&version, sizeof(version));
- bl.claim_append(dnbl);
- assert(size == bl.length() - sizeof(size));
-
// write it.
cache->mds->objecter->write( get_ondisk_object(),
0, bl.length(),
class CDirDiscover {
dirfrag_t dirfrag;
int nonce;
- int dir_auth;
+ //int dir_auth2;
int dir_rep;
set<int> rep_by;
CDirDiscover(CDir *dir, int nonce) {
dirfrag = dir->dirfrag();
this->nonce = nonce;
- //dir_auth = dir->dir_auth.first;
+ //dir_auth2 = dir->dir_auth.second;
dir_rep = dir->dir_rep;
rep_by = dir->dir_rep_by;
}
void _encode(bufferlist& bl) {
bl.append((char*)&dirfrag, sizeof(dirfrag));
bl.append((char*)&nonce, sizeof(nonce));
- bl.append((char*)&dir_auth, sizeof(dir_auth));
+ //bl.append((char*)&dir_auth, sizeof(dir_auth));
bl.append((char*)&dir_rep, sizeof(dir_rep));
::_encode(rep_by, bl);
}
off += sizeof(dirfrag);
bl.copy(off, sizeof(nonce), (char*)&nonce);
off += sizeof(nonce);
- bl.copy(off, sizeof(dir_auth), (char*)&dir_auth);
- off += sizeof(dir_auth);
+ //bl.copy(off, sizeof(dir_auth), (char*)&dir_auth);
+ //off += sizeof(dir_auth);
bl.copy(off, sizeof(dir_rep), (char*)&dir_rep);
off += sizeof(dir_rep);
::_decode(rep_by, bl, off);
}
if (!cur->dir) cur->get_or_open_dir(this);
-
+ assert(cur->dir);
+
+ // is dir frozen?
+ if (cur->dir->is_frozen()) {
+ dout(7) << *cur->dir << " is frozen, stopping" << endl;
+ break;
+ }
+
+ // add the dir.
reply->add_dir( new CDirDiscover( cur->dir,
cur->dir->add_replica( dis->get_asker() ) ) );
dout(7) << "added dir " << *cur->dir << endl;
}
if (dis->get_want().depth() == 0) break;
+
+ // is dir frozen?
+ if (cur->dir->is_frozen()) {
+ dout(7) << *cur->dir << " is frozen, stopping" << endl;
+ break;
+ }
// lookup dentry
int dentry_auth = cur->dir->dentry_authority( dis->get_dentry(i) ).first;
if (dentry_auth != mds->get_nodeid()) {
- dout(7) << *cur->dir << "dentry " << dis->get_dentry(i) << " auth " << dentry_auth << ", i'm done." << endl;
+ dout(7) << *cur->dir << " dentry " << dis->get_dentry(i) << " auth " << dentry_auth << ", i'm done." << endl;
break; // that's it for us!
}
// set dir_auth hint?
if (cur->is_dir() && cur->dir &&
- cur->is_auth() && !cur->dir->is_auth()) {
+ !cur->dir->is_auth()) {
dout(7) << "setting dir_auth_hint for " << *cur->dir << endl;
reply->set_dir_auth_hint(cur->dir->authority().first);
}
bc, no flag to indicate a dir discover is underway, (as there is w/ a dentry one).
this is actually good, since (dir aside) they're asking for different information.
*/
- dout(7) << "had " << *cur->dir;
+ dout(7) << "had " << *cur->dir << endl;
m->get_dir(i).update_dir(cur->dir);
- dout2(7) << ", now " << *cur->dir << endl;
+ dout2(7) << "now " << *cur->dir << endl;
} else {
// add it (_replica_)
CDir *ndir = cur->add_dirfrag( new CDir(cur, frag_t(), this, false) ); // FIXME dirfrag_t
#include "messages/MExportDirDiscoverAck.h"
#include "messages/MExportDirPrep.h"
#include "messages/MExportDirPrepAck.h"
-#include "messages/MExportDirWarning.h"
-#include "messages/MExportDirWarningAck.h"
#include "messages/MExportDir.h"
#include "messages/MExportDirAck.h"
#include "messages/MExportDirNotify.h"
case MSG_MDS_EXPORTDIRPREPACK:
handle_export_prep_ack((MExportDirPrepAck*)m);
break;
- case MSG_MDS_EXPORTDIRWARNINGACK:
- handle_export_warning_ack((MExportDirWarningAck*)m);
- break;
case MSG_MDS_EXPORTDIRACK:
handle_export_ack((MExportDirAck*)m);
break;
break;
// export 3rd party (dir_auth adjustments)
- case MSG_MDS_EXPORTDIRWARNING:
- handle_export_warning((MExportDirWarning*)m);
- break;
case MSG_MDS_EXPORTDIRNOTIFY:
handle_export_notify((MExportDirNotify*)m);
break;
-
- // hashing
- /*
- case MSG_MDS_HASHDIRDISCOVER:
- handle_hash_dir_discover((MHashDirDiscover*)m);
- break;
- case MSG_MDS_HASHDIRDISCOVERACK:
- handle_hash_dir_discover_ack((MHashDirDiscoverAck*)m);
- break;
- case MSG_MDS_HASHDIRPREP:
- handle_hash_dir_prep((MHashDirPrep*)m);
- break;
- case MSG_MDS_HASHDIRPREPACK:
- handle_hash_dir_prep_ack((MHashDirPrepAck*)m);
- break;
- case MSG_MDS_HASHDIR:
- handle_hash_dir((MHashDir*)m);
- break;
- case MSG_MDS_HASHDIRACK:
- handle_hash_dir_ack((MHashDirAck*)m);
- break;
- case MSG_MDS_HASHDIRNOTIFY:
- handle_hash_dir_notify((MHashDirNotify*)m);
- break;
-
- // unhashing
- case MSG_MDS_UNHASHDIRPREP:
- handle_unhash_dir_prep((MUnhashDirPrep*)m);
- break;
- case MSG_MDS_UNHASHDIRPREPACK:
- handle_unhash_dir_prep_ack((MUnhashDirPrepAck*)m);
- break;
- case MSG_MDS_UNHASHDIR:
- handle_unhash_dir((MUnhashDir*)m);
- break;
- case MSG_MDS_UNHASHDIRACK:
- handle_unhash_dir_ack((MUnhashDirAck*)m);
- break;
- case MSG_MDS_UNHASHDIRNOTIFY:
- handle_unhash_dir_notify((MUnhashDirNotify*)m);
- break;
- case MSG_MDS_UNHASHDIRNOTIFYACK:
- handle_unhash_dir_notify_ack((MUnhashDirNotifyAck*)m);
- break;
- */
-
default:
assert(0);
}
// send ExportDirDiscover (ask target)
mds->send_message_mds(new MExportDirDiscover(dir), dest, MDS_PORT_MIGRATOR);
- dir->auth_pin(); // pin dir, to hang up our freeze (unpin on prep ack)
+ dir->auth_pin(); // pin dir, to hang up our freeze (unpin on discover ack)
// take away the popularity we're sending. FIXME: do this later?
mds->balancer->subtract_export(dir);
inode_trace.push_front(cur->inode);
dout(7) << " will add " << *cur->inode << endl;
- // include dir? note: this'll include everything except the nested exports themselves,
- // since someone else is obviously auth.
- if (cur->is_auth()) {
+ // include dir?
+ // note: don't replicate ambiguous auth items! they're
+ // frozen anyway.
+ if (cur->is_auth() && !cur->auth_is_ambiguous()) {
prep->add_dir( new CDirDiscover(cur, cur->add_replica(dest)) ); // yay!
dout(7) << " added " << *cur << endl;
}
export_warning_ack_waiting[dir].insert(p->first);
export_notify_ack_waiting[dir].insert(p->first); // we'll eventually get a notifyack, too!
- //mds->send_message_mds(new MExportDirWarning(dir->ino(), export_peer[dir]),
- //p->first, MDS_PORT_MIGRATOR);
MExportDirNotify *notify = new MExportDirNotify(dir->dirfrag(), true,
pair<int,int>(mds->get_nodeid(),CDIR_AUTH_UNKNOWN),
pair<int,int>(mds->get_nodeid(),export_peer[dir]));
}
-void Migrator::handle_export_warning_ack(MExportDirWarningAck *m)
-{
- CInode *in = cache->get_inode(m->get_ino());
- assert(in);
- CDir *dir = in->dir;
- assert(dir);
-
- dout(7) << "export_warning_ack " << *dir << " from " << m->get_source() << endl;
-
- if (export_state.count(dir) == 0 ||
- export_state[dir] != EXPORT_WARNING) {
- // export must have aborted.
- dout(7) << "export must have aborted" << endl;
- delete m;
- return;
- }
-
- // process the warning_ack
- int from = m->get_source().num();
- assert(export_warning_ack_waiting.count(dir));
- export_warning_ack_waiting[dir].erase(from);
-
- if (export_warning_ack_waiting[dir].empty())
- export_go(dir); // start export.
-
- // done
- delete m;
-}
-
-
void Migrator::export_go(CDir *dir)
{
assert(export_peer.count(dir));
// send finish/commit to new auth
if (mds->mdsmap->is_active_or_stopping(export_peer[dir])) {
- mds->send_message_mds(new MExportDirFinish(dir->ino()), export_peer[dir], MDS_PORT_MIGRATOR);
+ mds->send_message_mds(new MExportDirFinish(dir->dirfrag()),
+ export_peer[dir], MDS_PORT_MIGRATOR);
} else {
dout(7) << "not sending MExportDirFinish, dest has failed" << endl;
}
for (list<dirfrag_t>::iterator it = m->get_bounds().begin();
it != m->get_bounds().end();
it++) {
- dout(7) << " checking dir " << hex << *it << dec << endl;
+ dout(7) << " checking bound " << hex << *it << dec << endl;
CInode *in = cache->get_inode(it->ino);
assert(in);
bound->state_set(CDir::STATE_IMPORTBOUND);
import_bounds[dir].insert(bound);
} else {
- dout(7) << " already pinned import bound " << *dir << endl;
+ dout(7) << " already pinned import bound " << *bound << endl;
}
} else {
dout(7) << " waiting for nested export dir on " << *cache->get_inode(df.ino) << endl;
void Migrator::handle_export_finish(MExportDirFinish *m)
{
- CDir *dir = cache->get_dir(m->get_ino());
+ CDir *dir = cache->get_dirfrag(m->get_dirfrag());
assert(dir);
dout(7) << "handle_export_finish on " << *dir << endl;
import_finish(dir);
void Migrator::decode_import_inode(CDentry *dn, bufferlist& bl, int& off, int oldauth)
{
+ dout(15) << "decode_import_inode on " << *dn << endl;
+
CInodeExport istate;
off = istate._decode(bl, off);
- dout(15) << "got a cinodeexport " << endl;
bool added = false;
CInode *in = cache->get_inode(istate.get_ino());
// dentry
string dname;
_decode(dname, bl, off);
- dout(15) << "dname is " << dname << endl;
CDentry *dn = dir->lookup(dname);
if (!dn)
// decode state
dn->decode_import_state(bl, off, oldauth, mds->get_nodeid());
+ dout(15) << "decode_import_dir got " << *dn << endl;
// points to...
char icode;
// authority bystander
-void Migrator::handle_export_warning(MExportDirWarning *m)
-{
- CDir *dir = cache->get_dir(m->get_ino());
-
- int oldauth = m->get_source().num();
- int newauth = m->get_new_dir_auth();
- if (dir) {
- dout(7) << "handle_export_warning mds" << oldauth
- << " -> mds" << newauth
- << " on " << *dir << endl;
- cache->adjust_subtree_auth(dir, oldauth, newauth);
- // verify?
- } else {
- dout(7) << "handle_export_warning on dir " << m->get_ino() << ", acking" << endl;
- }
-
- // send the ack
- mds->send_message_mds(new MExportDirWarningAck(m->get_ino()),
- m->get_source().num(), MDS_PORT_MIGRATOR);
-
- delete m;
-
-}
-
-
void Migrator::handle_export_notify(MExportDirNotify *m)
{
CDir *dir = cache->get_dirfrag(m->get_dirfrag());
class MExportDirDiscoverAck;
class MExportDirPrep;
class MExportDirPrepAck;
-class MExportDirWarning;
-class MExportDirWarningAck;
class MExportDir;
class MExportDirAck;
class MExportDirNotify;
protected:
void handle_export_discover_ack(MExportDirDiscoverAck *m);
void export_frozen(CDir *dir, int dest);
- //void export_start_logged(CDir *dir, MExportDirPrep *prep, int dest);
void handle_export_prep_ack(MExportDirPrepAck *m);
- void handle_export_warning_ack(MExportDirWarningAck *m);
void export_go(CDir *dir);
int encode_export_dir(list<bufferlist>& dirstatelist,
class C_Contexts *fin,
void export_finish(CDir *dir);
friend class C_MDC_ExportFreeze;
- friend class C_MDC_ExportStartLogged;
friend class C_MDS_ExportFinishLogged;
// importer
void handle_export_discover(MExportDirDiscover *m);
int oldauth,
CDir *import_root,
EImportStart *le);
- /*
- void got_hashed_replica(CDir *import,
- inodeno_t dir_ino,
- inodeno_t replica_ino);
- */
public:
void import_reverse(CDir *dir, bool fix_dir_auth=true);
protected:
friend class C_MDS_ImportDirLoggedFinish;
// bystander
- void handle_export_warning(MExportDirWarning *m);
void handle_export_notify(MExportDirNotify *m);
}
virtual char *get_type_name() { return "ExDis"; }
void print(ostream& o) {
- o << "export_discover " << dirfrag << " " << path;
+ o << "export_discover(" << dirfrag << " " << path << ")";
}
virtual void decode_payload() {
virtual char *get_type_name() { return "ExDisA"; }
void print(ostream& o) {
- o << "export_discover_ack " << dirfrag;
+ o << "export_discover_ack(" << dirfrag;
if (success)
- o << " success";
+ o << " success)";
else
- o << " failure";
+ o << " failure)";
}
virtual void decode_payload() {
#include "msg/Message.h"
class MExportDirFinish : public Message {
- inodeno_t ino;
+ dirfrag_t dirfrag;
public:
- inodeno_t get_ino() { return ino; }
+ dirfrag_t get_dirfrag() { return dirfrag; }
MExportDirFinish() {}
- MExportDirFinish(inodeno_t ino) :
+ MExportDirFinish(dirfrag_t dirfrag) :
Message(MSG_MDS_EXPORTDIRFINISH) {
- this->ino = ino;
+ this->dirfrag = dirfrag;
}
virtual char *get_type_name() { return "ExFin"; }
void print(ostream& o) {
- o << "export_finish(" << ino << ")";
+ o << "export_finish(" << dirfrag << ")";
}
virtual void decode_payload() {
int off = 0;
- payload.copy(off, sizeof(ino), (char*)&ino);
- off += sizeof(ino);
+ payload.copy(off, sizeof(dirfrag), (char*)&dirfrag);
+ off += sizeof(dirfrag);
}
virtual void encode_payload() {
- payload.append((char*)&ino, sizeof(ino));
+ payload.append((char*)&dirfrag, sizeof(dirfrag));
}
};
dout(1) << "---- " << m->get_dest()
<< " <- " << m->get_source()
<< " ---- " << *m
+ << " (" << m << ")"
<< endl;
if (g_conf.fakemessenger_serialize) {
}
dm->queue_incoming(m);
- dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << endl;
+ dout(1) << "--> " << get_myname() << " -> " << inst.name
+ << " " << *m
+ << " (" << m << ")"
+ << endl;
}
catch (...) {