snapid_t seq = in->cap_snap_pending;
dout(10) << "put_cap_ref doing cap_snap_pending " << seq << " on " << *in << dendl;
in->cap_snap_pending = 0;
- in->queue_cap_snap(in, seq);
+ queue_cap_snap(in, seq);
signal_cond_list(in->waitfor_caps); // wake up blocked sync writers
}
if (cap & CEPH_CAP_WRBUFFER) {
}
}
+// Completion context that queue_cap_snap() hands to _flush() as its
+// onfinish callback.  When it fires, it reports back to the client via
+// Client::_flushed_cap_snap() for the (inode, seq) pair it was built with.
+struct C_SnapFlush : public Context {
+  Client *client;   // client to notify on completion
+  Inode *in;        // inode whose cap snap was being flushed
+  snapid_t seq;     // key of the cap snap in in->cap_snaps
+
+  C_SnapFlush(Client *cl, Inode *inode, snapid_t snap_seq)
+    : client(cl),
+      in(inode),
+      seq(snap_seq)
+  {}
+
+  // The result code r is not inspected; completion alone triggers the callback.
+  void finish(int r) {
+    client->_flushed_cap_snap(in, seq);
+  }
+};
+
void Client::queue_cap_snap(Inode *in, snapid_t seq)
{
int used = in->caps_used();
} else if (used & CEPH_CAP_WR) {
dout(10) << "queue_cap_snap WR used, marking cap_snap_pending on " << *in << dendl;
in->cap_snap_pending = seq;
- } else {
- if (used & CEPH_CAP_WRBUFFER) {
- dout(10) << "queue_cap_snap took cap_snap, WRBUFFER used, flushing data" << dendl;
- in->take_cap_snap(1, seq);
- if (g_conf.client_oc)
- _flush(in);
- } else {
- dout(10) << "queue_cap_snap took cap_snap, no WR|WRBUFFER used, flushing cap snap" << dendl;
- in->take_cap_snap(0, seq);
+ } else if (used & CEPH_CAP_WRBUFFER) {
+ dout(10) << "queue_cap_snap took cap_snap, WRBUFFER used, flushing data on " << *in << dendl;
+ in->take_cap_snap(true, seq);
+ in->get();
+ if (g_conf.client_oc)
+ _flush(in, new C_SnapFlush(this, in, seq));
+ else {
+ /*
+ * FIXME: in sync mode, we have no way to wait for prior writes to commit
+ * without getting starved. so don't wait, for now.
+ */
+ dout(10) << "queue_cap_snap FIXME can't safely flush in sync mode, skipping" << dendl;
+ in->cap_snaps[seq].flushing = false;
flush_snaps(in);
}
+ } else {
+ dout(10) << "queue_cap_snap took cap_snap, no WR|WRBUFFER used, flushing cap snap on " << *in << dendl;
+ in->take_cap_snap(false, seq);
+ in->get();
+ flush_snaps(in);
}
}
+/*
+ * Called from C_SnapFlush::finish() once the flush started by
+ * queue_cap_snap() has completed for the cap snap keyed by seq.
+ *
+ * Clears the entry's 'flushing' flag and then calls flush_snaps(), which
+ * skips entries that are still marked flushing and builds a FLUSHSNAP
+ * message for the others.
+ *
+ * The cap snap for seq must still exist in in->cap_snaps (asserted).
+ */
+void Client::_flushed_cap_snap(Inode *in, snapid_t seq)
+{
+  dout(10) << "_flushed_cap_snap seq " << seq << " on " << *in << dendl;
+  assert(in->cap_snaps.count(seq));
+  // CapSnap::flushing is a bool; use 'false' like the sync-mode path in
+  // queue_cap_snap() does, rather than the integer literal 0.
+  in->cap_snaps[seq].flushing = false;
+  flush_snaps(in);
+}
+
void Client::flush_snaps(Inode *in)
{
dout(10) << "flush_snaps on " << *in << dendl;
// pick auth mds
int mds = -1;
+ int mseq;
for (map<int,InodeCap*>::iterator p = in->caps.begin(); p != in->caps.end(); p++) {
if (p->second->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL)) {
mds = p->first;
+ mseq = p->second->mseq;
break;
}
}
assert(mds >= 0);
- for (map<snapid_t,SnapCap>::iterator p = in->cap_snaps.begin(); p != in->cap_snaps.end(); p++) {
+ for (map<snapid_t,CapSnap>::iterator p = in->cap_snaps.begin(); p != in->cap_snaps.end(); p++) {
dout(10) << "flush_snaps mds" << mds
<< " follows " << p->first
<< " size " << p->second.size
<< " mtime " << p->second.mtime
+ << " still flushing=" << p->second.flushing
<< " on " << *in << dendl;
- MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino());
+ if (p->second.flushing)
+ continue;
+ MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino(), mseq);
+ m->head.caps = in->caps_issued();
m->head.snap_follows = p->first;
m->head.size = p->second.size;
p->second.mtime.encode_timeval(&m->head.mtime);
// queue for snap writeback
queue_cap_snap(in);
- if (g_conf.client_oc)
- _flush(in);
-
if (in->snaprealm) {
put_snap_realm(in->snaprealm);
in->snaprealm_item.remove_myself();
assert(in->caps[mds]);
snapid_t follows = m->get_snap_follows();
- dout(5) << "handle_cap_flushedsnap mds" << mds << " flushed snap follows " << follows
- << " on " << *in << dendl;
- assert(in->cap_snaps.count(follows));
- in->cap_snaps.erase(follows);
-
+ if (in->cap_snaps.count(follows)) {
+ dout(5) << "handle_cap_flushedsnap mds" << mds << " flushed snap follows " << follows
+ << " on " << *in << dendl;
+ in->cap_snaps.erase(follows);
+ put_inode(in);
+ } else {
+ dout(5) << "handle_cap_flushedsnap DUP(?) mds" << mds << " flushed snap follows " << follows
+ << " on " << *in << dendl;
+ // we may not have it if we sent multiple FLUSHSNAP requests (and got multiple FLUSHEDSNAPs back)
+ }
+
delete m;
}
{
//dout(3) << "op: client->close(open_files[ " << fh << " ]);" << dendl;
//dout(3) << "op: open_files.erase( " << fh << " );" << dendl;
- dout(5) << "_release " << f << " mode " << f->mode << dendl;
Inode *in = f->inode;
+ dout(5) << "_release " << f << " mode " << f->mode << " on " << *in << dendl;
if (in->snapid == CEPH_NOSNAP) {
if (in->put_open_ref(f->mode)) {
InodeCap() : issued(0), implemented(0), seq(0), mseq(0) {}
};
-struct SnapCap {
+// Snapshot of an inode's size and mtime taken at a snap boundary and held
+// in Inode::cap_snaps (keyed by snapid) until it has been flushed to the mds.
+struct CapSnap {
//snapid_t follows; // map key
__u64 size;
utime_t mtime;
- int flushing;
- SnapCap() : flushing(0) {}
+ // true while buffered data for this snap is still being written back;
+ // flush_snaps() skips entries with this set.
+ bool flushing;
+ // Initialize size as well: entries are created via cap_snaps[seq], which
+ // runs this constructor, and size would otherwise be indeterminate until
+ // take_cap_snap() assigns it.
+ CapSnap() : size(0), flushing(false) {}
};
SnapRealm *snaprealm;
xlist<Inode*>::item snaprealm_item;
Inode *snapdir_parent; // only if we are a snapdir inode
- map<snapid_t,SnapCap> cap_snaps; // pending flush to mds
+ map<snapid_t,CapSnap> cap_snaps; // pending flush to mds
snapid_t cap_snap_pending;
//int open_by_mode[CEPH_FILE_MODE_NUM];
return want;
}
- void take_cap_snap(int flushing, snapid_t seq) {
+ void take_cap_snap(bool flushing, snapid_t seq) {
assert(snaprealm);
cap_snaps[seq].size = inode.size;
cap_snaps[seq].mtime = inode.mtime;
void check_caps(Inode *in, bool is_delayed);
void put_cap_ref(Inode *in, int cap);
void flush_snaps(Inode *in);
- void queue_cap_snap(Inode *in);
+ void queue_cap_snap(Inode *in, snapid_t seq=0);
+ void _flushed_cap_snap(Inode *in, snapid_t seq);
void _release(Inode *in, bool checkafter=true);
void _flush(Inode *in, Context *onfinish=NULL);
if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
dout(7) << " flushsnap follows " << follows
<< " client" << client << " on " << *in << dendl;
- int had = cap->confirm_receipt(m->get_seq(), m->get_caps());
- int has = cap->confirmed();
-
// this cap now follows a later snap (i.e. the one initiating this flush, or later)
cap->client_follows = follows+1;
MClientCaps *ack = new MClientCaps(CEPH_CAP_OP_FLUSHEDSNAP, in->inode, 0, 0, 0, 0, 0);
ack->set_snap_follows(follows);
- _do_cap_update(in, has|had, in->get_caps_wanted(), follows, m, ack);
+ _do_cap_update(in, m->get_caps(), in->get_caps_wanted(), follows, m, ack);
} else {
// for this and all subsequent versions of this inode,