#include "MDS.h"
#include "MDLog.h"
+#include "LogSegment.h"
#include "events/EAnchorClient.h"
#include "messages/MAnchor.h"
*pending_create_prepare[ino].patid = atid;
pending_create_prepare.erase(ino);
- pending_commit.insert(atid);
-
if (onfinish) {
onfinish->finish(0);
delete onfinish;
*pending_destroy_prepare[ino].patid = atid;
pending_destroy_prepare.erase(ino);
- pending_commit.insert(atid);
-
if (onfinish) {
onfinish->finish(0);
delete onfinish;
*pending_update_prepare[ino].patid = atid;
pending_update_prepare.erase(ino);
- pending_commit.insert(atid);
-
if (onfinish) {
onfinish->finish(0);
delete onfinish;
// remove from committing list
assert(pending_commit.count(atid));
- pending_commit.erase(atid);
-
+ assert(pending_commit[atid]->pending_commit_atids.count(atid));
+
// log ACK.
- mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid));
-
- // kick any waiters
- if (ack_waiters.count(atid)) {
- dout(15) << "kicking waiters on atid " << atid << dendl;
- mds->queue_waiters(ack_waiters[atid]);
- ack_waiters.erase(atid);
- }
+ mds->mdlog->submit_entry(new EAnchorClient(ANCHOR_OP_ACK, atid),
+ new C_LoggedAck(this, atid));
}
break;
}
+/*
+ * _logged_ack - invoked through C_LoggedAck, the completion context handed
+ * to mdlog->submit_entry() together with the ANCHOR_OP_ACK event.
+ *
+ * Removes atid from its LogSegment's pending_commit_atids and from
+ * pending_commit, then queues any contexts registered in ack_waiters.
+ */
+void AnchorClient::_logged_ack(version_t atid)
+{
+  dout(10) << "_logged_ack" << dendl;
+
+  // look the transaction up once.  operator[] would silently insert a
+  // NULL LogSegment* for an unknown atid if asserts are compiled out.
+  map<version_t,LogSegment*>::iterator p = pending_commit.find(atid);
+  assert(p != pending_commit.end());
+  assert(p->second->pending_commit_atids.count(atid));
+
+  p->second->pending_commit_atids.erase(atid);
+  pending_commit.erase(p);
+
+  // kick any waiters (LogSegment trim)
+  map<version_t, list<Context*> >::iterator w = ack_waiters.find(atid);
+  if (w != ack_waiters.end()) {
+    dout(15) << "kicking ack waiters on atid " << atid << dendl;
+    mds->queue_waiters(w->second);
+    ack_waiters.erase(w);
+  }
+}
+
/*
* public async interface
// COMMIT
-void AnchorClient::commit(version_t atid)
+void AnchorClient::commit(version_t atid, LogSegment *ls)
{
dout(10) << "commit " << atid << dendl;
- assert(pending_commit.count(atid));
- pending_commit.insert(atid);
+ assert(pending_commit.count(atid) == 0);
+ pending_commit[atid] = ls;
+ ls->pending_commit_atids.insert(atid);
// send message
MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, atid);
void AnchorClient::resend_commits()
{
- for (set<version_t>::iterator p = pending_commit.begin();
+ for (map<version_t,LogSegment*>::iterator p = pending_commit.begin();
p != pending_commit.end();
++p) {
- dout(10) << "resending commit on " << *p << dendl;
- MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, *p);
+ dout(10) << "resending commit on " << p->first << dendl;
+ MAnchor *req = new MAnchor(ANCHOR_OP_COMMIT, 0, p->first);
mds->send_message_mds(req,
mds->mdsmap->get_anchortable(),
MDS_PORT_ANCHORTABLE, MDS_PORT_ANCHORCLIENT);
class Context;
class MDS;
+class LogSegment;
class AnchorClient : public Dispatcher {
MDS *mds;
hash_map<inodeno_t, _pending_prepare> pending_update_prepare;
// pending commits
- set<version_t> pending_commit;
+ map<version_t, LogSegment*> pending_commit;
map<version_t, list<Context*> > ack_waiters;
void handle_anchor_reply(class MAnchor *m);
+ // Completion context: when finished, calls AnchorClient::_logged_ack()
+ // with the anchor transaction id it was constructed with.
+ class C_LoggedAck : public Context {
+   AnchorClient *client;
+   version_t tid;
+ public:
+   C_LoggedAck(AnchorClient *c, version_t v)
+     : client(c), tid(v) {}
+   void finish(int r) {
+     client->_logged_ack(tid);
+   }
+ };
+ void _logged_ack(version_t atid);
+
public:
AnchorClient(MDS *m) : mds(m) {}
void prepare_destroy(inodeno_t ino, version_t *atid, Context *onfinish);
void prepare_update(inodeno_t ino, vector<Anchor>& trace, version_t *atid, Context *onfinish);
- void commit(version_t atid);
+ void commit(version_t atid, LogSegment *ls);
// for recovery (by other nodes)
void handle_mds_recovery(int mds); // called when someone else recovers
void resend_prepares(hash_map<inodeno_t, _pending_prepare>& prepares, int op);
// for recovery (by me)
- void got_journaled_agree(version_t atid) {
- pending_commit.insert(atid);
+ void got_journaled_agree(version_t atid, LogSegment *ls) {  // record atid -> ls in pending_commit
+ pending_commit[atid] = ls;  // NOTE(review): unlike commit(), atid is not inserted into ls->pending_commit_atids, yet _logged_ack() asserts it is there -- confirm
}
void got_journaled_ack(version_t atid) {
pending_commit.erase(atid);
void CDentry::add_waiter(int tag, Context *c)
{
// wait on the directory?
- if (tag & (WAIT_AUTHPINNABLE|WAIT_SINGLEAUTH)) {
+ if (tag & (WAIT_UNFREEZE|WAIT_SINGLEAUTH)) {
dir->add_waiter(tag, c);
return;
}
// -- exporting
// note: this assumes the dentry already exists.
// i.e., the name is already extracted... so we just need the other state.
- void encode_export_state(bufferlist& bl) {
- bl.append((char*)&state, sizeof(state));
- bl.append((char*)&version, sizeof(version));
- bl.append((char*)&projected_version, sizeof(projected_version));
+ void encode_export(bufferlist& bl) {  // encode only: appends state, version, projected_version, lock and replica_map to bl
+ ::_encode_simple(state, bl);
+ ::_encode_simple(version, bl);
+ ::_encode_simple(projected_version, bl);
lock._encode(bl);
- ::_encode(replica_map, bl);
-
+ ::_encode_simple(replica_map, bl);
+ }
+ void finish_export() {  // local cleanup split out of the encoder: clear replica map, reset nonce, mark clean if dirty
// twiddle
clear_replica_map();
replica_nonce = EXPORT_NONCE;
if (is_dirty())
mark_clean();
}
+
void decode_import_state(bufferlist& bl, int& off, int from, int to, LogSegment *ls) {
int nstate;
bl.copy(off, sizeof(nstate), (char*)&nstate);
if (!can_auth_pin() && !ignore_authpinnability) {
dout(7) << "fetch waiting for authpinnable" << dendl;
- add_waiter(WAIT_AUTHPINNABLE, c);
+ add_waiter(WAIT_UNFREEZE, c);
return;
}
static const int WAIT_DNLOCK_OFFSET = 4;
static const int WAIT_ANY = (0xffffffff);
- static const int WAIT_ATFREEZEROOT = (WAIT_AUTHPINNABLE|WAIT_UNFREEZE);
+ static const int WAIT_ATFREEZEROOT = (WAIT_UNFREEZE);
static const int WAIT_ATSUBTREEROOT = (WAIT_SINGLEAUTH);
st.pop_me = dir->pop_me;
st.pop_auth_subtree = dir->pop_auth_subtree;
+ /*
dir->pop_auth_subtree_nested -= dir->pop_auth_subtree;
dir->pop_me.zero(now);
dir->pop_auth_subtree.zero(now);
-
+ */
rep_by = dir->dir_rep_by;
replicas = dir->replica_map;
}
void CInode::add_waiter(int tag, Context *c)
{
// wait on the directory?
- if (tag & (WAIT_AUTHPINNABLE|WAIT_SINGLEAUTH)) {
+ if (tag & (WAIT_UNFREEZE|WAIT_SINGLEAUTH)) {
parent->dir->add_waiter(tag, c);
return;
}
if (!object->can_auth_pin()) {
// wait
dout(10) << " can't auth_pin (freezing?), waiting to authpin " << *object << dendl;
- object->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+ object->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
mds->locker->drop_locks(mdr);
mdr->drop_local_auth_pins();
return false;
if (!lock->get_parent()->can_auth_pin()) {
dout(7) << "try_simple_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
//if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
- lock->get_parent()->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_Locker_SimpleEval(this, lock));
+ lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_SimpleEval(this, lock));
return;
}
if (!lock->get_parent()->can_auth_pin()) {
dout(7) << "try_scatter_eval can't auth_pin, waiting on " << *lock->get_parent() << dendl;
//if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
- lock->get_parent()->add_waiter(MDSCacheObject::WAIT_AUTHPINNABLE, new C_Locker_ScatterEval(this, lock));
+ lock->get_parent()->add_waiter(MDSCacheObject::WAIT_UNFREEZE, new C_Locker_ScatterEval(this, lock));
return;
}
if (!lock->get_parent()->can_auth_pin()) {
dout(7) << "try_file_eval can't auth_pin, waiting on " << *in << dendl;
//if (!lock->get_parent()->is_waiter(MDSCacheObject::WAIT_SINGLEAUTH))
- in->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_Locker_FileEval(this, lock));
+ in->add_waiter(CInode::WAIT_UNFREEZE, new C_Locker_FileEval(this, lock));
return;
}
#include "include/interval_set.h"
#include "include/Context.h"
+#include <ext/hash_set>
+using __gnu_cxx::hash_set;
+
class CDir;
class CInode;
class CDentry;
map<CInode*, map<off_t,off_t> > purging_inodes;
// committed anchor transactions
- interval_set<version_t> atids;
+ hash_set<version_t> pending_commit_atids;
// client request ids
map<int, tid_t> last_client_tids;
if (!in->can_auth_pin() &&
!mdr->is_auth_pinned(in)) {
dout(7) << "anchor_create not authpinnable, waiting on " << *in << dendl;
- in->add_waiter(CInode::WAIT_AUTHPINNABLE, onfinish);
+ in->add_waiter(CInode::WAIT_UNFREEZE, onfinish);
return;
}
MDCache *cache;
CInode *in;
version_t atid;
- version_t pdv;
LogSegment *ls;
public:
- C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, version_t v, LogSegment *s) :
- cache(c), in(i), atid(t), pdv(v), ls(s) {}
+ C_MDC_AnchorCreateLogged(MDCache *c, CInode *i, version_t t, LogSegment *s) :
+ cache(c), in(i), atid(t), ls(s) {}
void finish(int r) {
- cache->_anchor_create_logged(in, atid, pdv, ls);
+ cache->_anchor_create_logged(in, atid, ls);
}
};
dout(10) << "_anchor_create_prepared " << *in << " atid " << atid << dendl;
assert(in->inode.anchored == false);
- // predirty, prepare log entry
- version_t pdv = in->pre_dirty();
-
- EUpdate *le = new EUpdate(mds->mdlog, "anchor_create");
- le->metablob.add_dir_context(in->get_parent_dir());
-
// update the logged inode copy
- inode_t *pi = le->metablob.add_dentry(in->parent, true);
+ inode_t *pi = in->project_inode();
pi->anchored = true;
- pi->version = pdv;
+ pi->version = in->pre_dirty();
// note anchor transaction
+ EUpdate *le = new EUpdate(mds->mdlog, "anchor_create");
+ le->metablob.add_dir_context(in->get_parent_dir());
+ le->metablob.add_primary_dentry(in->parent, true, 0, pi);
le->metablob.add_anchor_transaction(atid);
-
- // log + wait
- mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid, pdv,
+ mds->mdlog->submit_entry(le, new C_MDC_AnchorCreateLogged(this, in, atid,
mds->mdlog->get_current_segment()));
}
-void MDCache::_anchor_create_logged(CInode *in, version_t atid, version_t pdv, LogSegment *ls)
+void MDCache::_anchor_create_logged(CInode *in, version_t atid, LogSegment *ls)
{
- dout(10) << "_anchor_create_logged pdv " << pdv << " on " << *in << dendl;
+ dout(10) << "_anchor_create_logged on " << *in << dendl;
// unpin
assert(in->state_test(CInode::STATE_ANCHORING));
in->auth_unpin();
// apply update to cache
- in->inode.anchored = true;
- in->mark_dirty(pdv, ls);
+ in->pop_and_dirty_projected_inode(ls);
// tell the anchortable we've committed
- mds->anchorclient->commit(atid);
+ mds->anchorclient->commit(atid, ls);
// trigger waiters
in->finish_waiting(CInode::WAIT_ANCHORED, 0);
if (!in->can_auth_pin()/* &&
!mdr->is_auth_pinned(in)*/) {
dout(7) << "anchor_destroy not authpinnable, waiting on " << *in << dendl;
- in->add_waiter(CInode::WAIT_AUTHPINNABLE, onfinish);
+ in->add_waiter(CInode::WAIT_UNFREEZE, onfinish);
return;
}
MDCache *cache;
CInode *in;
version_t atid;
- version_t pdv;
+ LogSegment *ls;
public:
- C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t t, version_t v) :
- cache(c), in(i), atid(t), pdv(v) {}
+ C_MDC_AnchorDestroyLogged(MDCache *c, CInode *i, version_t t, LogSegment *l) :
+ cache(c), in(i), atid(t), ls(l) {}
void finish(int r) {
- cache->_anchor_destroy_logged(in, atid, pdv);
+ cache->_anchor_destroy_logged(in, atid, ls);
}
};
assert(in->inode.anchored == true);
- // predirty, prepare log entry
- version_t pdv = in->pre_dirty();
-
- EUpdate *le = new EUpdate(mds->mdlog, "anchor_destroy");
- le->metablob.add_dir_context(in->get_parent_dir());
-
// update the logged inode copy
- inode_t *pi = le->metablob.add_dentry(in->parent, true);
+ inode_t *pi = in->project_inode();
- pi->anchored = true;
+ // destroying the anchor must clear the flag.  the inode is asserted to be
+ // anchored above, so writing 'true' here would be a no-op, and
+ // _anchor_destroy_logged() no longer sets in->inode.anchored = false by
+ // hand; it relies on the projected inode prepared here.
+ pi->anchored = false;
- pi->version = pdv;
-
- // note anchor transaction
- le->metablob.add_anchor_transaction(atid);
+ pi->version = in->pre_dirty();
// log + wait
- mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, atid, pdv));
+ // journal the projected inode together with the anchor transaction id
+ EUpdate *le = new EUpdate(mds->mdlog, "anchor_destroy");
+ le->metablob.add_dir_context(in->get_parent_dir());
+ le->metablob.add_primary_dentry(in->parent, true, 0, pi);
+ le->metablob.add_anchor_transaction(atid);
+ mds->mdlog->submit_entry(le, new C_MDC_AnchorDestroyLogged(this, in, atid, mds->mdlog->get_current_segment()));
}
-void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, version_t pdv)
+void MDCache::_anchor_destroy_logged(CInode *in, version_t atid, LogSegment *ls)
{
- dout(10) << "_anchor_destroy_logged pdv " << pdv << " on " << *in << dendl;
+ dout(10) << "_anchor_destroy_logged on " << *in << dendl;
// unpin
assert(in->state_test(CInode::STATE_UNANCHORING));
in->auth_unpin();
// apply update to cache
- in->inode.anchored = false;
- in->inode.version = pdv;
-
+ in->pop_and_dirty_projected_inode(ls);
+
// tell the anchortable we've committed
- mds->anchorclient->commit(atid);
+ mds->anchorclient->commit(atid, ls);
// trigger waiters
in->finish_waiting(CInode::WAIT_UNANCHORED, 0);
version_t dst_reanchor_atid; // dst->stray
bufferlist inode_import;
version_t inode_import_v;
+ CInode *inode_export; // inode we're exporting, if any
CDentry *srcdn; // srcdn, if auth, on slave
// called when slave commits
ls(0),
done_locking(false), committing(false), aborted(false),
src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
+ inode_export(0), srcdn(0),
slave_commit(0) { }
MDRequest(metareqid_t ri, MClientRequest *req) :
reqid(ri), client_request(req), ref(0),
ls(0),
done_locking(false), committing(false), aborted(false),
src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
+ inode_export(0), srcdn(0),
slave_commit(0) { }
MDRequest(metareqid_t ri, int by) :
reqid(ri), client_request(0), ref(0),
ls(0),
done_locking(false), committing(false), aborted(false),
src_reanchor_atid(0), dst_reanchor_atid(0), inode_import_v(0),
+ inode_export(0), srcdn(0),
slave_commit(0) { }
bool is_master() { return slave_to_mds < 0; }
void anchor_destroy(CInode *in, Context *onfinish);
protected:
void _anchor_create_prepared(CInode *in, version_t atid);
- void _anchor_create_logged(CInode *in, version_t atid, version_t pdv, LogSegment *ls);
+ void _anchor_create_logged(CInode *in, version_t atid, LogSegment *ls);
void _anchor_destroy_prepared(CInode *in, version_t atid);
- void _anchor_destroy_logged(CInode *in, version_t atid, version_t pdv);
+ void _anchor_destroy_logged(CInode *in, version_t atid, LogSegment *ls);
friend class C_MDC_AnchorCreatePrepared;
friend class C_MDC_AnchorCreateLogged;
export_warning_ack_waiting.erase(dir);
export_state[dir] = EXPORT_EXPORTING;
- assert(export_data.count(dir) == 0);
assert(dir->get_cum_auth_pins() == 0);
// set ambiguous auth
// fill export message with cache data
utime_t now = g_clock.now();
- C_Contexts *fin = new C_Contexts; // collect all the waiters
map<int,entity_inst_t> exported_client_map;
- int num_exported_inodes = encode_export_dir( export_data[dir],
- fin,
- dir, // base
+ list<bufferlist> export_data;
+ int num_exported_inodes = encode_export_dir( export_data,
dir, // recur start point
- dest,
exported_client_map,
now );
bufferlist bl;
::_encode(exported_client_map, bl);
- export_data[dir].push_front(bl);
+ export_data.push_front(bl);
// send the export data!
MExportDir *req = new MExportDir(dir->dirfrag());
- req->set_dirstate(export_data[dir]);
+ req->take_dirstate(export_data);
// add bounds to message
set<CDir*> bounds;
++p)
req->add_export((*p)->dirfrag());
- //s end
+ // send
mds->send_message_mds(req, dest, MDS_PORT_MIGRATOR);
- // queue up the finisher
- dir->add_waiter( CDir::WAIT_UNFREEZE, fin );
-
// stats
if (mds->logger) mds->logger->inc("ex");
if (mds->logger) mds->logger->inc("iex", num_exported_inodes);
* encode relevant state to be sent over the wire.
* used by: encode_export_dir, file_rename (if foreign)
*/
-void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state, int new_auth,
+/*
+ * encode_export_inode - append the CInodeExport encoding of 'in' to
+ * enc_state and record, in exported_client_map, the instance of every
+ * client that has an entry in in->client_caps.
+ */
+void Migrator::encode_export_inode(CInode *in, bufferlist& enc_state,
map<int,entity_inst_t>& exported_client_map,
utime_t now)
{
+  dout(7) << "encode_export_inode " << *in << dendl;
+  assert(!in->is_replica(mds->get_nodeid()));
+
+  // inode state
+  CInodeExport exported(in, now);
+  exported._encode(enc_state);
+
+  // make note of clients named by exported capabilities
+  map<int, Capability>::iterator cap = in->client_caps.begin();
+  while (cap != in->client_caps.end()) {
+    const int client = cap->first;
+    exported_client_map[client] = mds->clientmap.get_inst(client);
+    ++cap;
+  }
+}
+
+void Migrator::finish_export_inode(CInode *in, C_Contexts *fin)  // local cleanup of an exported inode; its waiters are moved into fin
+{
+ dout(12) << "finish_export_inode " << *in << dendl;
+
// tell (all) clients about migrating caps.. mark STALE
for (map<int, Capability>::iterator it = in->client_caps.begin();
it != in->client_caps.end();
it++) {
- dout(7) << "encode_export_inode " << *in << " telling client" << it->first << " stale caps" << dendl;
+ dout(7) << "finish_export_inode telling client" << it->first
+ << " stale caps on " << *in << dendl;
MClientFileCaps *m = new MClientFileCaps(MClientFileCaps::OP_STALE,
in->inode,
it->second.get_last_seq(),
it->second.pending(),
it->second.wanted());
entity_inst_t inst = mds->clientmap.get_inst(it->first);
- exported_client_map[it->first] = inst;
mds->send_message_client_maybe_open(m, inst);
}
if (!in->is_replicated())
in->replicate_relax_locks();
- // add inode
- assert(!in->is_replica(mds->get_nodeid()));
- CInodeExport istate(in, now);
- istate._encode( enc_state );
-
- // we're export this inode; fix inode state
- dout(7) << "encode_export_inode " << *in << dendl;
-
+ // clean: drop the dirty flag on the exported inode
if (in->is_dirty()) in->mark_clean();
// clear/unpin cached_by (we're no longer the authority)
in->state_clear(CInode::STATE_AUTH);
in->replica_nonce = CInode::EXPORT_NONCE;
+ // waiters: take everything waiting on this inode and hand it to fin
+ list<Context*> waiters;
+ in->take_waiting(CInode::WAIT_ANY, waiters);
+ fin->take(waiters);
+
// *** other state too?
// move to end of LRU so we drop out of cache quickly!
if (in->get_parent_dn())
cache->lru.lru_bottouch(in->get_parent_dn());
-}
+}
int Migrator::encode_export_dir(list<bufferlist>& dirstatelist,
- C_Contexts *fin,
- CDir *basedir,
CDir *dir,
- int newauth,
map<int,entity_inst_t>& exported_client_map,
utime_t now)
{
// dir
bufferlist enc_dir;
-
CDirExport dstate(dir, now);
dstate._encode( enc_dir );
- // release open_by
- dir->clear_replica_map();
-
- // mark
- assert(dir->is_auth());
- dir->state_clear(CDir::STATE_AUTH);
- dir->replica_nonce = CDir::NONCE_EXPORT;
-
- list<CDir*> subdirs;
-
- if (dir->is_dirty())
- dir->mark_clean();
-
- // discard most dir state
- dir->state &= CDir::MASK_STATE_EXPORT_KEPT; // i only retain a few things.
-
- // suck up all waiters
- list<Context*> waiting;
- dir->take_waiting(CDir::WAIT_ANY, waiting); // all dir waiters
- fin->take(waiting);
-
// dentries
+ list<CDir*> subdirs;
CDir::map_t::iterator it;
for (it = dir->begin(); it != dir->end(); it++) {
CDentry *dn = it->second;
// -- dentry
dout(7) << "encode_export_dir exporting " << *dn << dendl;
- // name
+ // dn name
::_encode(it->first, enc_dir);
// state
- it->second->encode_export_state(enc_dir);
+ dn->encode_export(enc_dir);
// points to...
// -- inode
enc_dir.append("I", 1); // inode dentry
- encode_export_inode(in, enc_dir, newauth, exported_client_map, now); // encode, and (update state for) export
+ encode_export_inode(in, enc_dir, exported_client_map, now); // encode, and (update state for) export
// directory?
list<CDir*> dfs;
subdirs.push_back(dir); // it's ours, recurse (later)
}
}
-
- // waiters
- list<Context*> waiters;
- in->take_waiting(CInode::WAIT_ANY, waiters);
- fin->take(waiters);
}
// add to dirstatelist
// subdirs
for (list<CDir*>::iterator it = subdirs.begin(); it != subdirs.end(); it++)
- num_exported += encode_export_dir(dirstatelist, fin, basedir, *it, newauth,
- exported_client_map, now);
+ num_exported += encode_export_dir(dirstatelist, *it, exported_client_map, now);
return num_exported;
}
+/*
+ * finish_export_dir - adjust local cache state for an exported dirfrag.
+ *
+ * Clears the replica map, auth flag and dirty state of 'dir', hands its
+ * waiters to 'fin', zeroes its popularity counters, then does the same
+ * for each dentry, each primary inode and (recursively) every nested
+ * dirfrag reported by those inodes.
+ */
+void Migrator::finish_export_dir(CDir *dir, C_Contexts *fin, utime_t now)
+{
+  dout(10) << "finish_export_dir " << *dir << dendl;
+
+  // release open_by
+  dir->clear_replica_map();
+
+  // no longer auth
+  assert(dir->is_auth());
+  dir->state_clear(CDir::STATE_AUTH);
+  dir->replica_nonce = CDir::NONCE_EXPORT;
+
+  if (dir->is_dirty())
+    dir->mark_clean();
+
+  // keep only the state bits in MASK_STATE_EXPORT_KEPT
+  dir->state &= CDir::MASK_STATE_EXPORT_KEPT;
+
+  // hand every dir waiter over to fin
+  list<Context*> dir_waiters;
+  dir->take_waiting(CDir::WAIT_ANY, dir_waiters);
+  fin->take(dir_waiters);
+
+  // popularity
+  dir->pop_auth_subtree_nested -= dir->pop_auth_subtree;
+  dir->pop_me.zero(now);
+  dir->pop_auth_subtree.zero(now);
+
+  // dentries, and the inodes they are primary for
+  list<CDir*> nested;
+  for (CDir::map_t::iterator p = dir->begin(); p != dir->end(); ++p) {
+    CDentry *dn = p->second;
+    CInode *in = dn->get_inode();
+
+    dn->finish_export();
+
+    if (!dn->is_primary())
+      continue;
+
+    finish_export_inode(in, fin);
+    in->get_nested_dirfrags(nested);   // visited in the loop below
+  }
+
+  // nested dirfrags
+  for (list<CDir*>::iterator q = nested.begin(); q != nested.end(); ++q)
+    finish_export_dir(*q, fin, now);
+}
class C_MDS_ExportFinishLogged : public Context {
Migrator *migrator;
export_warning_ack_waiting.erase(dir);
export_state[dir] = EXPORT_LOGGINGFINISH;
- export_data.erase(dir);
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
+
+
/*
- * this happens if hte dest failes after i send teh export data but before it is acked
+ * this happens if the dest fails after i send the export data but before it is acked
* that is, we don't know they safely received and logged it, so we reverse our changes
dout(7) << "export_reverse " << *dir << dendl;
assert(export_state[dir] == EXPORT_EXPORTING);
- assert(export_data.count(dir));
set<CDir*> bounds;
cache->get_subtree_bounds(dir, bounds);
bd->state_clear(CDir::STATE_EXPORTBOUND);
}
- // re-import the metadata
- map<int,entity_inst_t> imported_client_map;
- int off = 0;
- ::_decode(imported_client_map, export_data[dir].front(), off);
- export_data[dir].pop_front();
-
- while (!export_data[dir].empty()) {
- decode_import_dir(export_data[dir].front(),
- export_peer[dir],
- dir, // import root
- 0,
- imported_client_map,
- 0);
- export_data[dir].pop_front();
- }
-
// process delayed expires
cache->process_delayed_expire(dir);
// some clean up
- export_data.erase(dir);
export_warning_ack_waiting.erase(dir);
export_notify_ack_waiting.erase(dir);
dout(7) << "not sending MExportDirFinish, dest has failed" << dendl;
}
+ // finish export (adjust local cache state)
+ C_Contexts *fin = new C_Contexts;
+ finish_export_dir(dir, fin, g_clock.now());
+ dir->add_waiter(CDir::WAIT_UNFREEZE, fin);
+
// unfreeze
dout(7) << "export_finish unfreezing" << dendl;
dir->unfreeze_tree();
// export fun
map<CDir*,int> export_state;
map<CDir*,int> export_peer;
- map<CDir*,list<bufferlist> > export_data; // only during EXPORTING state
+ //map<CDir*,list<bufferlist> > export_data; // only during EXPORTING state
map<CDir*,set<int> > export_warning_ack_waiting;
map<CDir*,set<int> > export_notify_ack_waiting;
void export_dir_nicely(CDir *dir, int dest);
void maybe_do_queued_export();
- void encode_export_inode(CInode *in, bufferlist& enc_state, int newauth,
+ void encode_export_inode(CInode *in, bufferlist& enc_state,
map<int,entity_inst_t>& exported_client_map,
utime_t now);
+ void finish_export_inode(CInode *in, C_Contexts *fin);
int encode_export_dir(list<bufferlist>& dirstatelist,
- class C_Contexts *fin,
- CDir *basedir,
CDir *dir,
- int newauth,
map<int,entity_inst_t>& exported_client_map,
utime_t now);
+ void finish_export_dir(CDir *dir, class C_Contexts *fin, utime_t now);
void add_export_finish_waiter(CDir *dir, Context *c) {
export_finish_waiters[dir].push_back(c);
!(*p)->can_auth_pin()) {
// wait
dout(10) << " waiting for authpinnable on " << **p << dendl;
- (*p)->add_waiter(CDir::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+ (*p)->add_waiter(CDir::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
mdr->drop_local_auth_pins();
return;
}
if (want_auth) {
if (ref->is_frozen()) {
dout(7) << "waiting for !frozen/authpinnable on " << *ref << dendl;
- ref->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+ ref->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
return 0;
}
mdr->auth_pin(ref);
// make sure we can auth_pin (or have already authpinned) dir
if (dir->is_frozen()) {
dout(7) << "waiting for !frozen/authpinnable on " << *dir << dendl;
- dir->add_waiter(CInode::WAIT_AUTHPINNABLE, new C_MDS_RetryRequest(mdcache, mdr));
+ dir->add_waiter(CInode::WAIT_UNFREEZE, new C_MDS_RetryRequest(mdcache, mdr));
return 0;
}
// commit anchor update?
if (mdr->dst_reanchor_atid)
- mds->anchorclient->commit(mdr->dst_reanchor_atid);
+ mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls);
// bump pop
//mds->balancer->hit_dir(mdr->now, dn->dir, META_POP_DWR);
// commit anchor update?
if (mdr->dst_reanchor_atid)
- mds->anchorclient->commit(mdr->dst_reanchor_atid);
+ mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls);
//mds->balancer->hit_dir(mdr->now, dn->dir, META_POP_DWR);
_rename_apply(mdr, srcdn, destdn, straydn);
// commit anchor updates?
- if (mdr->src_reanchor_atid) mds->anchorclient->commit(mdr->src_reanchor_atid);
- if (mdr->dst_reanchor_atid) mds->anchorclient->commit(mdr->dst_reanchor_atid);
+ if (mdr->src_reanchor_atid) mds->anchorclient->commit(mdr->src_reanchor_atid, mdr->ls);
+ if (mdr->dst_reanchor_atid) mds->anchorclient->commit(mdr->dst_reanchor_atid, mdr->ls);
// bump popularity
//if (srcdn->is_auth())
// bump popularity
//if (srcdn->is_auth())
//mds->balancer->hit_dir(mdr->now, srcdn->get_dir(), META_POP_DWR);
- if (destdn->inode->is_auth())
+ if (destdn->inode && destdn->inode->is_auth())
mds->balancer->hit_inode(mdr->now, destdn->inode, META_POP_IWR);
// done.
// commit
_rename_apply(mdr, srcdn, destdn, straydn);
+ if (mdr->inode_export) {
+ C_Contexts *fin = new C_Contexts;
+ mdcache->migrator->finish_export_inode(mdr->inode_export, fin);
+ mds->queue_waiter(fin);
+ }
+
// write a commit to the journal
le = new ESlaveUpdate(mdlog, "slave_rename_commit", mdr->reqid, mdr->slave_to_mds, ESlaveUpdate::OP_COMMIT);
} else {
map<int,entity_inst_t> exported_client_map;
bufferlist inodebl;
- mdcache->migrator->encode_export_inode(mdr->srcdn->inode, inodebl, mdr->slave_to_mds,
+ mdcache->migrator->encode_export_inode(mdr->srcdn->inode, inodebl,
exported_client_map,
mdr->now);
::_encode(exported_client_map, reply->inode_export);
reply->inode_export_v = mdr->srcdn->inode->inode.version;
- mdr->inode_import = reply->inode_export; // keep a copy locally, in case we have to rollback
-
mds->send_message_mds(reply, mdr->slave_to_mds, MDS_PORT_SERVER);
// clean up.
dir->commit(0, gather->new_sub());
} else {
dout(10) << " waiting for unfreeze on " << *dir << dendl;
- dir->add_waiter(CDir::WAIT_AUTHPINNABLE, gather->new_sub());
+ dir->add_waiter(CDir::WAIT_UNFREEZE, gather->new_sub());
}
}
}
(*p)->dirlock.add_waiter(SimpleLock::WAIT_STABLE, gather->new_sub());
}
- //
+ // pending commit atids
+ for (hash_set<version_t>::iterator p = pending_commit_atids.begin();
+ p != pending_commit_atids.end();
+ ++p) {
+ if (!gather) gather = new C_Gather;
+ assert(!mds->anchorclient->has_committed(*p));
+ dout(10) << " anchor transaction " << *p
+ << " pending commit (not yet acked), waiting" << dendl;
+ mds->anchorclient->wait_for_ack(*p, gather->new_sub());
+ }
return gather;
}
else
// pbly about to export|split|merge.
// just wait for it to unfreeze, then retry
- p->first->add_waiter(CDir::WAIT_AUTHPINNABLE, gather->new_sub());
+ p->first->add_waiter(CDir::WAIT_UNFREEZE, gather->new_sub());
}
for (list<CDir*>::iterator p = waitfor_export.begin();
p != waitfor_export.end();
void EMetaBlob::update_segment(LogSegment *ls)
{
// atids?
- for (list<version_t>::iterator p = atids.begin(); p != atids.end(); ++p)
- ls->atids.insert(*p);
+ //for (list<version_t>::iterator p = atids.begin(); p != atids.end(); ++p)
+ // ls->pending_commit_atids[*p] = ls;
+ // -> handled directly by AnchorClient
// dirty inode mtimes
// -> handled directly by Server.cc, replay()
ls->allocv = alloc_tablev;
// truncated inodes
- // -> handled directory by Server.cc
+ // -> handled directly by Server.cc
// client requests
// note the newest request per client
p != atids.end();
++p) {
dout(10) << "EMetaBlob.replay noting anchor transaction " << *p << dendl;
- mds->anchorclient->got_journaled_agree(*p);
+ mds->anchorclient->got_journaled_agree(*p, logseg);
}
// dirtied inode mtimes
// -- wait --
const static int WAIT_SINGLEAUTH = (1<<30);
- const static int WAIT_AUTHPINNABLE = (1<<29);
- const static int WAIT_UNFREEZE = WAIT_AUTHPINNABLE;
+ const static int WAIT_UNFREEZE = (1<<29); // pka AUTHPINNABLE
// ============================================
void set_dirstate(const list<bufferlist>& ls) {
dirstate = ls;
}
+ void take_dirstate(list<bufferlist>& ls) {  // swap instead of copy: dirstate gets ls's contents, ls gets the previous dirstate
+ dirstate.swap(ls);
+ }
void add_export(dirfrag_t df) {
bounds.push_back(df);
}