librbd: integrate cache with journal
author     Jason Dillaman <dillaman@redhat.com>
           Wed, 15 Jul 2015 17:18:16 +0000 (13:18 -0400)
committer  Jason Dillaman <dillaman@redhat.com>
           Fri, 13 Nov 2015 04:27:06 +0000 (23:27 -0500)
Cache writeback should be delayed until after the journal event has been
committed to disk.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
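
The ordering this commit enforces: every journaled write or discard receives
an event tid when it is appended to the journal, and the cache may only write
the data back to the OSDs (or drop it, for discards) once the journal reports
that tid safe on disk. Below is a minimal, standalone model of that handshake;
the names (MiniJournal, wait_event, commit_event) mirror the patch, but this
is an illustrative sketch, not librbd code:

    #include <cstdint>
    #include <functional>
    #include <iostream>
    #include <map>
    #include <vector>

    class MiniJournal {
    public:
      uint64_t append_event() { return ++m_event_tid; }  // tids start at 1

      // register a callback to run once the given tid is committed
      void wait_event(uint64_t tid, std::function<void()> on_safe) {
        if (tid <= m_committed_tid) { on_safe(); return; }  // already safe
        m_waiters[tid].push_back(std::move(on_safe));
      }

      void commit_event(uint64_t tid) {  // journal entry hit disk
        m_committed_tid = tid;
        auto it = m_waiters.find(tid);
        if (it == m_waiters.end()) return;
        for (auto &cb : it->second) cb();  // release delayed writebacks
        m_waiters.erase(it);
      }

    private:
      uint64_t m_event_tid = 0;
      uint64_t m_committed_tid = 0;
      std::map<uint64_t, std::vector<std::function<void()>>> m_waiters;
    };

    int main() {
      MiniJournal journal;
      uint64_t tid = journal.append_event();
      journal.wait_event(tid, [] { std::cout << "writeback released\n"; });
      std::cout << "journal entry pending; writeback delayed\n";
      journal.commit_event(tid);  // now the cache may write back
    }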
src/librbd/AioImageRequest.cc
src/librbd/AioImageRequest.h
src/librbd/ImageCtx.cc
src/librbd/ImageCtx.h
src/librbd/Journal.cc
src/librbd/Journal.h
src/librbd/LibrbdWriteback.cc

diff --git a/src/librbd/AioImageRequest.cc b/src/librbd/AioImageRequest.cc
index dc6d87f24fd5ea8b4522d28cc44472734277433c..a902adcaf4f4bf329591e006c79238127573df2e 100644
 
 namespace librbd {
 
+namespace {
+
+struct C_DiscardJournalCommit : public Context {
+  typedef std::vector<ObjectExtent> ObjectExtents;
+
+  ImageCtx &image_ctx;
+  AioCompletion *aio_comp;
+  ObjectExtents object_extents;
+
+  C_DiscardJournalCommit(ImageCtx &_image_ctx, AioCompletion *_aio_comp,
+                         const ObjectExtents &_object_extents, uint64_t tid)
+    : image_ctx(_image_ctx), aio_comp(_aio_comp),
+      object_extents(_object_extents) {
+    CephContext *cct = image_ctx.cct;
+    ldout(cct, 20) << this << " C_DiscardJournalCommit: "
+                   << "delaying cache discard until journal tid " << tid << " "
+                   << "safe" << dendl;
+
+    aio_comp->add_request();
+  }
+
+  virtual void finish(int r) {
+    CephContext *cct = image_ctx.cct;
+    ldout(cct, 20) << this << " C_DiscardJournalCommit: "
+                   << "journal committed: discarding from cache" << dendl;
+
+    Mutex::Locker cache_locker(image_ctx.cache_lock);
+    image_ctx.object_cacher->discard_set(image_ctx.object_set, object_extents);
+    aio_comp->complete_request(cct, r);
+  }
+};
+
+} // anonymous namespace
+
 void AioImageRequest::aio_read(
     ImageCtx *ictx, AioCompletion *c,
     const std::vector<std::pair<uint64_t,uint64_t> > &extents,
@@ -196,7 +230,7 @@ void AbstractAioImageWrite::send_request() {
   }
 
   if (m_image_ctx.object_cacher != NULL) {
-    send_cache_requests(object_extents, snapc, journal_tid);
+    send_cache_requests(object_extents, journal_tid);
   }
   update_stats(clip_len);
 
@@ -214,7 +248,7 @@ void AbstractAioImageWrite::send_object_requests(
     ldout(cct, 20) << " oid " << p->oid << " " << p->offset << "~" << p->length
                    << " from " << p->buffer_extents << dendl;
     C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
-    AioObjectRequest *request = send_object_request(*p, snapc, req_comp);
+    AioObjectRequest *request = create_object_request(*p, snapc, req_comp);
 
     // if journaling, stash the request for later; otherwise send
     if (request != NULL) {
@@ -242,14 +276,12 @@ uint64_t AioImageWrite::append_journal_event(
 
   journal::EventEntry event_entry(journal::AioWriteEvent(m_off, m_len, bl));
   return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests,
-                                           synchronous);
+                                           m_off, m_len, synchronous);
 }
 
 void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents,
-                                        const ::SnapContext &snapc,
                                         uint64_t journal_tid) {
   CephContext *cct = m_image_ctx.cct;
-
   for (ObjectExtents::const_iterator p = object_extents.begin();
        p != object_extents.end(); ++p) {
     const ObjectExtent &object_extent = *p;
@@ -257,19 +289,27 @@ void AioImageWrite::send_cache_requests(const ObjectExtents &object_extents,
     bufferlist bl;
     assemble_extent(object_extent, &bl);
 
-    // TODO pass journal_tid to object cacher
     C_AioRequest *req_comp = new C_AioRequest(cct, m_aio_comp);
     m_image_ctx.write_to_cache(object_extent.oid, bl, object_extent.length,
-                               object_extent.offset, req_comp, m_op_flags);
+                               object_extent.offset, req_comp, m_op_flags,
+                               journal_tid);
   }
 }
 
-AioObjectRequest *AioImageWrite::send_object_request(
+void AioImageWrite::send_object_requests(
+    const ObjectExtents &object_extents, const ::SnapContext &snapc,
+    AioObjectRequests *aio_object_requests) {
+  // cache handles creating object requests during writeback
+  if (m_image_ctx.object_cacher == NULL) {
+    AbstractAioImageWrite::send_object_requests(object_extents, snapc,
+                                                aio_object_requests);
+  }
+}
+
+AioObjectRequest *AioImageWrite::create_object_request(
     const ObjectExtent &object_extent, const ::SnapContext &snapc,
     Context *on_finish) {
-  if (m_image_ctx.object_cacher != NULL) {
-    return NULL;
-  }
+  assert(m_image_ctx.object_cacher == NULL);
 
   bufferlist bl;
   assemble_extent(object_extent, &bl);
@@ -291,22 +331,25 @@ uint64_t AioImageDiscard::append_journal_event(
     const AioObjectRequests &requests, bool synchronous) {
   journal::EventEntry event_entry(journal::AioDiscardEvent(m_off, m_len));
   return m_image_ctx.journal->append_event(m_aio_comp, event_entry, requests,
-                                           synchronous);
+                                           m_off, m_len, synchronous);
 }
 
 void AioImageDiscard::send_cache_requests(const ObjectExtents &object_extents,
-                                          const ::SnapContext &snapc,
                                           uint64_t journal_tid) {
-  // TODO need to have cache flag pending discard for writeback or need
-  // to delay cache update until after journal commits
-  Mutex::Locker cache_locker(m_image_ctx.cache_lock);
-
-  // TODO pass journal_tid to object cacher
-  m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set,
-                                         object_extents);
+  if (journal_tid == 0) {
+    Mutex::Locker cache_locker(m_image_ctx.cache_lock);
+    m_image_ctx.object_cacher->discard_set(m_image_ctx.object_set,
+                                           object_extents);
+  } else {
+    // cannot discard from cache until journal has committed
+    assert(m_image_ctx.journal != NULL);
+    m_image_ctx.journal->wait_event(
+      journal_tid, new C_DiscardJournalCommit(m_image_ctx, m_aio_comp,
+                                              object_extents, journal_tid));
+  }
 }
 
-AioObjectRequest *AioImageDiscard::send_object_request(
+AioObjectRequest *AioImageDiscard::create_object_request(
     const ObjectExtent &object_extent, const ::SnapContext &snapc,
     Context *on_finish) {
   CephContext *cct = m_image_ctx.cct;
@@ -346,7 +389,7 @@ void AioImageFlush::send_request() {
     if (m_image_ctx.journal != NULL) {
       m_image_ctx.journal->append_event(
         m_aio_comp, journal::EventEntry(journal::AioFlushEvent()),
-        AioObjectRequests(), true);
+        AioObjectRequests(), 0, 0, true);
     }
   }
 
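
C_DiscardJournalCommit above relies on the stock Ceph Context idiom: a
heap-allocated completion whose complete(r) runs finish(r) and then deletes
the object, making it a fire-once callback that can be handed to
Journal::wait_event(). A simplified standalone model of that idiom (not the
actual include/Context.h):

    #include <iostream>

    struct Context {
      virtual ~Context() {}
      virtual void finish(int r) = 0;
      virtual void complete(int r) {  // default: run once, then self-destruct
        finish(r);
        delete this;
      }
    };

    struct C_DeferredDiscard : public Context {
      void finish(int r) override {
        // runs only after the journal says the event is safe
        std::cout << "journal committed (r=" << r
                  << "): discarding from cache\n";
      }
    };

    int main() {
      Context *on_safe = new C_DeferredDiscard;  // handed to wait_event()
      on_safe->complete(0);  // invoked once the journal entry is safe
    }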
diff --git a/src/librbd/AioImageRequest.h b/src/librbd/AioImageRequest.h
index ab8053a03f2937462a6c5bafa384499cc3cabfa5..6fdc9f9712dfd9154d710401700bdc0f524fdd41 100644
@@ -109,13 +109,12 @@ protected:
   virtual void send_request();
 
   virtual void send_cache_requests(const ObjectExtents &object_extents,
-                                   const ::SnapContext &snapc,
                                    uint64_t journal_tid) = 0;
 
-  void send_object_requests(const ObjectExtents &object_extents,
-                            const ::SnapContext &snapc,
-                            AioObjectRequests *aio_object_requests);
-  virtual AioObjectRequest *send_object_request(
+  virtual void send_object_requests(const ObjectExtents &object_extents,
+                                    const ::SnapContext &snapc,
+                                    AioObjectRequests *aio_object_requests);
+  virtual AioObjectRequest *create_object_request(
       const ObjectExtent &object_extent, const ::SnapContext &snapc,
       Context *on_finish) = 0;
 
@@ -146,10 +145,12 @@ protected:
   void assemble_extent(const ObjectExtent &object_extent, bufferlist *bl);
 
   virtual void send_cache_requests(const ObjectExtents &object_extents,
-                                   const ::SnapContext &snapc,
                                    uint64_t journal_tid);
 
-  virtual AioObjectRequest *send_object_request(
+  virtual void send_object_requests(const ObjectExtents &object_extents,
+                                    const ::SnapContext &snapc,
+                                    AioObjectRequests *aio_object_requests);
+  virtual AioObjectRequest *create_object_request(
       const ObjectExtent &object_extent, const ::SnapContext &snapc,
       Context *on_finish);
 
@@ -177,10 +178,9 @@ protected:
   }
 
   virtual void send_cache_requests(const ObjectExtents &object_extents,
-                                   const ::SnapContext &snapc,
                                    uint64_t journal_tid);
 
-  virtual AioObjectRequest *send_object_request(
+  virtual AioObjectRequest *create_object_request(
       const ObjectExtent &object_extent, const ::SnapContext &snapc,
       Context *on_finish);
 
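
The header change turns AbstractAioImageWrite into a cleaner template method:
send_request() drives the flow, send_object_requests() becomes a virtual hook
that AioImageWrite overrides to do nothing when the object cacher is present
(writeback will create the object requests later), and the per-extent factory
is renamed create_object_request() since it no longer sends anything. A
standalone sketch of that shape, with illustrative class names:

    #include <iostream>

    class AbstractWrite {
    public:
      virtual ~AbstractWrite() {}
      void send(bool has_cache) {  // the fixed skeleton, as in send_request()
        if (has_cache) send_cache_requests();
        send_object_requests(has_cache);
      }
    protected:
      virtual void send_cache_requests() = 0;
      virtual void send_object_requests(bool /*has_cache*/) {
        std::cout << "dispatching object request\n";  // default: go to OSDs
      }
    };

    class Write : public AbstractWrite {
    protected:
      void send_cache_requests() override { std::cout << "queued in cache\n"; }
      void send_object_requests(bool has_cache) override {
        if (!has_cache)  // the cache creates object requests during writeback
          AbstractWrite::send_object_requests(has_cache);
      }
    };

    int main() {
      Write w;
      w.send(true);   // cache path: no direct object request
      w.send(false);  // no cache: direct object request
    }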
diff --git a/src/librbd/ImageCtx.cc b/src/librbd/ImageCtx.cc
index 86cd49e88bab83436ad0952dba79bf49b93d4f8f..a7ce73a168b6ca33f1cda4076a705dcfd312667f 100644
@@ -625,12 +625,12 @@ public:
 
   void ImageCtx::write_to_cache(object_t o, const bufferlist& bl, size_t len,
                                uint64_t off, Context *onfinish,
-                               int fadvise_flags) {
+                               int fadvise_flags, uint64_t journal_tid) {
     snap_lock.get_read();
     ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
                                                              utime_t(),
                                                               fadvise_flags,
-                                                              0);
+                                                              journal_tid);
     snap_lock.put_read();
     ObjectExtent extent(o, 0, off, len, 0);
     extent.oloc.pool = data_ctx.get_id();
diff --git a/src/librbd/ImageCtx.h b/src/librbd/ImageCtx.h
index 2d41e01145078124957bba4ee61c16cb437238bc..df3784a770cd1e60c82a297a2f122ba86af5c281 100644
@@ -226,7 +226,8 @@ namespace librbd {
                             size_t len, uint64_t off, Context *onfinish,
                             int fadvise_flags);
     void write_to_cache(object_t o, const bufferlist& bl, size_t len,
-                       uint64_t off, Context *onfinish, int fadvise_flags);
+                       uint64_t off, Context *onfinish, int fadvise_flags,
+                        uint64_t journal_tid);
     void user_flushed();
     void flush_cache_aio(Context *onfinish);
     int flush_cache();
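
The ImageCtx change is pure plumbing: write_to_cache() now forwards the
journal tid into ObjectCacher::prepare_write(), so each dirty buffer carries
the tid of the journal event that produced it and writeback can later
coordinate with exactly that event (tid 0 meaning "not journaled"). A trivial
standalone illustration of that tagging, with hypothetical names:

    #include <cstdint>
    #include <iostream>
    #include <string>
    #include <vector>

    struct DirtyBuffer {
      std::string oid;
      uint64_t journal_tid;  // 0 means the write was not journaled
    };

    int main() {
      std::vector<DirtyBuffer> dirty;
      dirty.push_back({"object_a", 0});  // journaling disabled
      dirty.push_back({"object_b", 7});  // produced by journal event 7
      for (const auto &b : dirty)
        std::cout << b.oid << " -> tid " << b.journal_tid << "\n";
    }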
diff --git a/src/librbd/Journal.cc b/src/librbd/Journal.cc
index 0e5f00eba953952922752d9b8bd302f2b46a1eab..1278ef36cf75465278539dbf053e5e42cfc42b7d 100644
@@ -27,7 +27,7 @@ Journal::Journal(ImageCtx &image_ctx)
   : m_image_ctx(image_ctx), m_journaler(NULL),
     m_lock("Journal::m_lock"), m_state(STATE_UNINITIALIZED),
     m_lock_listener(this), m_replay_handler(this), m_close_pending(false),
-    m_next_tid(0), m_blocking_writes(false) {
+    m_event_tid(0), m_blocking_writes(false) {
 
   ldout(m_image_ctx.cct, 5) << this << ": ictx=" << &m_image_ctx << dendl;
 
@@ -156,6 +156,7 @@ int Journal::close() {
 uint64_t Journal::append_event(AioCompletion *aio_comp,
                                const journal::EventEntry &event_entry,
                                const AioObjectRequests &requests,
+                               uint64_t offset, size_t length,
                                bool flush_entry) {
   assert(m_image_ctx.owner_lock.is_locked());
   assert(m_state == STATE_RECORDING);
@@ -167,14 +168,18 @@ uint64_t Journal::append_event(AioCompletion *aio_comp,
   uint64_t tid;
   {
     Mutex::Locker locker(m_lock);
-    tid = m_next_tid++;
-    m_events[tid] = Event(future, aio_comp, requests);
+    tid = ++m_event_tid;
+    assert(tid != 0);
+
+    m_events[tid] = Event(future, aio_comp, requests, offset, length);
   }
 
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << ": "
                  << "event=" << event_entry.get_event_type() << ", "
                  << "new_reqs=" << requests.size() << ", "
+                 << "offset=" << offset << ", "
+                 << "length=" << length << ", "
                  << "flush=" << flush_entry << ", tid=" << tid << dendl;
 
   Context *on_safe = new C_EventSafe(this, tid);
@@ -186,6 +191,97 @@ uint64_t Journal::append_event(AioCompletion *aio_comp,
   return tid;
 }
 
+void Journal::commit_event(uint64_t tid, int r) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
+                 "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  Events::iterator it = m_events.find(tid);
+  if (it == m_events.end()) {
+    return;
+  }
+  complete_event(it, r);
+}
+
+void Journal::commit_event_extent(uint64_t tid, uint64_t offset,
+                                  uint64_t length, int r) {
+  assert(length > 0);
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
+                 << "offset=" << offset << ", "
+                 << "length=" << length << ", "
+                 << "r=" << r << dendl;
+
+  Mutex::Locker locker(m_lock);
+  Events::iterator it = m_events.find(tid);
+  if (it == m_events.end()) {
+    return;
+  }
+
+  Event &event = it->second;
+  if (event.ret_val == 0 && r < 0) {
+    event.ret_val = r;
+  }
+
+  ExtentInterval extent;
+  extent.insert(offset, length);
+
+  ExtentInterval intersect;
+  intersect.intersection_of(extent, event.pending_extents);
+
+  event.pending_extents.subtract(intersect);
+  if (!event.pending_extents.empty()) {
+    ldout(cct, 20) << "pending extents: " << event.pending_extents << dendl;
+    return;
+  }
+  complete_event(it, event.ret_val);
+}
+
+void Journal::flush_event(uint64_t tid, Context *on_safe) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
+                 << "on_safe=" << on_safe << dendl;
+
+  ::journal::Future future;
+  {
+    Mutex::Locker locker(m_lock);
+    future = wait_event(m_lock, tid, on_safe);
+  }
+
+  if (future.is_valid()) {
+    future.flush(NULL);
+  }
+}
+
+void Journal::wait_event(uint64_t tid, Context *on_safe) {
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": tid=" << tid << ", "
+                 << "on_safe=" << on_safe << dendl;
+
+  Mutex::Locker locker(m_lock);
+  wait_event(m_lock, tid, on_safe);
+}
+
+::journal::Future Journal::wait_event(Mutex &lock, uint64_t tid,
+                                      Context *on_safe) {
+  assert(m_lock.is_locked());
+  CephContext *cct = m_image_ctx.cct;
+
+  Events::iterator it = m_events.find(tid);
+  if (it == m_events.end() || it->second.safe) {
+    // journal entry already safe
+    ldout(cct, 20) << "journal entry already safe" << dendl;
+    m_image_ctx.op_work_queue->queue(on_safe, 0);
+    return ::journal::Future();
+  }
+
+  Event &event = it->second;
+  event.on_safe_contexts.push_back(on_safe);
+  return event.future;
+}
+
 void Journal::create_journaler() {
   CephContext *cct = m_image_ctx.cct;
   ldout(cct, 20) << this << " " << __func__ << dendl;
@@ -214,6 +310,18 @@ void Journal::destroy_journaler() {
   transition_state(STATE_UNINITIALIZED);
 }
 
+void Journal::complete_event(Events::iterator it, int r) {
+  assert(m_lock.is_locked());
+
+  CephContext *cct = m_image_ctx.cct;
+  ldout(cct, 20) << this << " " << __func__ << ": tid=" << it->first << " "
+                 << "r=" << r << dendl;
+
+  // TODO
+
+  m_events.erase(it);
+}
+
 void Journal::handle_initialized(int r) {
   CephContext *cct = m_image_ctx.cct;
   if (r < 0) {
@@ -298,8 +406,7 @@ void Journal::handle_event_safe(int r, uint64_t tid) {
   ldout(cct, 20) << this << " " << __func__ << ": r=" << r << ", "
                  << "tid=" << tid << dendl;
 
-  RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
-
+  // TODO: ensure this callback never sees a failure
   AioCompletion *aio_comp;
   AioObjectRequests aio_object_requests;
   Contexts on_safe_contexts;
@@ -312,18 +419,24 @@ void Journal::handle_event_safe(int r, uint64_t tid) {
     aio_comp = event.aio_comp;
     aio_object_requests.swap(event.aio_object_requests);
     on_safe_contexts.swap(event.on_safe_contexts);
-    m_events.erase(it);
+
+    if (event.pending_extents.empty()) {
+      m_events.erase(it);
+    } else {
+      event.safe = true;
+    }
   }
 
   ldout(cct, 20) << "completing tid=" << tid << dendl;
 
-  assert(m_image_ctx.image_watcher->is_lock_owner());
-
   if (r < 0) {
     // don't send aio requests if the journal fails -- bubble error up
     aio_comp->fail(cct, r);
   } else {
     // send any waiting aio requests now that journal entry is safe
+    RWLock::RLocker owner_locker(m_image_ctx.owner_lock);
+    assert(m_image_ctx.image_watcher->is_lock_owner());
+
     for (AioObjectRequests::iterator it = aio_object_requests.begin();
          it != aio_object_requests.end(); ++it) {
       (*it)->send();
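
commit_event_extent() lets an event complete piecewise: the event starts with
its full [offset, offset+length) range pending, each committed writeback
subtracts its file extents, and the event finishes once nothing remains. The
real code uses Ceph's interval_set<uint64_t> and first intersects the incoming
extent with what is still pending, so already-committed ranges are tolerated;
the standalone sketch below substitutes a minimal map-based interval set to
show the same accounting:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>
    #include <map>

    class IntervalSet {  // minimal stand-in for interval_set<uint64_t>
    public:
      void insert(uint64_t off, uint64_t len) { m_map[off] = off + len; }

      // remove [off, off+len) from the tracked intervals
      void subtract(uint64_t off, uint64_t len) {
        std::map<uint64_t, uint64_t> out;
        uint64_t end = off + len;
        for (auto &i : m_map) {
          if (i.first < off) out[i.first] = std::min(i.second, off);
          if (i.second > end) out[std::max(i.first, end)] = i.second;
        }
        m_map = std::move(out);
      }

      bool empty() const { return m_map.empty(); }

    private:
      std::map<uint64_t, uint64_t> m_map;  // start -> end
    };

    int main() {
      IntervalSet pending;
      pending.insert(0, 4096);       // event covers one 4 KiB image extent
      pending.subtract(0, 1024);     // first object writeback committed
      std::cout << "complete? " << pending.empty() << "\n";  // 0: pending
      pending.subtract(1024, 3072);  // remainder committed
      std::cout << "complete? " << pending.empty() << "\n";  // 1: event done
    }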
diff --git a/src/librbd/Journal.h b/src/librbd/Journal.h
index 35c1738944399f507583e0e9ba70c5126660ad31..4ac2bee97c62370b2893f8b0357cdaac1c46b77c 100644
@@ -6,6 +6,7 @@
 
 #include "include/int_types.h"
 #include "include/Context.h"
+#include "include/interval_set.h"
 #include "include/unordered_map.h"
 #include "include/rados/librados.hpp"
 #include "common/Mutex.h"
@@ -13,6 +14,7 @@
 #include "journal/Future.h"
 #include "journal/ReplayHandler.h"
 #include "librbd/ImageWatcher.h"
+#include <algorithm>
 #include <list>
 #include <string>
 
@@ -49,10 +51,19 @@ public:
   uint64_t append_event(AioCompletion *aio_comp,
                         const journal::EventEntry &event_entry,
                         const AioObjectRequests &requests,
+                        uint64_t offset, size_t length,
                         bool flush_entry);
 
+  void commit_event(uint64_t tid, int r);
+  void commit_event_extent(uint64_t tid, uint64_t offset, uint64_t length,
+                           int r);
+
+  void flush_event(uint64_t tid, Context *on_safe);
+  void wait_event(uint64_t tid, Context *on_safe);
+
 private:
   typedef std::list<Context *> Contexts;
+  typedef interval_set<uint64_t> ExtentInterval;
 
   enum State {
     STATE_UNINITIALIZED,
@@ -66,12 +77,19 @@ private:
     AioCompletion *aio_comp;
     AioObjectRequests aio_object_requests;
     Contexts on_safe_contexts;
+    ExtentInterval pending_extents;
+    bool safe;
+    int ret_val;
 
     Event() : aio_comp(NULL) {
     }
     Event(const ::journal::Future &_future, AioCompletion *_aio_comp,
-          const AioObjectRequests &_requests)
-      : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests) {
+          const AioObjectRequests &_requests, uint64_t offset, size_t length)
+      : future(_future), aio_comp(_aio_comp), aio_object_requests(_requests),
+        safe(false), ret_val(0) {
+      if (length > 0) {
+        pending_extents.insert(offset, length);
+      }
     }
   };
   typedef ceph::unordered_map<uint64_t, Event> Events;
@@ -149,14 +167,18 @@ private:
   ReplayHandler m_replay_handler;
   bool m_close_pending;
 
-  uint64_t m_next_tid;
+  uint64_t m_event_tid;
   Events m_events;
 
   bool m_blocking_writes;
 
+  ::journal::Future wait_event(Mutex &lock, uint64_t tid, Context *on_safe);
+
   void create_journaler();
   void destroy_journaler();
 
+  void complete_event(Events::iterator it, int r);
+
   void handle_initialized(int r);
 
   void handle_replay_ready();
diff --git a/src/librbd/LibrbdWriteback.cc b/src/librbd/LibrbdWriteback.cc
index b55eb3ffa203088391bd118932130bd7ff01581a..1e68214273bd65c9ca039c230cd11a69c3337911 100644
@@ -17,6 +17,7 @@
 #include "librbd/LibrbdWriteback.h"
 #include "librbd/AioCompletion.h"
 #include "librbd/ObjectMap.h"
+#include "librbd/Journal.h"
 
 #include "include/assert.h"
 
@@ -91,6 +92,79 @@ namespace librbd {
     LibrbdWriteback *m_wb_handler;
   };
 
+  struct C_WriteJournalCommit : public Context {
+    typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+
+    ImageCtx *image_ctx;
+    std::string oid;
+    uint64_t object_no;
+    uint64_t off;
+    bufferlist bl;
+    SnapContext snapc;
+    Context *req_comp;
+    uint64_t journal_tid;
+    bool request_sent;
+
+    C_WriteJournalCommit(ImageCtx *_image_ctx, const std::string &_oid,
+                         uint64_t _object_no, uint64_t _off,
+                         const bufferlist &_bl, const SnapContext& _snapc,
+                         Context *_req_comp, uint64_t _journal_tid)
+      : image_ctx(_image_ctx), oid(_oid), object_no(_object_no), off(_off),
+        bl(_bl), snapc(_snapc), req_comp(_req_comp), journal_tid(_journal_tid),
+        request_sent(false) {
+      CephContext *cct = image_ctx->cct;
+      ldout(cct, 20) << this << " C_WriteJournalCommit: "
+                     << "delaying write until journal tid "
+                     << journal_tid << " safe" << dendl;
+    }
+
+    virtual void complete(int r) {
+      if (request_sent || r < 0) {
+        commit_event_extent(r);
+        req_comp->complete(r);
+        delete this;
+      } else {
+        send_request();
+      }
+    }
+
+    virtual void finish(int r) {
+    }
+
+    void commit_event_extent(int r) {
+      CephContext *cct = image_ctx->cct;
+      ldout(cct, 20) << this << " C_WriteJournalCommit: "
+                     << "write committed: updating journal commit position"
+                     << dendl;
+
+      // all IO operations are flushed prior to closing the journal
+      assert(image_ctx->journal != NULL);
+
+      Extents file_extents;
+      Striper::extent_to_file(cct, &image_ctx->layout, object_no, off,
+                              bl.length(), file_extents);
+      for (Extents::iterator it = file_extents.begin();
+           it != file_extents.end(); ++it) {
+        image_ctx->journal->commit_event_extent(journal_tid, it->first,
+                                                it->second, r);
+      }
+    }
+
+    void send_request() {
+      CephContext *cct = image_ctx->cct;
+      ldout(cct, 20) << this << " C_WriteJournalCommit: "
+                     << "journal committed: sending write request" << dendl;
+
+      RWLock::RLocker owner_locker(image_ctx->owner_lock);
+      assert(image_ctx->image_watcher->is_lock_owner());
+
+      request_sent = true;
+      AioObjectWrite *req = new AioObjectWrite(image_ctx, oid, object_no, off,
+                                               bl, snapc, this);
+      req->send();
+    }
+  };
+
   LibrbdWriteback::LibrbdWriteback(ImageCtx *ictx, Mutex& lock)
     : m_finisher(new Finisher(ictx->cct)), m_tid(0), m_lock(lock), m_ictx(ictx)
   {
@@ -169,19 +243,41 @@ namespace librbd {
     m_writes[oid.name].push(result);
     ldout(m_ictx->cct, 20) << "write will wait for result " << result << dendl;
     C_OrderedWrite *req_comp = new C_OrderedWrite(m_ictx->cct, result, this);
-    AioObjectWrite *req = new AioObjectWrite(m_ictx, oid.name, object_no, off,
-                                             bl, snapc, req_comp);
-    req->send();
+
+    // all IO operations are flushed prior to closing the journal
+    assert(journal_tid == 0 || m_ictx->journal != NULL);
+    if (journal_tid != 0) {
+      m_ictx->journal->flush_event(
+        journal_tid, new C_WriteJournalCommit(m_ictx, oid.name, object_no, off,
+                                              bl, snapc, req_comp,
+                                              journal_tid));
+    } else {
+      AioObjectWrite *req = new AioObjectWrite(m_ictx, oid.name, object_no, off,
+                                               bl, snapc, req_comp);
+      req->send();
+    }
     return ++m_tid;
   }
 
 
   void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off,
                                          uint64_t len, ceph_tid_t journal_tid) {
-    assert(journal_tid != 0);
+    typedef std::vector<std::pair<uint64_t,uint64_t> > Extents;
+
+    assert(m_ictx->owner_lock.is_locked());
+    uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
+
+    // all IO operations are flushed prior to closing the journal
+    assert(journal_tid != 0 && m_ictx->journal != NULL);
 
-    // TODO inform the journal that we no longer expect to receive writebacks
-    //      for the specified extent
+    Extents file_extents;
+    Striper::extent_to_file(m_ictx->cct, &m_ictx->layout, object_no, off,
+                            len, file_extents);
+    for (Extents::iterator it = file_extents.begin();
+         it != file_extents.end(); ++it) {
+      m_ictx->journal->commit_event_extent(journal_tid, it->first, it->second,
+                                           0);
+    }
   }
 
   void LibrbdWriteback::get_client_lock() {
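
C_WriteJournalCommit is the subtle piece: it overrides complete() so the same
context fires twice. The first invocation (journal entry safe) sends the real
object write and keeps the context alive as that write's completion; the
second invocation (write finished, or an early error) updates the journal
commit position and self-destructs. A standalone model of the two-phase trick,
with the object write simulated by a direct call:

    #include <iostream>

    struct Context {
      virtual ~Context() {}
      virtual void finish(int) {}
      virtual void complete(int r) { finish(r); delete this; }
    };

    struct TwoPhaseWrite : public Context {
      bool request_sent = false;

      void complete(int r) override {
        if (request_sent || r < 0) {
          std::cout << "phase 2: write done (r=" << r
                    << "), committing journal extent\n";
          delete this;  // done with both phases
        } else {
          request_sent = true;
          std::cout << "phase 1: journal safe, sending object write\n";
          // the real code hands `this` to the write request as its
          // completion, so complete() fires again when the write finishes
          complete(0);  // simulate the object write completing
        }
      }
    };

    int main() {
      (new TwoPhaseWrite)->complete(0);  // simulate "journal entry is safe"
    }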