// CDir
CDir::CDir(CInode *in, frag_t fg, MDCache *mdcache, bool auth) :
+ mseq(0),
dirty_rstat_inodes(member_offset(CInode, dirty_rstat_item)),
item_dirty(this), item_new(this),
pop_me(ceph_clock_now(g_ceph_context)),
void CDir::encode_export(bufferlist& bl)
{
assert(!is_projected());
+ ceph_seq_t seq = mseq + 1;
+ ::encode(seq, bl);
::encode(first, bl);
::encode(fnode, bl);
::encode(dirty_old_rstat, bl);
void CDir::decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls)
{
+ ::decode(mseq, blp);
::decode(first, blp);
::decode(fnode, blp);
::decode(dirty_old_rstat, blp);
fnode_t fnode;
snapid_t first;
+ ceph_seq_t mseq; // migrate sequence
map<snapid_t,old_rstat_t> dirty_old_rstat; // [value.first,key]
// my inodes with dirty rstat data
// -- import/export --
void encode_export(bufferlist& bl);
void finish_export(utime_t now);
- void abort_export() {
+ void abort_export() { // export was abandoned: advance mseq, then call put(PIN_TEMPEXPORTING)
+ mseq += 2; // NOTE(review): presumably +2 so that mseq moves past the (mseq + 1) value encode_export() wrote for the importer — confirm
put(PIN_TEMPEXPORTING);
}
void decode_import(bufferlist::iterator& blp, utime_t now, LogSegment *ls);
dout(20) << fg << " fragstat " << pf->fragstat << dendl;
dout(20) << fg << " accounted_fragstat " << pf->accounted_fragstat << dendl;
::encode(fg, tmp);
+ ::encode(dir->mseq, tmp);
::encode(dir->first, tmp);
::encode(pf->fragstat, tmp);
::encode(pf->accounted_fragstat, tmp);
dout(10) << fg << " " << pf->rstat << dendl;
dout(10) << fg << " " << dir->dirty_old_rstat << dendl;
::encode(fg, tmp);
+ ::encode(dir->mseq, tmp);
::encode(dir->first, tmp);
::encode(pf->rstat, tmp);
::encode(pf->accounted_rstat, tmp);
dout(10) << " ...got " << n << " fragstats on " << *this << dendl;
while (n--) {
frag_t fg;
+ ceph_seq_t mseq;
snapid_t fgfirst;
frag_info_t fragstat;
frag_info_t accounted_fragstat;
::decode(fg, p);
+ ::decode(mseq, p);
::decode(fgfirst, p);
::decode(fragstat, p);
::decode(accounted_fragstat, p);
assert(dir); // i am auth; i had better have this dir open
dout(10) << fg << " first " << dir->first << " -> " << fgfirst
<< " on " << *dir << dendl;
+ if (dir->fnode.fragstat.version == get_projected_inode()->dirstat.version &&
+ ceph_seq_cmp(mseq, dir->mseq) < 0) {
+ dout(10) << " mseq " << mseq << " < " << dir->mseq << ", ignoring" << dendl;
+ continue;
+ }
+ dir->mseq = mseq;
dir->first = fgfirst;
dir->fnode.fragstat = fragstat;
dir->fnode.accounted_fragstat = accounted_fragstat;
::decode(n, p);
while (n--) {
frag_t fg;
+ ceph_seq_t mseq;
snapid_t fgfirst;
nest_info_t rstat;
nest_info_t accounted_rstat;
map<snapid_t,old_rstat_t> dirty_old_rstat;
::decode(fg, p);
+ ::decode(mseq, p);
::decode(fgfirst, p);
::decode(rstat, p);
::decode(accounted_rstat, p);
assert(dir); // i am auth; i had better have this dir open
dout(10) << fg << " first " << dir->first << " -> " << fgfirst
<< " on " << *dir << dendl;
+ if (dir->fnode.rstat.version == get_projected_inode()->rstat.version &&
+ ceph_seq_cmp(mseq, dir->mseq) < 0) {
+ dout(10) << " mseq " << mseq << " < " << dir->mseq << ", ignoring" << dendl;
+ continue;
+ }
+ dir->mseq = mseq;
dir->first = fgfirst;
dir->fnode.rstat = rstat;
dir->fnode.accounted_rstat = accounted_rstat;
}
}
+/*
+ * Mark the start of a scatter gather for the given lock on this inode.
+ *
+ * For every dirfrag in dirfrags for which dir->is_auth() is false (and,
+ * when auth >= 0, whose authority().first equals auth), set the dirfrag's
+ * stat version to the projected inode's version minus one:
+ *   CEPH_LOCK_IFILE: fnode.fragstat.version = dirstat.version - 1
+ *   CEPH_LOCK_INEST: fnode.rstat.version    = rstat.version - 1
+ * Any other lock type leaves the dirfrags untouched.
+ *
+ * Because the dirfrag version then differs from the inode version, the
+ * dirfrag version can be used to check whether scatter state has been
+ * gathered for a given dirfrag.
+ *
+ * lock: the scatter lock being gathered; only its get_type() is consulted.
+ * auth: if >= 0, restrict the update to dirfrags whose authority().first
+ *       is auth; a negative value applies it to every dirfrag for which
+ *       is_auth() is false.
+ *
+ * Asserts that is_auth() holds for this inode.
+ */
+void CInode::start_scatter_gather(ScatterLock *lock, int auth)
+{
+ assert(is_auth());
+ inode_t *pi = get_projected_inode(); // versions are taken from the projected inode
+
+ for (map<frag_t,CDir*>::iterator p = dirfrags.begin();
+ p != dirfrags.end();
+ ++p) {
+ CDir *dir = p->second;
+
+ if (dir->is_auth())
+ continue; // skip dirfrags for which is_auth() is true
+ if (auth >= 0 && dir->authority().first != auth)
+ continue; // caller asked for one specific authority and this dirfrag has another
+
+ switch (lock->get_type()) { // no default case: other lock types are a no-op
+ case CEPH_LOCK_IFILE:
+ dir->fnode.fragstat.version = pi->dirstat.version - 1; // one behind the inode's dirstat version
+ break;
+ case CEPH_LOCK_INEST:
+ dir->fnode.rstat.version = pi->rstat.version - 1; // one behind the inode's rstat version
+ break;
+ }
+ }
+}
+
struct C_Inode_FragUpdate : public Context {
CInode *in;
CDir *dir;
void clear_scatter_dirty(); // on rejoin ack
void start_scatter(ScatterLock *lock);
+ void start_scatter_gather(ScatterLock *lock, int auth=-1);
void finish_scatter_update(ScatterLock *lock, CDir *dir,
version_t inode_version, version_t dir_accounted_version);
void finish_scatter_gather_update(int type);
lock->get_parent()->is_replicated()) {
dout(10) << " finished (local) gather for mix->lock, now gathering from replicas" << dendl;
send_lock_message(lock, LOCK_AC_LOCK);
- lock->init_gather();
lock->set_state(LOCK_MIX_LOCK2);
+ lock->init_gather();
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
return;
}
assert(lock->is_stable());
CInode *in = 0;
- if (lock->get_cap_shift())
+ if (lock->get_type() != CEPH_LOCK_DN)
in = static_cast<CInode *>(lock->get_parent());
int old_state = lock->get_state();
if (lock->get_parent()->is_replicated() && old_state == LOCK_MIX) {
send_lock_message(lock, LOCK_AC_SYNC);
lock->init_gather();
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
gather++;
}
- if (in && in->is_head()) {
+ if (lock->get_cap_shift() && in->is_head()) {
if (in->issued_caps_need_gather(lock)) {
if (need_issue)
*need_issue = true;
assert(lock->get_state() != LOCK_LOCK);
CInode *in = 0;
- if (lock->get_cap_shift())
+ if (lock->get_type() != CEPH_LOCK_DN)
in = static_cast<CInode *>(lock->get_parent());
int old_state = lock->get_state();
}
if (lock->is_rdlocked())
gather++;
- if (in && in->is_head()) {
+ if (lock->get_cap_shift() && in->is_head()) {
if (in->issued_caps_need_gather(lock)) {
if (need_issue)
*need_issue = true;
gather++;
send_lock_message(lock, LOCK_AC_LOCK);
lock->init_gather();
+ if (lock->get_state() == LOCK_MIX_LOCK2)
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
}
}
if (lock->get_state() == LOCK_MIX_TSYN &&
in->is_replicated()) {
- lock->init_gather();
send_lock_message(lock, LOCK_AC_LOCK);
+ lock->init_gather();
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
gather++;
}
lock->get_state() != LOCK_XSYN_EXCL) { // if we were lock, replicas are already lock.
send_lock_message(lock, LOCK_AC_LOCK);
lock->init_gather();
+ if (lock->get_state() == LOCK_MIX_EXCL)
+ in->start_scatter_gather(static_cast<ScatterLock *>(lock));
gather++;
}
if (lock->is_leased()) {
dout(15) << " add_strong_dirfrag " << *dir << dendl;
rejoin->add_strong_dirfrag(dir->dirfrag(), dir->get_replica_nonce(), dir->get_dir_rep());
dir->state_set(CDir::STATE_REJOINING);
+ dir->mseq = 0;
for (CDir::map_t::iterator p = dir->items.begin();
p != dir->items.end();
++p) {
CInode *in = get_inode(p->first);
assert(in);
+ if (survivor) {
+ in->start_scatter_gather(&in->filelock, from);
+ in->start_scatter_gather(&in->nestlock, from);
+ } else {
+ rejoin_potential_updated_scatterlocks.insert(in);
+ }
in->decode_lock_state(CEPH_LOCK_IFILE, p->second.file);
in->decode_lock_state(CEPH_LOCK_INEST, p->second.nest);
in->decode_lock_state(CEPH_LOCK_IDFT, p->second.dft);
- if (!survivor)
- rejoin_potential_updated_scatterlocks.insert(in);
}
// recovering peer may send incorrect dirfrags here. we need to
#include "SessionMap.h"
-#define CEPH_MDS_PROTOCOL 17 /* cluster internal */
+#define CEPH_MDS_PROTOCOL 18 /* cluster internal */
enum {