return safe;
}
+void Client::_flush_range(Inode *in, int64_t offset, uint64_t size)
+{
+ assert(client_lock.is_locked());
+ if (!in->oset.dirty_or_tx) {
+ ldout(cct, 10) << " nothing to flush" << dendl;
+ return;
+ }
+
+ Mutex flock("Client::_flush_range flock");
+ Cond cond;
+ bool safe = false;
+ Context *onflush = new C_SafeCond(&flock, &cond, &safe);
+ safe = objectcacher->file_flush(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
+ offset, size, onflush);
+ if (safe)
+ return;
+
+ // wait for flush
+ client_lock.Unlock();
+ flock.Lock();
+ while (!safe)
+ cond.Wait(flock);
+ flock.Unlock();
+ client_lock.Lock();
+}
+
void Client::flush_set_callback(ObjectCacher::ObjectSet *oset)
{
// Mutex::Locker l(client_lock);
// yay
Fh *f = new Fh;
f->mode = cmode;
- if (flags & O_APPEND)
- f->append = true;
-
+ f->flags = flags;
+
// inode
assert(in);
f->inode = in;
movepos = true;
}
- if (!conf->client_debug_force_sync_read && have & CEPH_CAP_FILE_CACHE)
+ if (!conf->client_debug_force_sync_read && have & CEPH_CAP_FILE_CACHE) {
+
+ if (f->flags & O_RSYNC) {
+ _flush_range(in, offset, size);
+ }
r = _read_async(f, offset, size, bl);
- else
+ } else {
r = _read_sync(f, offset, size, bl);
+ }
+
+ // don't move pointer if the read failed
+ if (r < 0) {
+ goto done;
+ }
if (movepos) {
// adjust fd pos
<< dendl;
f->last_pos = offset+bl->length();
+done:
// done!
put_cap_ref(in, CEPH_CAP_FILE_RD);
return r;
* FIXME: this is racy in that we may block _after_ this point waiting for caps, and size may
* change out from under us.
*/
- if (f->append)
+ if (f->flags & O_APPEND)
_lseek(f, 0, SEEK_END);
offset = f->pos;
- f->pos = offset+size;
+ f->pos = offset+size;
unlock_fh_pos(f);
}
// time it.
utime_t start = ceph_clock_now(cct);
-
+
// copy into fresh buffer (since our write may be resub, async)
bufferptr bp;
if (size > 0) bp = buffer::copy(buf, size);
bufferlist bl;
bl.push_back( bp );
+ utime_t lat;
+ uint64_t totalwritten;
uint64_t endoff = offset + size;
int have;
int r = get_caps(in, CEPH_CAP_FILE_WR, CEPH_CAP_FILE_BUFFER, &have, endoff);
get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
// async, caching, non-blocking.
- objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
- offset, size, bl, ceph_clock_now(cct), 0,
- client_lock);
+ r = objectcacher->file_write(&in->oset, &in->layout, in->snaprealm->get_snap_context(),
+ offset, size, bl, ceph_clock_now(cct), 0,
+ client_lock);
put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
+
+ if (r < 0)
+ goto done;
+
+ // flush cached write if O_SYNC is set on file fh
+ // O_DSYNC == O_SYNC on linux < 2.6.33
+ // O_SYNC = __O_SYNC | O_DSYNC on linux >= 2.6.33
+ if (f->flags & O_SYNC || f->flags & O_DSYNC) {
+ _flush_range(in, offset, size);
+ }
} else {
// simple, non-atomic sync write
Mutex flock("Client::_write flock");
unsafe_sync_write++;
get_cap_ref(in, CEPH_CAP_FILE_BUFFER); // released by onsafe callback
-
- filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
- offset, size, bl, ceph_clock_now(cct), 0,
- in->truncate_size, in->truncate_seq,
- onfinish, onsafe);
-
+
+ r = filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
+ offset, size, bl, ceph_clock_now(cct), 0,
+ in->truncate_size, in->truncate_seq,
+ onfinish, onsafe);
+ if (r < 0)
+ goto done;
+
client_lock.Unlock();
flock.Lock();
while (!done)
client_lock.Lock();
}
+ // if we get here, write was successful, update client metadata
+
// time
- utime_t lat = ceph_clock_now(cct);
+ lat = ceph_clock_now(cct);
lat -= start;
logger->tinc(l_c_wrlat, lat);
-
- // assume success for now. FIXME.
- uint64_t totalwritten = size;
-
+
+ totalwritten = size;
+ r = (int)totalwritten;
+
// extend file?
if (totalwritten + offset > in->size) {
in->size = totalwritten + offset;
mark_caps_dirty(in, CEPH_CAP_FILE_WR);
-
+
if ((in->size << 1) >= in->max_size &&
(in->reported_size << 1) < in->max_size)
check_caps(in, false);
-
+
ldout(cct, 7) << "wrote to " << totalwritten+offset << ", extending file size" << dendl;
} else {
ldout(cct, 7) << "wrote to " << totalwritten+offset << ", leaving file size at " << in->size << dendl;
in->mtime = ceph_clock_now(cct);
mark_caps_dirty(in, CEPH_CAP_FILE_WR);
+done:
put_cap_ref(in, CEPH_CAP_FILE_WR);
-
- // ok!
- return totalwritten;
+ return r;
}
int Client::_flush(Fh *f)