]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
client: Respect O_SYNC, O_DSYNC, and O_RSYNC
authorSam Lang <sam.lang@inktank.com>
Thu, 17 Jan 2013 20:03:51 +0000 (14:03 -0600)
committerSam Lang <sam.lang@inktank.com>
Fri, 18 Jan 2013 21:43:33 +0000 (15:43 -0600)
If the file is opened with O_SYNC, O_DSYNC, or O_RSYNC, we need to
flush cached data (and metadata for O_SYNC) on a write.
For O_RSYNC, we need to flush dirty data on a read.
This patch adds a file_flush() call to the objectCacher
to allow a specific range to be flushed from the cache, and
in the O_SYNC,O_DSYNC case for write and O_RSYNC case for read,
calls that function waiting for the flush to complete.  The patch
also adds a flags field directly to the file handle struct, and
replaces the append boolean with the use of the flags field directly.

Signed-off-by: Sam Lang <sam.lang@inktank.com>
src/client/Client.cc
src/client/Client.h
src/client/Fh.h
src/osdc/ObjectCacher.h

index b76ed8776614cdf49e7d3b417631afc64c855d91..70d2ce0657536e2473792d5dd78f60260230168d 100644 (file)
@@ -2583,6 +2583,32 @@ bool Client::_flush(Inode *in)
   return safe;
 }
 
+void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
+{
+  assert(client_lock.is_locked());
+  if (!in->oset.dirty_or_tx) {
+    ldout(cct, 10) << " nothing to flush" << dendl;
+    return;
+  }
+
+  Mutex flock("Client::_flush_range flock");
+  Cond cond;
+  bool safe = false;
+  Context *onflush = new C_SafeCond(&flock, &cond, &safe);
+  safe = objectcacher->file_flush(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
+                                 offset, size, onflush);
+  if (safe)
+    return;
+
+  // wait for flush
+  client_lock.Unlock();
+  flock.Lock();
+  while (!safe)
+    cond.Wait(flock);
+  flock.Unlock();
+  client_lock.Lock();
+}
+
 void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
 {
   //  Mutex::Locker l(client_lock);
@@ -5201,9 +5227,8 @@ Fh *Client::_create_fh(Inode *in, int flags, int cmode)
   // yay
   Fh *f = new Fh;
   f->mode = cmode;
-  if (flags & O_APPEND)
-    f->append = true;
-  
+  f->flags = flags;
+
   // inode
   assert(in);
   f->inode = in;
@@ -5420,10 +5445,20 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
     movepos = true;
   }
 
-  if (!conf->client_debug_force_sync_read && have & CEPH_CAP_FILE_CACHE)
+  if (!conf->client_debug_force_sync_read && have & CEPH_CAP_FILE_CACHE) {
+
+    if (f->flags & O_RSYNC) {
+      _flush_range(in, offset, size);
+    }
     r = _read_async(f, offset, size, bl);
-  else
+  } else {
     r = _read_sync(f, offset, size, bl);
+  }
+
+  // don't move pointer if the read failed
+  if (r < 0) {
+    goto done;
+  }
 
   if (movepos) {
     // adjust fd pos
@@ -5444,6 +5479,7 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
           << dendl;
   f->last_pos = offset+bl->length();
 
+done:
   // done!
   put_cap_ref(in, CEPH_CAP_FILE_RD);
   return r;
@@ -5679,10 +5715,10 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
      * FIXME: this is racy in that we may block _after_ this point waiting for caps, and size may
      * change out from under us.
      */
-    if (f->append)
+    if (f->flags & O_APPEND)
       _lseek(f, 0, SEEK_END);
     offset = f->pos;
-    f->pos = offset+size;    
+    f->pos = offset+size;
     unlock_fh_pos(f);
   }
 
@@ -5692,13 +5728,15 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
 
   // time it.
   utime_t start = ceph_clock_now(cct);
-    
+
   // copy into fresh buffer (since our write may be resub, async)
   bufferptr bp;
   if (size > 0) bp = buffer::copy(buf, size);
   bufferlist bl;
   bl.push_back( bp );
 
+  utime_t lat;
+  uint64_t totalwritten;
   uint64_t endoff = offset + size;
   int have;
   int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, endoff);
@@ -5715,11 +5753,21 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
     get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
 
     // async, caching, non-blocking.
-    objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
-                            offset, size, bl, ceph_clock_now(cct), 0,
-                            client_lock);
+    r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
+                                offset, size, bl, ceph_clock_now(cct), 0,
+                                client_lock);
 
     put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
+
+    if (r < 0)
+      goto done;
+
+    // flush cached write if O_SYNC is set on file fh
+    // O_DSYNC == O_SYNC on linux < 2.6.33
+    // O_SYNC = __O_SYNC | O_DSYNC on linux >= 2.6.33
+    if (f->flags & O_SYNC || f->flags & O_DSYNC) {
+      _flush_range(in, offset, size);
+    }
   } else {
     // simple, non-atomic sync write
     Mutex flock("Client::_write flock");
@@ -5730,12 +5778,14 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
 
     unsafe_sync_write++;
     get_cap_ref(in, CEPH_CAP_FILE_BUFFER);  // released by onsafe callback
-    
-    filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
-                      offset, size, bl, ceph_clock_now(cct), 0,
-                      in->truncate_size, in->truncate_seq,
-                      onfinish, onsafe);
-    
+
+    r = filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
+                          offset, size, bl, ceph_clock_now(cct), 0,
+                          in->truncate_size, in->truncate_seq,
+                          onfinish, onsafe);
+    if (r < 0)
+      goto done;
+
     client_lock.Unlock();
     flock.Lock();
     while (!done)
@@ -5744,23 +5794,25 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
     client_lock.Lock();
   }
 
+  // if we get here, write was successful, update client metadata
+
   // time
-  utime_t lat = ceph_clock_now(cct);
+  lat = ceph_clock_now(cct);
   lat -= start;
   logger->tinc(l_c_wrlat, lat);
-    
-  // assume success for now.  FIXME.
-  uint64_t totalwritten = size;
-  
+
+  totalwritten = size;
+  r = (int)totalwritten;
+
   // extend file?
   if (totalwritten + offset > in->size) {
     in->size = totalwritten + offset;
     mark_caps_dirty(in, CEPH_CAP_FILE_WR);
-    
+
     if ((in->size << 1) >= in->max_size &&
        (in->reported_size << 1) < in->max_size)
       check_caps(in, false);
-      
+
     ldout(cct, 7) << "wrote to " << totalwritten+offset << ", extending file size" << dendl;
   } else {
     ldout(cct, 7) << "wrote to " << totalwritten+offset << ", leaving file size at " << in->size << dendl;
@@ -5770,10 +5822,9 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf)
   in->mtime = ceph_clock_now(cct);
   mark_caps_dirty(in, CEPH_CAP_FILE_WR);
 
+done:
   put_cap_ref(in, CEPH_CAP_FILE_WR);
-  
-  // ok!
-  return totalwritten;  
+  return r;
 }
 
 int Client::_flush(Fh *f)
index 4f33d3c342ace991b3a97fa9f5dbf86a1b33756c..2f36713d53b27d277a9fcd03adf1b0dd232bf204 100644 (file)
@@ -393,7 +393,7 @@ protected:
 
   Client(Messenger *m, MonClient *mc);
   ~Client();
-  void tear_down_cache();   
+  void tear_down_cache();
 
   client_t get_nodeid() { return whoami; }
 
@@ -452,6 +452,7 @@ protected:
   void _async_invalidate(Inode *in, int64_t off, int64_t len, bool keep_caps);
   void _release(Inode *in);
   bool _flush(Inode *in);
+  void _flush_range(Inode *in, int64_t off, uint64_t size);
   void _flushed(Inode *in);
   void flush_set_callback(ObjectCacher::ObjectSet *oset);
 
index 741f31c62acdde94dc6fc13ddbc3abd77539bab8..59f8f33d3627b4bde0553b8da36dfa55efa10f62 100644 (file)
@@ -16,7 +16,7 @@ struct Fh {
 
   bool is_lazy() { return mode & O_LAZY; }
 
-  bool append;
+  int flags;
   bool pos_locked;           // pos is currently in use
   list<Cond*> pos_waiters;   // waiters for pos
 
@@ -25,7 +25,7 @@ struct Fh {
   loff_t consec_read_bytes;
   int nr_consec_read;
 
-  Fh() : inode(0), pos(0), mds(0), mode(0), append(false), pos_locked(false),
+  Fh() : inode(0), pos(0), mds(0), mode(0), flags(0), pos_locked(false),
         last_pos(0), consec_read_bytes(0), nr_consec_read(0) {}
 };
 
index 0daa97972269f859c46d747af15e46b8276a73d7..b222553abe6f1b0512ed62ef690b3dd7ade0c5fe 100644 (file)
@@ -601,6 +601,13 @@ public:
     Striper::file_to_extents(cct, oset->ino, layout, offset, len, wr->extents);
     return writex(wr, oset, wait_on_lock);
   }
+
+  bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
+                  loff_t offset, uint64_t len, Context *onfinish) {
+    vector<ObjectExtent> extents;
+    Striper::file_to_extents(cct, oset->ino, layout, offset, len, extents);
+    return flush_set(oset, extents, onfinish);
+  }
 };