From 4bdcfbffa0b4f134d447cfc8fa83c19e4133390c Mon Sep 17 00:00:00 2001 From: Sam Lang Date: Thu, 17 Jan 2013 14:03:51 -0600 Subject: [PATCH] client: Respect O_SYNC, O_DSYNC, and O_RSYNC If the file is opened with O_SYNC, O_DSYNC, or O_RSYNC, we need to flush cached data (and metadata for O_SYNC) on a write. For O_RSYNC, we need to flush dirty data on a read. This patch adds a file_flush() call to the objectCacher to allow a specific range to be flushed from the cache, and in the O_SYNC,O_DSYNC case for write and O_RSYNC case for read, calls that function waiting for the flush to complete. The patch also adds a flags field directly to the file handle struct, and replaces the append boolean with the use of the flags field directly. Signed-off-by: Sam Lang --- src/client/Client.cc | 105 +++++++++++++++++++++++++++++----------- src/client/Client.h | 3 +- src/client/Fh.h | 4 +- src/osdc/ObjectCacher.h | 7 +++ 4 files changed, 89 insertions(+), 30 deletions(-) diff --git a/src/client/Client.cc b/src/client/Client.cc index b76ed8776614c..70d2ce0657536 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -2583,6 +2583,32 @@ bool Client::_flush(Inode *in) return safe; } +void Client::_flush_range(Inode *in, int64_t offset, uint64_t size) +{ + assert(client_lock.is_locked()); + if (!in->oset.dirty_or_tx) { + ldout(cct, 10) << " nothing to flush" << dendl; + return; + } + + Mutex flock("Client::_flush_range flock"); + Cond cond; + bool safe = false; + Context *onflush = new C_SafeCond(&flock, &cond, &safe); + safe = objectcacher->file_flush(&in->oset, &in->layout, in->snaprealm->get_snap_context(), + offset, size, onflush); + if (safe) + return; + + // wait for flush + client_lock.Unlock(); + flock.Lock(); + while (!safe) + cond.Wait(flock); + flock.Unlock(); + client_lock.Lock(); +} + void Client::flush_set_callback(ObjectCacher::ObjectSet *oset) { // Mutex::Locker l(client_lock); @@ -5201,9 +5227,8 @@ Fh *Client::_create_fh(Inode *in, int flags, int cmode) // yay Fh *f = new Fh; f->mode = cmode; - if (flags & O_APPEND) - f->append = true; - + f->flags = flags; + // inode assert(in); f->inode = in; @@ -5420,10 +5445,20 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl) movepos = true; } - if (!conf->client_debug_force_sync_read && have & CEPH_CAP_FILE_CACHE) + if (!conf->client_debug_force_sync_read && have & CEPH_CAP_FILE_CACHE) { + + if (f->flags & O_RSYNC) { + _flush_range(in, offset, size); + } r = _read_async(f, offset, size, bl); - else + } else { r = _read_sync(f, offset, size, bl); + } + + // don't move pointer if the read failed + if (r < 0) { + goto done; + } if (movepos) { // adjust fd pos @@ -5444,6 +5479,7 @@ int Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl) << dendl; f->last_pos = offset+bl->length(); +done: // done! put_cap_ref(in, CEPH_CAP_FILE_RD); return r; @@ -5679,10 +5715,10 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf) * FIXME: this is racy in that we may block _after_ this point waiting for caps, and size may * change out from under us. */ - if (f->append) + if (f->flags & O_APPEND) _lseek(f, 0, SEEK_END); offset = f->pos; - f->pos = offset+size; + f->pos = offset+size; unlock_fh_pos(f); } @@ -5692,13 +5728,15 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf) // time it. utime_t start = ceph_clock_now(cct); - + // copy into fresh buffer (since our write may be resub, async) bufferptr bp; if (size > 0) bp = buffer::copy(buf, size); bufferlist bl; bl.push_back( bp ); + utime_t lat; + uint64_t totalwritten; uint64_t endoff = offset + size; int have; int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, endoff); @@ -5715,11 +5753,21 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf) get_cap_ref(in, CEPH_CAP_FILE_BUFFER); // async, caching, non-blocking. - objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(), - offset, size, bl, ceph_clock_now(cct), 0, - client_lock); + r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(), + offset, size, bl, ceph_clock_now(cct), 0, + client_lock); put_cap_ref(in, CEPH_CAP_FILE_BUFFER); + + if (r < 0) + goto done; + + // flush cached write if O_SYNC is set on file fh + // O_DSYNC == O_SYNC on linux < 2.6.33 + // O_SYNC = __O_SYNC | O_DSYNC on linux >= 2.6.33 + if (f->flags & O_SYNC || f->flags & O_DSYNC) { + _flush_range(in, offset, size); + } } else { // simple, non-atomic sync write Mutex flock("Client::_write flock"); @@ -5730,12 +5778,14 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf) unsafe_sync_write++; get_cap_ref(in, CEPH_CAP_FILE_BUFFER); // released by onsafe callback - - filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(), - offset, size, bl, ceph_clock_now(cct), 0, - in->truncate_size, in->truncate_seq, - onfinish, onsafe); - + + r = filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(), + offset, size, bl, ceph_clock_now(cct), 0, + in->truncate_size, in->truncate_seq, + onfinish, onsafe); + if (r < 0) + goto done; + client_lock.Unlock(); flock.Lock(); while (!done) @@ -5744,23 +5794,25 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf) client_lock.Lock(); } + // if we get here, write was successful, update client metadata + // time - utime_t lat = ceph_clock_now(cct); + lat = ceph_clock_now(cct); lat -= start; logger->tinc(l_c_wrlat, lat); - - // assume success for now. FIXME. - uint64_t totalwritten = size; - + + totalwritten = size; + r = (int)totalwritten; + // extend file? if (totalwritten + offset > in->size) { in->size = totalwritten + offset; mark_caps_dirty(in, CEPH_CAP_FILE_WR); - + if ((in->size << 1) >= in->max_size && (in->reported_size << 1) < in->max_size) check_caps(in, false); - + ldout(cct, 7) << "wrote to " << totalwritten+offset << ", extending file size" << dendl; } else { ldout(cct, 7) << "wrote to " << totalwritten+offset << ", leaving file size at " << in->size << dendl; @@ -5770,10 +5822,9 @@ int Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf) in->mtime = ceph_clock_now(cct); mark_caps_dirty(in, CEPH_CAP_FILE_WR); +done: put_cap_ref(in, CEPH_CAP_FILE_WR); - - // ok! - return totalwritten; + return r; } int Client::_flush(Fh *f) diff --git a/src/client/Client.h b/src/client/Client.h index 4f33d3c342ace..2f36713d53b27 100644 --- a/src/client/Client.h +++ b/src/client/Client.h @@ -393,7 +393,7 @@ protected: Client(Messenger *m, MonClient *mc); ~Client(); - void tear_down_cache(); + void tear_down_cache(); client_t get_nodeid() { return whoami; } @@ -452,6 +452,7 @@ protected: void _async_invalidate(Inode *in, int64_t off, int64_t len, bool keep_caps); void _release(Inode *in); bool _flush(Inode *in); + void _flush_range(Inode *in, int64_t off, uint64_t size); void _flushed(Inode *in); void flush_set_callback(ObjectCacher::ObjectSet *oset); diff --git a/src/client/Fh.h b/src/client/Fh.h index 741f31c62acdd..59f8f33d3627b 100644 --- a/src/client/Fh.h +++ b/src/client/Fh.h @@ -16,7 +16,7 @@ struct Fh { bool is_lazy() { return mode & O_LAZY; } - bool append; + int flags; bool pos_locked; // pos is currently in use list pos_waiters; // waiters for pos @@ -25,7 +25,7 @@ struct Fh { loff_t consec_read_bytes; int nr_consec_read; - Fh() : inode(0), pos(0), mds(0), mode(0), append(false), pos_locked(false), + Fh() : inode(0), pos(0), mds(0), mode(0), flags(0), pos_locked(false), last_pos(0), consec_read_bytes(0), nr_consec_read(0) {} }; diff --git a/src/osdc/ObjectCacher.h b/src/osdc/ObjectCacher.h index 0daa97972269f..b222553abe6f1 100644 --- a/src/osdc/ObjectCacher.h +++ b/src/osdc/ObjectCacher.h @@ -601,6 +601,13 @@ public: Striper::file_to_extents(cct, oset->ino, layout, offset, len, wr->extents); return writex(wr, oset, wait_on_lock); } + + bool file_flush(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc, + loff_t offset, uint64_t len, Context *onfinish) { + vector extents; + Striper::file_to_extents(cct, oset->ino, layout, offset, len, extents); + return flush_set(oset, extents, onfinish); + } }; -- 2.39.5