namespace librbd {
 namespace io {
 
+template <typename I>
+void ImageDispatchSpec<I>::C_Dispatcher::complete(int r) {
+  switch (image_dispatch_spec->dispatch_result) {
+  case DISPATCH_RESULT_RESTART:
+    ceph_assert(image_dispatch_spec->dispatch_layer != 0);
+    image_dispatch_spec->dispatch_layer = static_cast<ImageDispatchLayer>(
+      image_dispatch_spec->dispatch_layer - 1);
+    [[fallthrough]];
+  case DISPATCH_RESULT_CONTINUE:
+    if (r < 0) {
+      // bubble dispatch failure through AioCompletion
+      image_dispatch_spec->dispatch_result = DISPATCH_RESULT_COMPLETE;
+      image_dispatch_spec->fail(r);
+      return;
+    }
+
+    image_dispatch_spec->send();
+    break;
+  case DISPATCH_RESULT_COMPLETE:
+    finish(r);
+    break;
+  case DISPATCH_RESULT_INVALID:
+    ceph_abort();
+    break;
+  }
+}
+
+template <typename I>
+void ImageDispatchSpec<I>::C_Dispatcher::finish(int r) {
+  image_dispatch_spec->finish(r);
+}
+
 template <typename I>
 struct ImageDispatchSpec<I>::SendVisitor
   : public boost::static_visitor<void> {
   boost::apply_visitor(SendVisitor{this}, m_request);
 }
 
+template <typename I>
+void ImageDispatchSpec<I>::finish(int r) {
+  delete this;
+}
+
 template <typename I>
 void ImageDispatchSpec<I>::fail(int r) {
   m_aio_comp->fail(r);
 
 
 #include "include/int_types.h"
 #include "include/buffer.h"
+#include "include/Context.h"
 #include "common/zipkin_trace.h"
 #include "librbd/io/AioCompletion.h"
 #include "librbd/io/Types.h"
 
 template <typename ImageCtxT = ImageCtx>
 class ImageDispatchSpec {
+private:
+  // helper to avoid extra heap allocation per object IO
+  struct C_Dispatcher : public Context {
+    ImageDispatchSpec* image_dispatch_spec;
+
+    C_Dispatcher(ImageDispatchSpec* image_dispatch_spec)
+      : image_dispatch_spec(image_dispatch_spec) {
+    }
+
+    void complete(int r) override;
+    void finish(int r) override;
+  };
+
 public:
   struct Read {
     ReadResult read_result;
     }
   };
 
+  C_Dispatcher dispatcher_ctx;
+  ImageDispatchLayer dispatch_layer = IMAGE_DISPATCH_LAYER_NONE;
+  DispatchResult dispatch_result = DISPATCH_RESULT_INVALID;
+
   static ImageDispatchSpec* create_read_request(
       ImageCtxT &image_ctx, AioCompletion *aio_comp, Extents &&image_extents,
       ReadResult &&read_result, int op_flags,
   ImageDispatchSpec(ImageCtxT& image_ctx, AioCompletion* aio_comp,
                      Extents&& image_extents, Request&& request,
                      int op_flags, const ZTracer::Trace& parent_trace, uint64_t tid)
-    : m_image_ctx(image_ctx), m_aio_comp(aio_comp),
+    : dispatcher_ctx(this), m_image_ctx(image_ctx), m_aio_comp(aio_comp),
       m_image_extents(std::move(image_extents)), m_request(std::move(request)),
       m_op_flags(op_flags), m_parent_trace(parent_trace), m_tid(tid) {
     m_aio_comp->get();
   uint64_t m_tid;
   std::atomic<uint64_t> m_throttled_flag = 0;
 
+  void finish(int r);
+
   uint64_t extents_length();
 };
 
 
     finish(r);
     break;
   case DISPATCH_RESULT_INVALID:
+  case DISPATCH_RESULT_RESTART:
     ceph_abort();
     break;
   }
 
 
 enum DispatchResult {
   DISPATCH_RESULT_INVALID,
+  DISPATCH_RESULT_RESTART,
   DISPATCH_RESULT_CONTINUE,
   DISPATCH_RESULT_COMPLETE
 };
 
+enum ImageDispatchLayer {
+  IMAGE_DISPATCH_LAYER_NONE = 0,
+  IMAGE_DISPATCH_LAYER_QUEUE,
+  IMAGE_DISPATCH_LAYER_QOS,
+  IMAGE_DISPATCH_LAYER_EXCLUSIVE_LOCK,
+  IMAGE_DISPATCH_LAYER_REFRESH,
+  IMAGE_DISPATCH_LAYER_JOURNAL,
+  IMAGE_DISPATCH_LAYER_WRITEBACK_CACHE,
+  IMAGE_DISPATCH_LAYER_CORE,
+  IMAGE_DISPATCH_LAYER_LAST
+};
+
 enum ObjectDispatchLayer {
   OBJECT_DISPATCH_LAYER_NONE = 0,
   OBJECT_DISPATCH_LAYER_CACHE,