]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Client: Add non-blocking helper classes
authorFrank S. Filz <ffilzlnx@mindspring.com>
Wed, 11 May 2022 21:22:55 +0000 (14:22 -0700)
committerFrank S. Filz <ffilzlnx@mindspring.com>
Mon, 24 Jul 2023 18:49:03 +0000 (11:49 -0700)
Signed-off-by: Frank S. Filz <ffilzlnx@mindspring.com>
src/client/Client.cc
src/client/Client.h

index e3b0a94a2ab859ee53834f864a2b43ec2390722d..20e62f389951fdebad00ea74a3f7e3af877229d1 100644 (file)
@@ -10499,17 +10499,144 @@ int Client::preadv(int fd, const struct iovec *iov, int iovcnt, loff_t offset)
   return _preadv_pwritev(fd, iov, iovcnt, offset, false);
 }
 
-int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl)
+void Client::C_Read_Finisher::finish_io(int r)
+{
+  utime_t lat;
+
+  // Caller holds client_lock so we don't need to take it.
+
+  if (r >= 0) {
+    if (is_read_async) {
+    } else {
+      // may need to retry on short read
+    }
+
+    clnt->update_read_io_size(size);
+    if (movepos) {
+      // adjust fd pos
+      f->pos = offset + r;
+    }
+    
+    lat = ceph_clock_now();
+    lat -= start;
+    ++clnt->nr_read_request;
+    clnt->update_io_stat_read(lat);
+  }
+
+  iofinished = true;
+
+  if (have_caps) {
+    clnt->put_cap_ref(in, CEPH_CAP_FILE_RD);
+  }
+  if (movepos) {
+    clnt->unlock_fh_pos(f);
+  }
+
+  onfinish->complete(r);
+  delete this;
+}
+
+void Client::C_Read_Sync_NonBlocking::retry()
+{
+  filer->read_trunc(in->ino, &in->layout, in->snapid, pos, left, &tbl, 0,
+                    in->truncate_size, in->truncate_seq, this);
+}
+
+/**
+ * The following method implements most of what _read_sync does, but in a
+ * way that works with the non-blocking read path.
+ */
+void Client::C_Read_Sync_NonBlocking::finish(int r)
+{
+  clnt->client_lock.lock();
+
+  if (r == -CEPHFS_ENOENT) {
+    // if we get ENOENT from OSD, assume 0 bytes returned
+    goto success;
+  } else if (r < 0) {
+    // pass error to caller
+    goto error;
+  }
+
+  if (tbl.length()) {
+    r = tbl.length();
+
+    read += r;
+    pos += r;
+    left -= r;
+    bl->claim_append(tbl);
+  }
+
+  // short read?
+  if (r >= 0 && r < wanted) {
+    if (pos < in->size) {
+      // zero up to known EOF
+      int64_t some = in->size - pos;
+      if (some > left)
+        some = left;
+      auto z = buffer::ptr_node::create(some);
+      z->zero();
+      bl->push_back(std::move(z));
+      read += some;
+      pos += some;
+      left -= some;
+      if (left == 0)
+        goto success;
+    }
+
+    clnt->put_cap_ref(in, CEPH_CAP_FILE_RD);
+    // reverify size
+    {
+      r = clnt->_getattr(in, CEPH_STAT_CAP_SIZE, f->actor_perms);
+      if (r < 0)
+        goto error;
+    }
+
+    // eof?  short read.
+    if ((uint64_t)pos >= in->size)
+      goto success;
+
+    {
+      int have_caps2 = 0;
+      r = clnt->get_caps(f, CEPH_CAP_FILE_RD, have_caps, &have_caps2, -1);
+      if (r < 0) {
+        goto error;
+      }
+    }
+
+    wanted = left;
+    retry();
+    clnt->client_lock.unlock();
+    return;
+  }
+
+success:
+
+  r = read;
+
+error:
+
+  onfinish->complete(r);
+  fini = true;
+
+  clnt->client_lock.unlock();
+}
+
+int64_t Client::_read(Fh *f, int64_t offset, uint64_t size, bufferlist *bl,
+                      Context *onfinish)
 {
   ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   int want, have = 0;
   bool movepos = false;
+  std::unique_ptr<Context> iofinish = nullptr;
+  std::unique_ptr<C_Read_Finisher> crf = nullptr;
   int64_t rc = 0;
   const auto& conf = cct->_conf;
   Inode *in = f->inode.get();
   utime_t lat;
   utime_t start = ceph_clock_now(); 
+  CRF_iofinish *crf_iofinish = nullptr;
 
   if ((f->mode & CEPH_FILE_MODE_RD) == 0)
     return -CEPHFS_EBADF;
@@ -10569,17 +10696,81 @@ retry:
     goto success;
   }
 
+  if (onfinish) {
+     crf_iofinish = new CRF_iofinish();
+     iofinish.reset(crf_iofinish);
+
+     crf.reset(new
+       C_Read_Finisher(this, onfinish, iofinish.get(),
+                       !conf->client_debug_force_sync_read &&
+                         conf->client_oc &&
+                         (have & (CEPH_CAP_FILE_CACHE |
+                                  CEPH_CAP_FILE_LAZYIO)),
+                       have, movepos, start, f, in, f->pos, offset, size));
+
+    crf_iofinish->CRF = crf.get();
+  }
+
+  // There are three cases to be handled at this point:
+  //
+  // CAES 1 - blocking or non-blocking caller with the client holding Fc caps
+  //          and client_debug_force_sync_read being default (`false)
+  //
+  // CASE 2 - non-blocking caller with sync read from the OSD (since client
+  //          does not have the required caps or the above config is set)
+  //
+  // CASE 3 - blocking call by the caller
+
   if (!conf->client_debug_force_sync_read &&
       conf->client_oc &&
       (have & (CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO))) {
+    // CAES 1 - blocking or non-blocking caller with the client holding Fc caps
+    //          and client_debug_force_sync_read being default (`false)
 
     if (f->flags & O_RSYNC) {
       _flush_range(in, offset, size);
     }
-    rc = _read_async(f, offset, size, bl);
+    rc = _read_async(f, offset, size, bl, iofinish.get());
+
+    if (onfinish) {
+      // handle non-blocking caller (onfinish != nullptr), we can now safely
+      // release all the managed pointers, but we might need to do something
+      // with iofinisher.
+      Context *iof = iofinish.release();
+      crf.release();
+
+      if (rc < 0)
+        iof->complete(rc);
+
+      // allow caller to wait on onfinish...
+      return 0;
+    }
+
     if (rc < 0)
       goto done;
+  } else if (onfinish) {
+    // CASE 2 - non-blocking caller with sync read from the OSD (since client
+    //          does not have the required caps or the above config is set)
+
+    // handle _sync_read without blocking...
+    // This sounds odd, but we want to accomplish what is done in the else
+    // branch below but in a non-blocking fashion. The code in _read_sync
+    // is duplicated and modified and exists in
+    // C_Read_Sync_NonBlocking::finish().
+    C_Read_Sync_NonBlocking *crsa =
+      new C_Read_Sync_NonBlocking(this, iofinish.release(), f, in, f->pos,
+                                  offset, size, bl, filer.get(), have);
+      crf.release();
+
+      // Now make first attempt at performing _read_sync
+      crsa->retry();
+
+      // Now the C_Read_Sync_NonBlocking is going to handle EVERYTHING else
+      // Allow caller to wait on onfinish...
+      return 0;
   } else {
+    // CASE 3 - blocking call by the caller
+
     if (f->flags & O_DIRECT)
       _flush_range(in, offset, size);
 
@@ -10674,7 +10865,19 @@ void Client::do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len)
   }
 }
 
-int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
+void Client::C_Read_Async_Finisher::finish(int r)
+{
+  clnt->client_lock.lock();
+
+  clnt->do_readahead(f, in, off, len);
+
+  onfinish->complete(r);
+
+  clnt->client_lock.unlock();
+}
+
+int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
+                       Context *onfinish)
 {
   ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
@@ -10698,13 +10901,40 @@ int Client::_read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl)
 
   // read (and possibly block)
   int r = 0;
-  C_SaferCond onfinish("Client::_read_async flock");
+  std::unique_ptr<Context> io_finish = nullptr;
+  C_SaferCond *io_finish_cond = nullptr;
+  if (onfinish == nullptr) {
+    io_finish_cond = new C_SaferCond("Client::_read_async flock");
+    io_finish.reset(io_finish_cond);
+  } else {
+    io_finish.reset(new C_Read_Async_Finisher(this, onfinish, f, in,
+                                              f->pos, off, len));
+  }
+
   r = objectcacher->file_read(&in->oset, &in->layout, in->snapid,
-                             off, len, bl, 0, &onfinish);
+                             off, len, bl, 0, io_finish.get());
+
+  if (onfinish != nullptr) {
+    // Release C_Read_Async_Finisher from managed pointer, either
+    // file_read will result in non-blocking complete, or we need to complete
+    // immediately. In either case, the C_Read_Async_Finisher is safely
+    // handled and won't be abandoned.
+    Context *crf = io_finish.release();
+    if (r != 0) {
+      // need to do readahead, so complete the crf
+      client_lock.unlock();
+      crf->complete(r);
+      client_lock.lock();
+    } else {
+      get_cap_ref(in, CEPH_CAP_FILE_CACHE);
+    }
+    return 0;
+  }
+
   if (r == 0) {
     get_cap_ref(in, CEPH_CAP_FILE_CACHE);
     client_lock.unlock();
-    r = onfinish.wait();
+    r = io_finish_cond->wait();
     client_lock.lock();
     put_cap_ref(in, CEPH_CAP_FILE_CACHE);
     update_read_io_size(bl->length());
@@ -10823,7 +11053,8 @@ int Client::pwritev(int fd, const struct iovec *iov, int iovcnt, int64_t offset)
 
 int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
                                        unsigned iovcnt, int64_t offset,
-                                       bool write, bool clamp_to_int)
+                                       bool write, bool clamp_to_int,
+                                       Context *onfinish, bufferlist *blp)
 {
     ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
@@ -10844,16 +11075,24 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
     if (clamp_to_int) {
       totallen = std::min(totallen, (loff_t)INT_MAX);
     }
+
     if (write) {
-        int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt);
+        int64_t w = _write(fh, offset, totallen, NULL, iov, iovcnt, onfinish);
         ldout(cct, 3) << "pwritev(" << fh << ", \"...\", " << totallen << ", " << offset << ") = " << w << dendl;
         return w;
     } else {
         bufferlist bl;
-        int64_t r = _read(fh, offset, totallen, &bl);
+        int64_t r = _read(fh, offset, totallen, blp ? blp : &bl,
+                          onfinish);
         ldout(cct, 3) << "preadv(" << fh << ", " <<  offset << ") = " << r << dendl;
-        if (r <= 0)
+        if (r <= 0) {
+          if (r < 0 && onfinish != nullptr) {
+            client_lock.unlock();
+            onfinish->complete(r);
+            client_lock.lock();
+          }
           return r;
+        }
 
         client_lock.unlock();
         copy_bufferlist_to_iovec(iov, iovcnt, &bl, r);
@@ -10862,7 +11101,9 @@ int64_t Client::_preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
     }
 }
 
-int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, int64_t offset, bool write)
+int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt,
+                            int64_t offset, bool write, Context *onfinish,
+                            bufferlist *blp)
 {
     RWRef_t mref_reader(mount_state, CLIENT_MOUNTING);
     if (!mref_reader.is_state_satisfied())
@@ -10875,7 +11116,8 @@ int Client::_preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt, in
     Fh *fh = get_filehandle(fd);
     if (!fh)
       return -CEPHFS_EBADF;
-    return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true);
+    return _preadv_pwritev_locked(fh, iov, iovcnt, offset, write, true,
+                                  onfinish, blp);
 }
 
 int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos,
@@ -10925,13 +11167,64 @@ int64_t Client::_write_success(Fh *f, utime_t start, uint64_t fpos,
   return r;
 }
 
+void Client::C_Write_Finisher::finish_io(int r)
+{
+  bool fini;
+
+  clnt->put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
+
+  if (r >= 0) {
+    if (is_file_write) {
+      if ((f->flags & O_SYNC) || (f->flags & O_DSYNC)) {
+        clnt->_flush_range(in, offset, size);
+      }
+    }
+
+    r = clnt->_write_success(f, start, fpos, offset, size, in);
+  }
+
+  iofinished = true;
+  iofinished_r = r;
+  fini = try_complete();
+
+  if (fini)
+    delete this;
+}
+
+void Client::C_Write_Finisher::finish_onuninline(int r)
+{
+  // Called by _write with client_lock held.
+  onuninlinefinished = true;
+  onuninlinefinished_r = r;
+
+  if (try_complete())
+    delete this;
+}
+
+bool Client::C_Write_Finisher::try_complete()
+{
+  if (onuninlinefinished && iofinished) {
+    clnt->put_cap_ref(in, CEPH_CAP_FILE_WR);
+
+    if (onuninlinefinished_r >= 0 || onuninlinefinished_r == -CEPHFS_ECANCELED)
+      onfinish->complete(iofinished_r);
+    else
+      onfinish->complete(onuninlinefinished_r);
+    return true;
+  }
+  return false;
+}
+
 int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
-                       const struct iovec *iov, int iovcnt)
+                       const struct iovec *iov, int iovcnt, Context *onfinish)
 {
   ceph_assert(ceph_mutex_is_locked_by_me(client_lock));
 
   uint64_t fpos = 0;
   Inode *in = f->inode.get();
+  std::unique_ptr<C_SaferCond> onuninline = nullptr;
+  CWF_iofinish *cwf_iofinish = NULL;
+  C_SaferCond *cond_iofinish = NULL;
 
   if ( (uint64_t)(offset+size) > mdsmap->get_max_filesize() && //exceeds config
        (uint64_t)(offset+size) > in->size ) { //exceeds filesize 
@@ -11025,7 +11318,8 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
 
   ldout(cct, 10) << " snaprealm " << *in->snaprealm << dendl;
 
-  std::unique_ptr<C_SaferCond> onuninline = nullptr;
+  std::unique_ptr<Context> iofinish = nullptr;
+  std::unique_ptr<C_Write_Finisher> cwf = nullptr;
   
   if (in->inline_version < CEPH_INLINE_NONE) {
     if (endoff > cct->_conf->client_max_inline_size ||
@@ -11055,6 +11349,20 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
     }
   }
 
+  if (onfinish) {
+     cwf_iofinish = new CWF_iofinish();
+     iofinish.reset(cwf_iofinish);
+
+     cwf.reset(new
+       C_Write_Finisher(this, onfinish, nullptr == onuninline,
+                        cct->_conf->client_oc &&
+                          (have & (CEPH_CAP_FILE_BUFFER |
+                                 CEPH_CAP_FILE_LAZYIO)),
+                        start, f, in, fpos, offset, size));
+
+    cwf_iofinish->CWF = cwf.get();
+  }
+
   if (cct->_conf->client_oc &&
       (have & (CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO))) {
     // do buffered write
@@ -11067,8 +11375,42 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
     r = objectcacher->file_write(&in->oset, &in->layout,
                                 in->snaprealm->get_snap_context(),
                                 offset, size, bl, ceph::real_clock::now(),
-                                0, nullptr,
-                                objectcacher->CFG_block_writes_upfront());
+                                0, iofinish.get(),
+                                onfinish == nullptr
+                                  ? objectcacher->CFG_block_writes_upfront()
+                                  : false);
+
+    if (onfinish) {
+      // handle non-blocking caller (onfinish != nullptr), we can now safely
+      // release all the managed pointers, but we might need to do something
+      // with iofinisher.
+      Context *iof = iofinish.release();
+      C_Write_Finisher *cwfp = cwf.release();
+
+      if (r < 0) {
+        // should not get here, but...
+        iof->complete(r);
+      }
+
+      if (nullptr != onuninline) {
+        client_lock.unlock();
+        int uninline_ret = onuninline->wait();
+        client_lock.lock();
+
+        if (uninline_ret >= 0 || uninline_ret == -CEPHFS_ECANCELED) {
+          in->inline_data.clear();
+          in->inline_version = CEPH_INLINE_NONE;
+          in->mark_caps_dirty(CEPH_CAP_FILE_WR);
+          check_caps(in, 0);
+        }
+
+        cwfp->finish_onuninline(uninline_ret);
+      }
+
+      // allow caller to wait on onfinish...
+      return 0;
+    }
+
     put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
 
     if (r < 0)
@@ -11085,15 +11427,32 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
       _flush_range(in, offset, size);
 
     // simple, non-atomic sync write
-    C_SaferCond onfinish("Client::_write flock");
+    if (onfinish == nullptr) {
+      // We need a safer condition to wait on.
+      cond_iofinish = new C_SaferCond();
+      iofinish.reset(cond_iofinish);
+    }
+
     get_cap_ref(in, CEPH_CAP_FILE_BUFFER);
 
     filer->write_trunc(in->ino, &in->layout, in->snaprealm->get_snap_context(),
                       offset, size, bl, ceph::real_clock::now(), 0,
                       in->truncate_size, in->truncate_seq,
-                      &onfinish);
+                      iofinish.get());
+
+    if (onfinish) {
+      // handle non-blocking caller (onfinish != nullptr), we can now safely
+      // release all the managed pointers
+      iofinish.release();
+      onuninline.release();
+      cwf.release();
+
+      // allow caller to wait on onfinish...
+      return 0;
+    }
+
     client_lock.unlock();
-    r = onfinish.wait();
+    r = cond_iofinish->wait();
     client_lock.lock();
     put_cap_ref(in, CEPH_CAP_FILE_BUFFER);
     if (r < 0)
@@ -11103,10 +11462,13 @@ int64_t Client::_write(Fh *f, int64_t offset, uint64_t size, const char *buf,
   // if we get here, write was successful, update client metadata
 success:
 
+  // do not get here if non-blocking caller (onfinish != nullptr)
   r = _write_success(f, start, fpos, offset, size, in);
 
 done:
 
+  // can not get here if non-blocking caller (onfinish != nullptr)
+
   if (nullptr != onuninline) {
     client_lock.unlock();
     int uninline_ret = onuninline->wait();
index a561f4e7a585049094fc4fea11bf962589bd6612..b8e4a9cec8e091a41ef8727427fe787b5be2f808 100644 (file)
@@ -1245,6 +1245,178 @@ protected:
   struct initialize_state_t initialize_state;
 
 private:
+  class C_Read_Finisher : public Context {
+  public:
+    bool iofinished;
+    void finish_io(int r);
+
+    C_Read_Finisher(Client *clnt, Context *onfinish, Context *iofinish,
+                    bool is_read_async, int have_caps, bool movepos,
+                    utime_t start, Fh *f, Inode *in, uint64_t fpos,
+                    int64_t offset, uint64_t size)
+      : clnt(clnt), onfinish(onfinish), iofinish(iofinish),
+        is_read_async(is_read_async), have_caps(have_caps), f(f), in(in),
+        start(start), fpos(fpos), offset(offset), size(size), movepos(movepos) {
+      iofinished = false;
+    }
+
+    void finish(int r) override {
+      // We need to override finish, but have nothing to do.
+    }
+
+  private:
+    Client *clnt;
+    Context *onfinish;
+    Context *iofinish;
+    bool is_read_async;
+    int have_caps;
+    Fh *f;
+    Inode *in;
+    utime_t start;
+    uint64_t fpos;
+    int64_t offset;
+    uint64_t size;
+    bool movepos;
+  };
+
+  struct CRF_iofinish : public Context {
+    Client::C_Read_Finisher *CRF;
+
+    CRF_iofinish()
+      : CRF(nullptr) {}
+
+    void finish(int r) override {
+      CRF->finish_io(r);
+    }
+
+    // For _read_async, we may not finish in one go, so be prepared for multiple
+    // calls to complete. All the handling though is in C_Read_Finisher.
+    void complete(int r) override {
+      finish(r);
+      if (CRF->iofinished)
+        delete this;
+    }
+  };
+
+  class C_Read_Sync_NonBlocking : public Context {
+  // When operating in non-blocking mode, what used to be done by _read_sync
+  // still needs to be handled, but it needs to be handled without blocking
+  // while still following the semantics. Note that _read_sync is actually
+  // asynchronous. it just uses condition variables to wait. Now instead, we use
+  // this Context class to synchronize the steps.
+  //
+  // The steps will be accomplished by complete/finish being called to complete
+  // each step, with complete only releasing this object once all is finally
+  // complete.
+  public:
+    C_Read_Sync_NonBlocking(Client *clnt, Context *onfinish, Fh *f, Inode *in,
+                            uint64_t fpos, uint64_t off, uint64_t len,
+                            bufferlist *bl, Filer *filer, int have_caps)
+      : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len), bl(bl),
+        filer(filer), have_caps(have_caps)
+    {
+      left = len;
+      wanted = len;
+      read = 0;
+      pos = off;
+      fini = false;
+    }
+
+    void retry();
+
+  private:
+    Client *clnt;
+    Context *onfinish;
+    Fh *f;
+    Inode *in;
+    uint64_t off;
+    uint64_t len;
+    int left;
+    int wanted;
+    bufferlist *bl;
+    bufferlist tbl;
+    Filer *filer;
+    int have_caps;
+    int read;
+    uint64_t pos;
+    bool fini;
+
+    void finish(int r) override;
+
+    void complete(int r) override
+    {
+      finish(r);
+      if (fini)
+        delete this;
+    }
+  };
+
+  class C_Read_Async_Finisher : public Context {
+  public:
+    C_Read_Async_Finisher(Client *clnt, Context *onfinish, Fh *f, Inode *in,
+                          uint64_t fpos, uint64_t off, uint64_t len)
+      : clnt(clnt), onfinish(onfinish), f(f), in(in), off(off), len(len) {}
+
+  private:
+    Client *clnt;
+    Context *onfinish;
+    Fh *f;
+    Inode *in;
+    uint64_t off;
+    uint64_t len;
+
+    void finish(int r) override;
+  };
+
+  class C_Write_Finisher : public Context {
+  public:
+    void finish_io(int r);
+    void finish_onuninline(int r);
+
+    C_Write_Finisher(Client *clnt, Context *onfinish, bool dont_need_uninline,
+                     bool is_file_write, utime_t start, Fh *f, Inode *in,
+                     uint64_t fpos, int64_t offset, uint64_t size)
+      : clnt(clnt), onfinish(onfinish),
+        is_file_write(is_file_write), start(start), f(f), in(in), fpos(fpos),
+        offset(offset), size(size) {
+      iofinished_r = 0;
+      onuninlinefinished_r = 0;
+      iofinished = false;
+      onuninlinefinished = dont_need_uninline;
+    }
+
+    void finish(int r) override {
+      // We need to override finish, but have nothing to do.
+    }
+
+  private:
+    Client *clnt;
+    Context *onfinish;
+    bool is_file_write;
+    utime_t start;
+    Fh *f;
+    Inode *in;
+    uint64_t fpos;
+    int64_t offset;
+    uint64_t size;
+    int64_t iofinished_r;
+    int64_t onuninlinefinished_r;
+    bool iofinished;
+    bool onuninlinefinished;
+    bool try_complete();
+  };
+
+  struct CWF_iofinish : public Context {
+    C_Write_Finisher *CWF;
+
+    CWF_iofinish()
+      : CWF(nullptr) {}
+
+    void finish(int r) override {
+      CWF->finish_io(r);
+    }
+  };
+
   struct C_Readahead : public Context {
     C_Readahead(Client *c, Fh *f);
     ~C_Readahead() override;
@@ -1331,7 +1503,8 @@ private:
   std::pair<int, bool> _do_remount(bool retry_on_error);
 
   int _read_sync(Fh *f, uint64_t off, uint64_t len, bufferlist *bl, bool *checkeof);
-  int _read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl);
+  int _read_async(Fh *f, uint64_t off, uint64_t len, bufferlist *bl,
+                  Context *onfinish);
 
   bool _dentry_valid(const Dentry *dn);
 
@@ -1399,17 +1572,21 @@ private:
               std::string alternate_name);
 
   loff_t _lseek(Fh *fh, loff_t offset, int whence);
-  int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl);
+  int64_t _read(Fh *fh, int64_t offset, uint64_t size, bufferlist *bl,
+               Context *onfinish = nullptr);
   void do_readahead(Fh *f, Inode *in, uint64_t off, uint64_t len);
   int64_t _write_success(Fh *fh, utime_t start, uint64_t fpos,
           int64_t offset, uint64_t size, Inode *in);
   int64_t _write(Fh *fh, int64_t offset, uint64_t size, const char *buf,
-          const struct iovec *iov, int iovcnt);
+          const struct iovec *iov, int iovcnt, Context *onfinish = nullptr);
   int64_t _preadv_pwritev_locked(Fh *fh, const struct iovec *iov,
                                  unsigned iovcnt, int64_t offset,
-                                 bool write, bool clamp_to_int);
+                                 bool write, bool clamp_to_int,
+                                 Context *onfinish = nullptr,
+                                 bufferlist *blp = nullptr);
   int _preadv_pwritev(int fd, const struct iovec *iov, unsigned iovcnt,
-                      int64_t offset, bool write);
+                      int64_t offset, bool write, Context *onfinish = nullptr,
+                      bufferlist *blp = nullptr);
   int _flush(Fh *fh);
   int _fsync(Fh *fh, bool syncdataonly);
   int _fsync(Inode *in, bool syncdataonly);