for (hash_map<inodeno_t, Inode*>::iterator p = inode_map.begin();
p != inode_map.end();
p++) {
- if (p->second->caps.count(mds)) {
+ Inode *in = p->second;
+ if (in->caps.count(mds)) {
dout(10) << " caps on " << p->first
- << " " << cap_string(p->second->caps[mds].issued)
- << " wants " << cap_string(p->second->caps_wanted())
+ << " " << cap_string(in->caps[mds].issued)
+ << " wants " << cap_string(in->caps_wanted())
<< dendl;
- p->second->caps[mds].seq = 0; // reset seq.
+ in->caps[mds].seq = 0; // reset seq.
m->add_inode_caps(p->first, // ino
- p->second->caps_wanted(), // wanted
- p->second->caps[mds].issued, // issued
- p->second->inode.size, p->second->inode.mtime, p->second->inode.atime);
+ in->caps_wanted(), // wanted
+ in->caps[mds].issued, // issued
+ in->inode.size, in->inode.mtime, in->inode.atime);
filepath path;
- p->second->make_path(path);
+ in->make_path(path);
dout(10) << " path on " << p->first << " is " << path << dendl;
m->add_inode_path(p->first, path.get_path());
}
- /*
- if (p->second->stale_caps.count(mds)) {
- dout(10) << " clearing stale caps on " << p->first << dendl;
- p->second->stale_caps.erase(mds); // hrm, is this right?
+ if (in->exporting_mds == mds) {
+ dout(10) << " clearing exporting_caps on " << p->first << dendl;
+ in->exporting_mds = -1;
+ in->exporting_issued = 0;
+ in->exporting_mseq = 0;
}
- */
}
// reset my cap seq number
in->inode,
it->second.seq,
it->second.issued,
- wanted);
+ wanted,
+ 0);
in->reported_size = in->inode.size;
m->set_max_size(in->wanted_max_size);
in->requested_max_size = in->wanted_max_size;
messenger->send_message(m, mdsmap->get_inst(it->first));
- if (wanted == 0) {
- it->second.seq = 0;
- if (it->second.can_drop()) {
- mds_sessions[it->first].num_caps--;
- in->caps.erase(it);
- }
- }
+ if (wanted == 0)
+ mds_sessions[it->first].num_caps--;
}
- if (in->caps.empty()) {
+ if (wanted == 0) {
dout(10) << "last caps on " << *in << dendl;
+ in->caps.clear();
put_inode(in);
}
}
return;
}
- // reap?
if (m->get_op() == CEPH_CAP_OP_IMPORT) {
- int other = m->get_migrate_mds();
-
- /*
- * FIXME: handle mds failures
- */
-
// fresh from new mds?
if (!in->caps.count(mds)) {
mds_sessions[mds].num_caps++;
- if (in->caps.empty()) in->get();
- in->caps[mds].seq = m->get_seq();
- in->caps[mds].issued = m->get_caps();
- }
-
- if (in->caps.count(other) &&
- in->caps[other].issued_exporting) {
- dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " IMPORT to mds" << mds << " from mds" << other
- << " - already saw EXPORT, done" << dendl;
- in->caps[other].issued_exporting = 0;
- if (in->caps[other].can_drop()) {
- mds_sessions[other].num_caps--;
- in->caps.erase(other);
+ if (in->caps.empty())
+ in->get();
+ if (in->exporting_mds == mds) {
+ dout(10) << " clearing exporting_caps on " << mds << dendl;
+ in->exporting_mds = -1;
+ in->exporting_issued = 0;
+ in->exporting_mseq = 0;
}
+ }
+ in->caps[mds].seq = m->get_seq();
+ in->caps[mds].issued = m->get_caps();
+ in->caps[mds].mseq = m->get_mseq();
+
+ if (in->exporting_mseq < m->get_mseq()) {
+ dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq()
+ << " IMPORT from mds" << mds << ", clearing exporting_issued " << cap_string(in->exporting_issued)
+ << " mseq " << in->exporting_mseq << dendl;
+ in->exporting_issued = 0;
+ in->exporting_mseq = 0;
+ in->exporting_mds = -1;
} else {
- dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " IMPORT to mds" << mds << " from mds" << other
- << " - no EXPORT yet, marking" << dendl;
- in->caps[mds].importing_from.insert(other);
+ dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq()
+ << " IMPORT from mds" << mds << ", keeping exporting_issued " << cap_string(in->exporting_issued)
+ << " mseq " << in->exporting_mseq << " by mds" << in->exporting_mds << dendl;
}
-
- // fall-thru!
- // (wake ppl up, etc.)
+ delete m;
+ return;
+ }
+
+ // don't have cap?
+ // (it may be that we've reopened the file locally,
+ // and thus want caps, but don't have a cap yet.
+ // we should never reply to a cap out of turn.)
+ if (in->caps.count(mds) == 0) {
+ // silently drop.
+ dout(5) << "handle_file_caps on ino " << m->get_ino()
+ << " seq " << m->get_seq()
+ << " " << cap_string(m->get_caps())
+ << ", don't have this cap, silently dropping." << dendl;
+ delete m;
+ return;
}
+
+ // ok!
+ InodeCap &cap = in->caps[mds];
+
// stale?
if (m->get_op() == CEPH_CAP_OP_EXPORT) {
- // move to stale list
- bool found_importing = false;
+ // note
+ bool found_higher_mseq = false;
for (map<int,InodeCap>::iterator p = in->caps.begin();
p != in->caps.end();
p++) {
if (p->first == mds) continue;
- if (p->second.importing_from.count(mds) == 0) continue;
-
- dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " EXPORT from mds" << mds
- << " - already saw IMPORT, done" << dendl;
- p->second.importing_from.erase(mds);
- mds_sessions[mds].num_caps--;
- in->caps.erase(mds);
- found_importing = true;
- if (p->second.can_drop()) {
- mds_sessions[p->first].num_caps--;
- in->caps.erase(p);
- if (in->caps.empty())
- put_inode(in);
+ if (p->second.mseq > m->get_mseq()) {
+ found_higher_mseq = true;
+ dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq()
+ << " EXPORT from mds" << mds
+ << ", but mds" << p->first << " has higher mseq " << p->second.mseq << dendl;
+ break;
}
- break;
- }
- if (!found_importing) {
- dout(5) << "handle_file_caps ino " << m->get_ino() << " seq " << m->get_seq() << " EXPORT from mds" << mds
- << " - no IMPORT yet, marking" << dendl;
- if (in->caps.empty()) in->get();
- InodeCap &cap = in->caps[mds];
- cap.issued_exporting = in->caps[mds].issued;
- cap.issued = 0;
- cap.seq = 0;
}
+ if (!found_higher_mseq) {
+ dout(5) << "handle_file_caps ino " << m->get_ino() << " mseq " << m->get_mseq()
+ << " EXPORT from mds" << mds
+ << ", setting exporting_issued " << cap_string(cap.issued) << dendl;
+ in->exporting_issued = cap.issued;
+ in->exporting_mseq = m->get_mseq();
+ in->exporting_mds = mds;
+ in->caps.erase(mds);
+ if (in->caps.empty())
+ put_inode(in);
+ }
+
delete m;
return;
}
return;
}
- // don't have cap?
- // (it may be that we've reopened the file locally,
- // and thus want caps, but don't have a cap yet.
- // we should never reply to a cap out of turn.)
- if (in->caps.count(mds) == 0) {
- // silently drop.
- dout(5) << "handle_file_caps on ino " << m->get_ino()
- << " seq " << m->get_seq()
- << " " << cap_string(m->get_caps())
- << ", don't have this cap, silently dropping." << dendl;
- delete m;
- return;
- }
-
-
- // ok!
- InodeCap &cap = in->caps[mds];
cap.seq = m->get_seq();
// don't want it?
m->set_caps(0);
m->set_wanted(0);
messenger->send_message(m, m->get_source_inst());
- cap.seq = 0;
- if (cap.can_drop()) {
- mds_sessions[mds].num_caps--;
- in->caps.erase(mds);
- if (in->caps.empty())
- put_inode(in);
- }
+
+ // FIXME...
+
return;
}
dout(7) << " first caps on " << in->inode.ino << dendl;
in->get();
}
- if (in->caps.count(mds) == 0)
+ if (in->caps.count(mds) == 0) {
mds_sessions[mds].num_caps++;
+ if (in->exporting_mds == mds) {
+ dout(10) << " clearing exporting_caps on " << mds << dendl;
+ in->exporting_mds = -1;
+ in->exporting_issued = 0;
+ in->exporting_mseq = 0;
+ }
+ }
InodeCap &cap = in->caps[mds];
int new_caps = reply->get_file_caps();
cap.issued |= new_caps;
cap.implemented |= new_caps;
cap.seq = reply->get_file_caps_seq();
+ cap.mseq = reply->get_file_caps_mseq();
// any caps?
if (new_caps & ~old_caps)
public:
unsigned issued;
unsigned implemented;
- unsigned seq;
+ __u64 seq;
+ __u32 mseq; // migration seq
- int issued_exporting; // if export comes first
- set<int> importing_from; // if import comes first
-
- InodeCap() : issued(0), implemented(0), seq(0),
- issued_exporting(0) {}
-
- bool can_drop() {
- return issued == 0 &&
- issued_exporting == 0 &&
- importing_from.empty();
- }
+ InodeCap() : issued(0), implemented(0), seq(0), mseq(0) {}
};
// per-mds caps
map<int,InodeCap> caps; // mds -> InodeCap
+ unsigned exporting_issued;
+ int exporting_mds;
+ capseq_t exporting_mseq;
//int open_by_mode[CEPH_FILE_MODE_NUM];
map<int,int> open_by_mode;
//inode(_inode),
lease_mask(0), lease_mds(-1),
dir_auth(-1), dir_hashed(false), dir_replicated(false),
+ exporting_issued(0), exporting_mds(-1), exporting_mseq(0),
reported_size(0), wanted_max_size(0), requested_max_size(0),
ref(0), ll_ref(0),
dir(0), dn(0), symlink(0),
bool put_cap_ref(int cap);
int caps_issued() {
- int c = 0;
+ int c = exporting_issued;
for (map<int,InodeCap>::iterator it = caps.begin();
it != caps.end();
it++)
- c |= it->second.issued | it->second.issued_exporting;
+ c |= it->second.issued;
return c;
}