]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
client: delay snapflush until after data is flushed, sync writes complete
authorSage Weil <sage@newdream.net>
Wed, 20 Aug 2008 22:20:36 +0000 (15:20 -0700)
committerSage Weil <sage@newdream.net>
Wed, 20 Aug 2008 22:20:36 +0000 (15:20 -0700)
src/client/Client.cc
src/client/Client.h
src/mds/Locker.cc
src/mds/Server.cc
src/messages/MClientCaps.h

index f662a8d6311017210f429e1bfcdd915c9663b8f3..dc7685bda0ba53ef156311d12e3236d034c939d8 100644 (file)
@@ -1391,7 +1391,7 @@ void Client::put_cap_ref(Inode *in, int cap)
       snapid_t seq = in->cap_snap_pending;
       dout(10) << "put_cap_ref doing cap_snap_pending " << seq << " on " << *in << dendl;
       in->cap_snap_pending = 0;
-      in->queue_cap_snap(in, seq);
+      queue_cap_snap(in, seq);
       signal_cond_list(in->waitfor_caps);  // wake up blocked sync writers
     }
     if (cap & CEPH_CAP_WRBUFFER) {
@@ -1491,6 +1491,16 @@ void Client::check_caps(Inode *in, bool is_delayed)
   }
 }
 
+struct C_SnapFlush : public Context {
+  Client *client;
+  Inode *in;
+  snapid_t seq;
+  C_SnapFlush(Client *c, Inode *i, snapid_t s) : client(c), in(i), seq(s) {}
+  void finish(int r) {
+    client->_flushed_cap_snap(in, seq);
+  }
+};
+
 void Client::queue_cap_snap(Inode *in, snapid_t seq)
 {
   int used = in->caps_used();
@@ -1503,20 +1513,37 @@ void Client::queue_cap_snap(Inode *in, snapid_t seq)
   } else if (used & CEPH_CAP_WR) {
     dout(10) << "queue_cap_snap WR used, marking cap_snap_pending on " << *in << dendl;
     in->cap_snap_pending = seq;
-  } else {
-    if (used & CEPH_CAP_WRBUFFER) {
-      dout(10) << "queue_cap_snap took cap_snap, WRBUFFER used, flushing data" << dendl;
-      in->take_cap_snap(1, seq);
-      if (g_conf.client_oc)
-       _flush(in);
-    } else {
-      dout(10) << "queue_cap_snap took cap_snap, no WR|WRBUFFER used, flushing cap snap" << dendl;
-      in->take_cap_snap(0, seq);
+  } else if (used & CEPH_CAP_WRBUFFER) {
+    dout(10) << "queue_cap_snap took cap_snap, WRBUFFER used, flushing data on " << *in << dendl;
+    in->take_cap_snap(true, seq);
+    in->get();
+    if (g_conf.client_oc)
+      _flush(in, new C_SnapFlush(this, in, seq));
+    else {
+      /*
+       * FIXME: in sync mode, we have no way to wait for prior writes to commit
+       * without getting starved.  so don't wait, for now.
+       */
+      dout(10) << "queue_cap_snap FIXME can't safely flush in sync mode, skipping" << dendl;
+      in->cap_snaps[seq].flushing = false;
       flush_snaps(in);
     }
+  } else {
+    dout(10) << "queue_cap_snap took cap_snap, no WR|WRBUFFER used, flushing cap snap on " << *in << dendl;
+    in->take_cap_snap(false, seq);
+    in->get();
+    flush_snaps(in);
   }
 }
 
+void Client::_flushed_cap_snap(Inode *in, snapid_t seq)
+{
+  dout(10) << "_flushed_cap_snap seq " << seq << " on " << *in << dendl;
+  assert(in->cap_snaps.count(seq));
+  in->cap_snaps[seq].flushing = 0;
+  flush_snaps(in);
+}
+
 void Client::flush_snaps(Inode *in)
 {
   dout(10) << "flush_snaps on " << *in << dendl;
@@ -1524,21 +1551,27 @@ void Client::flush_snaps(Inode *in)
 
   // pick auth mds
   int mds = -1;
+  int mseq;
   for (map<int,InodeCap*>::iterator p = in->caps.begin(); p != in->caps.end(); p++) {
     if (p->second->issued & (CEPH_CAP_WR|CEPH_CAP_WRBUFFER|CEPH_CAP_EXCL)) {
       mds = p->first;
+      mseq = p->second->mseq;
       break;
     }
   }
   assert(mds >= 0);
 
-  for (map<snapid_t,SnapCap>::iterator p = in->cap_snaps.begin(); p != in->cap_snaps.end(); p++) {
+  for (map<snapid_t,CapSnap>::iterator p = in->cap_snaps.begin(); p != in->cap_snaps.end(); p++) {
     dout(10) << "flush_snaps mds" << mds
             << " follows " << p->first
             << " size " << p->second.size
             << " mtime " << p->second.mtime
+            << " still flushing=" << p->second.flushing
             << " on " << *in << dendl;
-    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino());
+    if (p->second.flushing)
+      continue;
+    MClientCaps *m = new MClientCaps(CEPH_CAP_OP_FLUSHSNAP, in->ino(), mseq);
+    m->head.caps = in->caps_issued();
     m->head.snap_follows = p->first;
     m->head.size = p->second.size;
     p->second.mtime.encode_timeval(&m->head.mtime);
@@ -1858,9 +1891,6 @@ void Client::handle_snap(MClientSnap *m)
        // queue for snap writeback
        queue_cap_snap(in);
 
-       if (g_conf.client_oc)
-         _flush(in);
-
        if (in->snaprealm) {
          put_snap_realm(in->snaprealm);
          in->snaprealm_item.remove_myself();
@@ -2034,11 +2064,17 @@ void Client::handle_cap_flushedsnap(Inode *in, MClientCaps *m)
   assert(in->caps[mds]);
   snapid_t follows = m->get_snap_follows();
 
-  dout(5) << "handle_cap_flushedsnap mds" << mds << " flushed snap follows " << follows
-         << " on " << *in << dendl;
-  assert(in->cap_snaps.count(follows));
-  in->cap_snaps.erase(follows);
-  
+  if (in->cap_snaps.count(follows)) {
+    dout(5) << "handle_cap_flushedsnap mds" << mds << " flushed snap follows " << follows
+           << " on " << *in << dendl;
+    in->cap_snaps.erase(follows);
+    put_inode(in);
+  } else {
+    dout(5) << "handle_cap_flushedsnap DUP(?) mds" << mds << " flushed snap follows " << follows
+           << " on " << *in << dendl;
+    // we may not have it if we send multiple FLUSHSNAP requests and (get multiple FLUSHEDSNAPs back)
+  }
+    
   delete m;
 }
 
@@ -3370,8 +3406,8 @@ int Client::_release(Fh *f)
 {
   //dout(3) << "op: client->close(open_files[ " << fh << " ]);" << dendl;
   //dout(3) << "op: open_files.erase( " << fh << " );" << dendl;
-  dout(5) << "_release " << f << " mode " << f->mode << dendl;
   Inode *in = f->inode;
+  dout(5) << "_release " << f << " mode " << f->mode << " on " << *in << dendl;
 
   if (in->snapid == CEPH_NOSNAP) {
     if (in->put_open_ref(f->mode)) {
index 42caa352a4b7b4ef9b0520a12e343f077857b9dc..5ded3237a15d6e359ae1480cf2604f968223e2bb 100644 (file)
@@ -167,12 +167,12 @@ struct InodeCap {
   InodeCap() : issued(0), implemented(0), seq(0), mseq(0) {}
 };
 
-struct SnapCap {
+struct CapSnap {
   //snapid_t follows;  // map key
   __u64 size;
   utime_t mtime;
-  int flushing;
-  SnapCap() : flushing(0) {}
+  bool flushing;
+  CapSnap() : flushing(false) {}
 };
 
 
@@ -200,7 +200,7 @@ class Inode {
   SnapRealm *snaprealm;
   xlist<Inode*>::item snaprealm_item;
   Inode *snapdir_parent;  // only if we are a snapdir inode
-  map<snapid_t,SnapCap> cap_snaps;   // pending flush to mds
+  map<snapid_t,CapSnap> cap_snaps;   // pending flush to mds
   snapid_t cap_snap_pending;
 
   //int open_by_mode[CEPH_FILE_MODE_NUM];
@@ -331,7 +331,7 @@ class Inode {
     return want;
   }
 
-  void take_cap_snap(int flushing, snapid_t seq) {
+  void take_cap_snap(bool flushing, snapid_t seq) {
     assert(snaprealm);
     cap_snaps[seq].size = inode.size;
     cap_snaps[seq].mtime = inode.mtime;
@@ -867,7 +867,8 @@ protected:
   void check_caps(Inode *in, bool is_delayed);
   void put_cap_ref(Inode *in, int cap);
   void flush_snaps(Inode *in);
-  void queue_cap_snap(Inode *in);
+  void queue_cap_snap(Inode *in, snapid_t seq=0);
+  void _flushed_cap_snap(Inode *in, snapid_t seq);
 
   void _release(Inode *in, bool checkafter=true);
   void _flush(Inode *in, Context *onfinish=NULL);
index 3263ae23143fcb0fe3c8baaa3670b1bcdabacd3f..bebbde997d389c3fced52565f001332323d07432 100644 (file)
@@ -978,9 +978,6 @@ void Locker::handle_client_caps(MClientCaps *m)
   if (m->get_op() == CEPH_CAP_OP_FLUSHSNAP) {
     dout(7) << " flushsnap follows " << follows
            << " client" << client << " on " << *in << dendl;
-    int had = cap->confirm_receipt(m->get_seq(), m->get_caps());
-    int has = cap->confirmed();
-
     // this cap now follows a later snap (i.e. the one initiating this flush, or later)
     cap->client_follows = follows+1;
 
@@ -993,7 +990,7 @@ void Locker::handle_client_caps(MClientCaps *m)
 
     MClientCaps *ack = new MClientCaps(CEPH_CAP_OP_FLUSHEDSNAP, in->inode, 0, 0, 0, 0, 0);
     ack->set_snap_follows(follows);
-    _do_cap_update(in, has|had, in->get_caps_wanted(), follows, m, ack);
+    _do_cap_update(in, m->get_caps(), in->get_caps_wanted(), follows, m, ack);
   } else {
 
     // for this and all subsequent versions of this inode,
index 615aaede9ae24f02568eafb4309a6892db827e58..2839c3e0adf3d07b7dc9701d36256759a6bb2693 100644 (file)
@@ -426,7 +426,7 @@ void Server::handle_client_reconnect(MClientReconnect *m)
        inode_t fake_inode;
        memset(&fake_inode, 0, sizeof(fake_inode));
        fake_inode.ino = p->first;
-       MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first);
+       MClientCaps *stale = new MClientCaps(CEPH_CAP_OP_EXPORT, p->first, 0);
        //stale->head.migrate_seq = 0; // FIXME ******
        mds->send_message_client(stale, m->get_source_inst());
 
index 37ae1fb250f51c93531bd106d53e6598e317cc19..bb64cbf687d11863348454558bece4c4d0a46774 100644 (file)
@@ -82,11 +82,13 @@ class MClientCaps : public Message {
     head.time_warp_seq = inode.time_warp_seq;
   }
   MClientCaps(int op,
-             inodeno_t ino) :
+             inodeno_t ino,
+             int mseq) :
     Message(CEPH_MSG_CLIENT_CAPS) {
     memset(&head, 0, sizeof(head));
     head.op = op;
     head.ino = ino;
+    head.migrate_seq = mseq;
   }
 
   const char *get_type_name() { return "Cfcap";}