void Client::put_cap_ref(Inode *in, int cap)
{
if (in->put_cap_ref(cap) && in->snapid == CEPH_NOSNAP) {
- if ((cap & CEPH_CAP_WR) && in->cap_snap_pending) {
- snapid_t seq = in->cap_snap_pending;
- dout(10) << "put_cap_ref doing cap_snap_pending " << seq << " on " << *in << dendl;
- in->cap_snap_pending = 0;
- queue_cap_snap(in, seq);
+ if ((cap & CEPH_CAP_WR) && // cap includes WR: look for a cap_snap that queue_cap_snap() left marked as writing
+ in->cap_snaps.size() &&
+ in->cap_snaps.begin()->second.writing) { // NOTE(review): only the first (lowest-key) map entry is examined -- confirm the writing cap_snap is always cap_snaps.begin()
+ dout(10) << "put_cap_ref finishing pending cap_snap on " << *in << dendl;
+ in->cap_snaps.begin()->second.writing = 0; // clear the flag so flush_snaps() no longer skips this entry on account of writing
+ finish_cap_snap(in, &in->cap_snaps.begin()->second, in->caps_used()); // record size/times now, passing the caps currently in use
signal_cond_list(in->waitfor_caps); // wake up blocked sync writers
}
if (cap & CEPH_CAP_WRBUFFER) {
+ for (map<snapid_t,CapSnap>::iterator p = in->cap_snaps.begin(); // cap includes WRBUFFER: clear dirty on every queued cap_snap
+ p != in->cap_snaps.end();
+ p++)
+ p->second.dirty = 0;
check_caps(in, false);
signal_cond_list(in->waitfor_commit);
}
void Client::queue_cap_snap(Inode *in, snapid_t seq)
{
int used = in->caps_used();
+ dout(10) << "queue_cap_snap " << *in << " seq " << seq << " used " << cap_string(used) << dendl;
- if (in->cap_snap_pending) {
- dout(10) << "queue_cap_snap already cap_snap_pending on " << *in << dendl;
- } else if (used & CEPH_CAP_WR) {
- dout(10) << "queue_cap_snap WR used, marking cap_snap_pending on " << *in << dendl;
- in->cap_snap_pending = seq;
- } else if (used & CEPH_CAP_WRBUFFER) {
- dout(10) << "queue_cap_snap took cap_snap, WRBUFFER used, flushing data on " << *in << dendl;
- in->take_cap_snap(true, seq);
- in->get();
- if (g_conf.client_oc)
- _flush(in, new C_SnapFlush(this, in, seq));
- else {
- /*
- * FIXME: in sync mode, we have no way to wait for prior writes to commit
- * without getting starved. so don't wait, for now.
- */
- dout(10) << "queue_cap_snap FIXME can't safely flush in sync mode, skipping" << dendl;
- in->cap_snaps[seq].flushing = false;
- flush_snaps(in);
- }
+ if (in->cap_snaps.size() && // a cap_snap is already marked writing: do not queue another one
+ in->cap_snaps.begin()->second.writing) { // NOTE(review): begin() is the lowest-key entry of the map -- confirm the writing cap_snap can never be a later entry
+ dout(10) << "queue_cap_snap already have pending cap_snap on " << *in << dendl;
+ return;
+ }
+
+ in->get(); // NOTE(review): presumably takes an inode reference on behalf of the new cap_snap; the matching release is not visible here -- confirm
+ CapSnap *capsnap = &in->cap_snaps[seq]; // operator[] creates the entry for seq if it is not present yet
+ capsnap->context = in->snaprealm->cached_snap_context; // copy the realm's cached snap context
+ capsnap->issued = in->caps_issued(); // caps issued right now; flush_snaps() later sends this value as head.caps
+
+ capsnap->dirty = (used & CEPH_CAP_WRBUFFER); // true when WRBUFFER is among the caps in use
+
+ if (used & CEPH_CAP_WR) {
+ dout(10) << "queue_cap_snap WR used on " << *in << dendl;
+ capsnap->writing = 1; // defer: put_cap_ref() calls finish_cap_snap() for an entry marked writing
} else {
- dout(10) << "queue_cap_snap took cap_snap, no WR|WRBUFFER used, flushing cap snap on " << *in << dendl;
- in->take_cap_snap(false, seq);
- in->get();
+ finish_cap_snap(in, capsnap, used); // WR not in use: record size/times immediately
+ }
+}
+
+void Client::finish_cap_snap(Inode *in, CapSnap *capsnap, int used) // copies in's current size/times into *capsnap, then flushes unless used contains WRBUFFER
+{
+ dout(10) << "finish_cap_snap " << *in << " capsnap " << (void*)capsnap << " used " << cap_string(used) << dendl;
+ capsnap->size = in->inode.size; // this and the next four lines copy the inode's current metadata into the cap_snap
+ capsnap->mtime = in->inode.mtime;
+ capsnap->atime = in->inode.atime;
+ capsnap->ctime = in->inode.ctime;
+ capsnap->time_warp_seq = in->inode.time_warp_seq;
+ if (used & CEPH_CAP_WRBUFFER) { // WRBUFFER still in use: leave dirty untouched and do not flush yet
+ dout(10) << "finish_cap_snap " << *in << " cap_snap " << capsnap << " used " << used
+ << " WRBUFFER, delaying" << dendl;
+ } else {
+ capsnap->dirty = 0; // WRBUFFER not in use: clear dirty before calling flush_snaps()
flush_snaps(in);
}
}
{
dout(10) << "_flushed_cap_snap seq " << seq << " on " << *in << dendl;
assert(in->cap_snaps.count(seq));
- in->cap_snaps[seq].flushing = 0;
+ in->cap_snaps[seq].dirty = 0;
flush_snaps(in);
}
<< " follows " << p->first
<< " size " << p->second.size
<< " mtime " << p->second.mtime
- << " still flushing=" << p->second.flushing
+ << " dirty=" << p->second.dirty
+ << " writing=" << p->second.writing
<< " on " << *in << dendl;
- if (p->second.flushing)
+ if (p->second.dirty || p->second.writing)
continue;
MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino(), mseq);
- m->head.caps = in->caps_issued();
m->head.snap_follows = p->first;
m->head.size = p->second.size;
+ m->head.caps = p->second.issued;
+ p->second.ctime.encode_timeval(&m->head.ctime);
p->second.mtime.encode_timeval(&m->head.mtime);
+ p->second.atime.encode_timeval(&m->head.atime);
messenger->send_message(m, mdsmap->get_inst(mds));
}
}
// wait for caps, max_size
while ((lazy && (in->caps_issued() & CEPH_CAP_LAZYIO) == 0) ||
- (!lazy && (in->caps_issued() & CEPH_CAP_WR) == 0) ||
+ (!lazy && (in->caps_issued() & CEPH_CAP_WR) == 0 &&
+ (in->cap_snaps.empty() || !in->cap_snaps.begin()->second.writing)) ||
endoff > in->inode.max_size) {
dout(7) << "missing wr|lazy cap OR endoff " << endoff
<< " > max_size " << in->inode.max_size
wait_on_list(in->waitfor_caps);
}
- // wait for cap snap?
- while (in->cap_snap_pending) {
- dout(7) << "waiting for cap_snap_pending " << in->cap_snap_pending << dendl;
- wait_on_list(in->waitfor_caps);
- }
-
in->get_cap_ref(CEPH_CAP_WR);
// avoid livelock with fsync?
struct CapSnap {
//snapid_t follows; // map key
+ SnapContext context; // copied from the inode's snaprealm cached_snap_context in queue_cap_snap()
+ int issued; // caps_issued() at queue time; sent as head.caps by flush_snaps()
__u64 size;
- utime_t mtime;
- bool flushing;
- CapSnap() : flushing(false) {}
+ utime_t ctime, mtime, atime; // inode timestamps copied in by finish_cap_snap()
+ version_t time_warp_seq; // inode time_warp_seq copied in by finish_cap_snap()
+ bool writing, dirty; // writing: WR was in use when queued; dirty: set from WRBUFFER use -- flush_snaps() skips the entry while either is set
+ CapSnap() : issued(0), size(0), time_warp_seq(0), writing(false), dirty(false) {} // scalars start at zero, both flags clear
};
xlist<Inode*>::item snaprealm_item;
Inode *snapdir_parent; // only if we are a snapdir inode
map<snapid_t,CapSnap> cap_snaps; // pending flush to mds
- snapid_t cap_snap_pending;
//int open_by_mode[CEPH_FILE_MODE_NUM];
map<int,int> open_by_mode;
return want;
}
- void take_cap_snap(bool flushing, snapid_t seq) {
- assert(snaprealm);
- cap_snaps[seq].size = inode.size;
- cap_snaps[seq].mtime = inode.mtime;
- cap_snaps[seq].flushing = flushing;
- }
-
int get_effective_lease_mask(utime_t now) {
int havemask = 0;
if (now < lease_ttl && lease_mds >= 0)
void put_cap_ref(Inode *in, int cap);
void flush_snaps(Inode *in);
void queue_cap_snap(Inode *in, snapid_t seq=0);
+ void finish_cap_snap(Inode *in, CapSnap *capsnap, int used); // copies in's size/times into *capsnap; calls flush_snaps() unless used contains CEPH_CAP_WRBUFFER
void _flushed_cap_snap(Inode *in, snapid_t seq);
void _release(Inode *in, bool checkafter=true);