]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: update cap_snap code to mirror kclient
authorSage Weil <sage@newdream.net>
Mon, 8 Sep 2008 23:46:01 +0000 (16:46 -0700)
committerSage Weil <sage@newdream.net>
Mon, 8 Sep 2008 23:46:01 +0000 (16:46 -0700)
src/client/Client.cc
src/client/Client.h

index 79e6e8dbdf7293d4ebf7d6425e49ee8c0c0df283..d99a9d106b8c49aa39f4b863b8b7885c18af68c3 100644 (file)
@@ -1389,14 +1389,19 @@ bool Inode::put_cap_ref(int cap)
 void Client::put_cap_ref(Inode *in, int cap)
 {
   if (in->put_cap_ref(cap) && in->snapid == CEPH_NOSNAP) {
-    if ((cap & CEPH_CAP_WR) && in->cap_snap_pending) {
-      snapid_t seq = in->cap_snap_pending;
-      dout(10) << "put_cap_ref doing cap_snap_pending " << seq << " on " << *in << dendl;
-      in->cap_snap_pending = 0;
-      queue_cap_snap(in, seq);
+    if ((cap & CEPH_CAP_WR) &&
+       in->cap_snaps.size() &&
+       in->cap_snaps.begin()->second.writing) {
+      dout(10) << "put_cap_ref finishing pending cap_snap on " << *in << dendl;
+      in->cap_snaps.begin()->second.writing = 0;
+      finish_cap_snap(in, &in->cap_snaps.begin()->second, in->caps_used());
       signal_cond_list(in->waitfor_caps);  // wake up blocked sync writers
     }
     if (cap & CEPH_CAP_WRBUFFER) {
+      for (map<snapid_t,CapSnap>::iterator p = in->cap_snaps.begin();
+          p != in->cap_snaps.end();
+          p++)
+       p->second.dirty = 0;
       check_caps(in, false);
       signal_cond_list(in->waitfor_commit);
     }
@@ -1506,31 +1511,42 @@ struct C_SnapFlush : public Context {
 void Client::queue_cap_snap(Inode *in, snapid_t seq)
 {
   int used = in->caps_used();
+  dout(10) << "queue_cap_snap " << *in << " seq " << seq << " used " << cap_string(used) << dendl;
 
-  if (in->cap_snap_pending) {
-    dout(10) << "queue_cap_snap already cap_snap_pending on " << *in << dendl;
-  } else if (used & CEPH_CAP_WR) {
-    dout(10) << "queue_cap_snap WR used, marking cap_snap_pending on " << *in << dendl;
-    in->cap_snap_pending = seq;
-  } else if (used & CEPH_CAP_WRBUFFER) {
-    dout(10) << "queue_cap_snap took cap_snap, WRBUFFER used, flushing data on " << *in << dendl;
-    in->take_cap_snap(true, seq);
-    in->get();
-    if (g_conf.client_oc)
-      _flush(in, new C_SnapFlush(this, in, seq));
-    else {
-      /*
-       * FIXME: in sync mode, we have no way to wait for prior writes to commit
-       * without getting starved.  so don't wait, for now.
-       */
-      dout(10) << "queue_cap_snap FIXME can't safely flush in sync mode, skipping" << dendl;
-      in->cap_snaps[seq].flushing = false;
-      flush_snaps(in);
-    }
+  if (in->cap_snaps.size() &&
+      in->cap_snaps.begin()->second.writing) {
+    dout(10) << "queue_cap_snap already have pending cap_snap on " << *in << dendl;
+    return;
+  }
+
+  in->get();
+  CapSnap *capsnap = &in->cap_snaps[seq];
+  capsnap->context = in->snaprealm->cached_snap_context;
+  capsnap->issued = in->caps_issued();
+  
+  capsnap->dirty = (used & CEPH_CAP_WRBUFFER);
+
+  if (used & CEPH_CAP_WR) {
+    dout(10) << "queue_cap_snap WR used on " << *in << dendl;
+    capsnap->writing = 1;
   } else {
-    dout(10) << "queue_cap_snap took cap_snap, no WR|WRBUFFER used, flushing cap snap on " << *in << dendl;
-    in->take_cap_snap(false, seq);
-    in->get();
+    finish_cap_snap(in, capsnap, used);
+  }
+}
+
+void Client::finish_cap_snap(Inode *in, CapSnap *capsnap, int used)
+{
+  dout(10) << "finish_cap_snap " << *in << " capsnap " << (void*)capsnap << " used " << cap_string(used) << dendl;
+  capsnap->size = in->inode.size;
+  capsnap->mtime = in->inode.mtime;
+  capsnap->atime = in->inode.atime;
+  capsnap->ctime = in->inode.ctime;
+  capsnap->time_warp_seq = in->inode.time_warp_seq;
+  if (used & CEPH_CAP_WRBUFFER) {
+    dout(10) << "finish_cap_snap " << *in << " cap_snap " << capsnap << " used " << used
+            << " WRBUFFER, delaying" << dendl;
+  } else {
+    capsnap->dirty = 0;
     flush_snaps(in);
   }
 }
@@ -1539,7 +1555,7 @@ void Client::_flushed_cap_snap(Inode *in, snapid_t seq)
 {
   dout(10) << "_flushed_cap_snap seq " << seq << " on " << *in << dendl;
   assert(in->cap_snaps.count(seq));
-  in->cap_snaps[seq].flushing = 0;
+  in->cap_snaps[seq].dirty = 0;
   flush_snaps(in);
 }
 
@@ -1565,15 +1581,18 @@ void Client::flush_snaps(Inode *in)
             << " follows " << p->first
             << " size " << p->second.size
             << " mtime " << p->second.mtime
-            << " still flushing=" << p->second.flushing
+            << " dirty=" << p->second.dirty
+            << " writing=" << p->second.writing
             << " on " << *in << dendl;
-    if (p->second.flushing)
+    if (p->second.dirty || p->second.writing)
       continue;
     MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino(), mseq);
-    m->head.caps = in->caps_issued();
     m->head.snap_follows = p->first;
     m->head.size = p->second.size;
+    m->head.caps = p->second.issued;
+    p->second.ctime.encode_timeval(&m->head.ctime);
     p->second.mtime.encode_timeval(&m->head.mtime);
+    p->second.atime.encode_timeval(&m->head.atime);
     messenger->send_message(m, mdsmap->get_inst(mds));
   }
 }
@@ -3786,7 +3805,8 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
   
   // wait for caps, max_size
   while ((lazy && (in->caps_issued() & CEPH_CAP_LAZYIO) == 0) ||
-        (!lazy && (in->caps_issued() & CEPH_CAP_WR) == 0) ||
+        (!lazy && (in->caps_issued() & CEPH_CAP_WR) == 0 &&
+         (in->cap_snaps.empty() || !in->cap_snaps.begin()->second.writing)) ||
         endoff > in->inode.max_size) {
     dout(7) << "missing wr|lazy cap OR endoff " << endoff
            << " > max_size " << in->inode.max_size 
@@ -3794,12 +3814,6 @@ int Client::_write(Fh *f, __s64 offset, __u64 size, const char *buf)
     wait_on_list(in->waitfor_caps);
   }
 
-  // wait for cap snap?
-  while (in->cap_snap_pending) {
-    dout(7) << "waiting for cap_snap_pending " << in->cap_snap_pending << dendl;
-    wait_on_list(in->waitfor_caps);
-  }
-
   in->get_cap_ref(CEPH_CAP_WR);
 
   // avoid livelock with fsync?
index 3910249c41589581779418c2381f365ff19e6b28..e7d5d14fac5fb70cdc1d249167c2f5784a57b1ec 100644 (file)
@@ -169,10 +169,13 @@ struct InodeCap {
 
 struct CapSnap {
   //snapid_t follows;  // map key
+  SnapContext context;
+  int issued;
   __u64 size;
-  utime_t mtime;
-  bool flushing;
-  CapSnap() : flushing(false) {}
+  utime_t ctime, mtime, atime;
+  version_t time_warp_seq;
+  bool writing, dirty;
+  CapSnap() : issued(0), size(0), time_warp_seq(0), writing(false), dirty(false) {}
 };
 
 
@@ -201,7 +204,6 @@ class Inode {
   xlist<Inode*>::item snaprealm_item;
   Inode *snapdir_parent;  // only if we are a snapdir inode
   map<snapid_t,CapSnap> cap_snaps;   // pending flush to mds
-  snapid_t cap_snap_pending;
 
   //int open_by_mode[CEPH_FILE_MODE_NUM];
   map<int,int> open_by_mode;
@@ -331,13 +333,6 @@ class Inode {
     return want;
   }
 
-  void take_cap_snap(bool flushing, snapid_t seq) {
-    assert(snaprealm);
-    cap_snaps[seq].size = inode.size;
-    cap_snaps[seq].mtime = inode.mtime;
-    cap_snaps[seq].flushing = flushing;
-  }
-
   int get_effective_lease_mask(utime_t now) {
     int havemask = 0;
     if (now < lease_ttl && lease_mds >= 0)
@@ -868,6 +863,7 @@ protected:
   void put_cap_ref(Inode *in, int cap);
   void flush_snaps(Inode *in);
   void queue_cap_snap(Inode *in, snapid_t seq=0);
+  void finish_cap_snap(Inode *in, CapSnap *capsnap, int used);
   void _flushed_cap_snap(Inode *in, snapid_t seq);
 
   void _release(Inode *in, bool checkafter=true);