]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
librbd: ensure ImageWatcher notifications are idempotent 3695/head
authorJason Dillaman <dillaman@redhat.com>
Fri, 6 Feb 2015 02:14:42 +0000 (21:14 -0500)
committerJason Dillaman <dillaman@redhat.com>
Fri, 6 Feb 2015 02:25:43 +0000 (21:25 -0500)
Ensure that ImageWatcher can support receiving duplicate notifications
from librados.  Resize and flatten now pass a fixed request id and
the lock owner matches those ids with in-progress requests. Also
protect against loopback notifications that might result in incorrect
behavior across exclusive lock states.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/librbd/ImageCtx.h
src/librbd/ImageWatcher.cc
src/librbd/ImageWatcher.h
src/librbd/internal.cc

index 9b401163fc7e679eaf63c718fe8883d56d648070..781b66b9576cb33fa622bdfe4a87f23a98b7c0ae 100644 (file)
@@ -16,6 +16,7 @@
 #include "common/Readahead.h"
 #include "common/RWLock.h"
 #include "common/snap_types.h"
+#include "include/atomic.h"
 #include "include/buffer.h"
 #include "include/rbd/librbd.hpp"
 #include "include/rbd_types.h"
@@ -114,6 +115,8 @@ namespace librbd {
 
     ObjectMap *object_map;
 
+    atomic_t async_request_seq;
+
     /**
      * Either image_name or image_id must be set.
      * If id is not known, pass the empty std::string,
index a533f1f4a9a24733db29ade9dd22c260df64a2f9..30e0a19618511d202fdb7983558b11a296c62322 100644 (file)
@@ -70,7 +70,6 @@ ImageWatcher::ImageWatcher(ImageCtx &image_ctx)
     m_timer_lock("librbd::ImageWatcher::m_timer_lock"),
     m_timer(new SafeTimer(image_ctx.cct, m_timer_lock)),
     m_async_request_lock("librbd::ImageWatcher::m_async_request_lock"),
-    m_async_request_id(0),
     m_aio_request_lock("librbd::ImageWatcher::m_aio_request_lock"),
     m_retry_aio_context(NULL)
 {
@@ -212,19 +211,15 @@ int ImageWatcher::request_lock(
     }
   }
 
-  {
-    // aio operations will be retried once the the watch is re-established
-    RWLock::RLocker l(m_watch_lock);
-    if (m_watch_state == WATCH_STATE_ERROR) {
-      return 0;
-    }
-  }
+  RWLock::RLocker l(m_watch_lock);
+  if (m_watch_state == WATCH_STATE_REGISTERED) {
+    ldout(m_image_ctx.cct, 10) << "requesting exclusive lock" << dendl;
 
-  // run notify request in finisher to avoid blocking aio path
-  FunctionContext *ctx = new FunctionContext(
-    boost::bind(&ImageWatcher::notify_request_lock, this));
-  m_finisher->queue(ctx);
-  ldout(m_image_ctx.cct, 10) << "requesting exclusive lock" << dendl;
+    // run notify request in finisher to avoid blocking aio path
+    FunctionContext *ctx = new FunctionContext(
+      boost::bind(&ImageWatcher::notify_request_lock, this));
+    m_finisher->queue(ctx);
+  }
   return 0;
 }
 
@@ -270,9 +265,8 @@ void ImageWatcher::finalize_request_lock() {
   }
   if (owned_lock) {
     retry_aio_requests();
-    
   } else {
-    schedule_retry_aio_requests();
+    schedule_retry_aio_requests(true);
   }
 }
 
@@ -426,6 +420,13 @@ int ImageWatcher::notify_async_progress(const RemoteAsyncRequest &request,
   return 0;
 }
 
+void ImageWatcher::schedule_async_complete(const RemoteAsyncRequest &request,
+                                          int r) {
+  FunctionContext *ctx = new FunctionContext(
+    boost::bind(&ImageWatcher::notify_async_complete, this, request, r));
+  m_finisher->queue(ctx);
+}
+
 int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request,
                                        int r) {
   ldout(m_image_ctx.cct, 20) << "remote async request finished: "
@@ -440,37 +441,47 @@ int ImageWatcher::notify_async_complete(const RemoteAsyncRequest &request,
 
   librbd::notify_change(m_image_ctx.md_ctx, m_image_ctx.header_oid,
                        &m_image_ctx);
-  m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl, NOTIFY_TIMEOUT, NULL);
+  int ret = m_image_ctx.md_ctx.notify2(m_image_ctx.header_oid, bl,
+                                      NOTIFY_TIMEOUT, NULL);
+  if (ret < 0) {
+    lderr(m_image_ctx.cct) << "failed to notify async complete: "
+                          << cpp_strerror(ret) << dendl;
+    if (ret == -ETIMEDOUT) {
+      schedule_async_complete(request, r);
+    }
+  } else {
+    RWLock::WLocker l(m_async_request_lock);
+    m_async_pending.erase(request);
+  }
   return 0;
 }
 
-int ImageWatcher::notify_flatten(ProgressContext &prog_ctx) {
+int ImageWatcher::notify_flatten(uint64_t request_id, ProgressContext &prog_ctx) {
   assert(m_image_ctx.owner_lock.is_locked());
   assert(!is_lock_owner());
 
   bufferlist bl;
-  uint64_t async_request_id;
   ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
   ::encode(NOTIFY_OP_FLATTEN, bl);
-  async_request_id = encode_async_request(bl);
+  encode_async_request(request_id, bl);
   ENCODE_FINISH(bl);
 
-  return notify_async_request(async_request_id, bl, prog_ctx);
+  return notify_async_request(request_id, bl, prog_ctx);
 }
 
-int ImageWatcher::notify_resize(uint64_t size, ProgressContext &prog_ctx) {
+int ImageWatcher::notify_resize(uint64_t request_id, uint64_t size,
+                               ProgressContext &prog_ctx) {
   assert(m_image_ctx.owner_lock.is_locked());
   assert(!is_lock_owner());
 
   bufferlist bl;
-  uint64_t async_request_id;
   ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, bl);
   ::encode(NOTIFY_OP_RESIZE, bl);
   ::encode(size, bl);
-  async_request_id = encode_async_request(bl);
+  encode_async_request(request_id, bl);
   ENCODE_FINISH(bl);
 
-  return notify_async_request(async_request_id, bl, prog_ctx);
+  return notify_async_request(request_id, bl, prog_ctx);
 }
 
 int ImageWatcher::notify_snap_create(const std::string &snap_name) {
@@ -520,12 +531,21 @@ bool ImageWatcher::decode_lock_cookie(const std::string &tag,
   return true;
 }
 
-void ImageWatcher::schedule_retry_aio_requests() {
+void ImageWatcher::schedule_retry_aio_requests(bool use_timer) {
   Mutex::Locker l(m_timer_lock);
-  if (m_retry_aio_context == NULL) {
-    m_retry_aio_context = new FunctionContext(boost::bind(
-      &ImageWatcher::finalize_retry_aio_requests, this));
-    m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context);
+  if (use_timer) {
+    if (m_retry_aio_context == NULL) {
+      m_retry_aio_context = new FunctionContext(boost::bind(
+       &ImageWatcher::finalize_retry_aio_requests, this));
+      m_timer->add_event_after(RETRY_DELAY_SECONDS, m_retry_aio_context);
+    }
+  } else {
+    m_timer->cancel_event(m_retry_aio_context);
+    m_retry_aio_context = NULL;
+
+    Context *ctx = new FunctionContext(boost::bind(
+      &ImageWatcher::retry_aio_requests, this));
+    m_finisher->queue(ctx);
   }
 }
 
@@ -559,6 +579,12 @@ void ImageWatcher::retry_aio_requests() {
   }
 }
 
+void ImageWatcher::schedule_cancel_async_requests() {
+  FunctionContext *ctx = new FunctionContext(
+    boost::bind(&ImageWatcher::cancel_async_requests, this));
+  m_finisher->queue(ctx);
+}
+
 void ImageWatcher::cancel_async_requests() {
   RWLock::WLocker l(m_async_request_lock);
   for (std::map<uint64_t, AsyncRequest>::iterator iter = m_async_requests.begin();
@@ -568,16 +594,12 @@ void ImageWatcher::cancel_async_requests() {
   m_async_requests.clear();
 }
 
-uint64_t ImageWatcher::encode_async_request(bufferlist &bl) {
-  RWLock::WLocker l(m_async_request_lock);
-  ++m_async_request_id;
-
+void ImageWatcher::encode_async_request(uint64_t request_id, bufferlist &bl) {
   RemoteAsyncRequest request(m_image_ctx.md_ctx.get_instance_id(),
-                            m_watch_handle, m_async_request_id);
+                            reinterpret_cast<uint64_t>(this), request_id);
   ::encode(request, bl);
 
   ldout(m_image_ctx.cct, 10) << "async request: " << request << dendl;
-  return m_async_request_id;
 }
 
 int ImageWatcher::decode_response_code(bufferlist &bl) {
@@ -628,7 +650,7 @@ void ImageWatcher::notify_request_lock() {
   } else if (r < 0) {
     lderr(m_image_ctx.cct) << "error requesting lock: " << cpp_strerror(r)
                           << dendl;
-    schedule_retry_aio_requests();
+    schedule_retry_aio_requests(true);
   }
 }
 
@@ -718,15 +740,14 @@ int ImageWatcher::notify_async_request(uint64_t async_request_id,
   return r;
 }
 
-void ImageWatcher::schedule_update_progress(
-    const RemoteAsyncRequest &remote_async_request,
-    uint64_t offset, uint64_t total) {
+void ImageWatcher::schedule_async_progress(const RemoteAsyncRequest &request,
+                                          uint64_t offset, uint64_t total) {
   RWLock::WLocker l(m_async_request_lock);
-  if (m_async_progress.count(remote_async_request) == 0) {
-    m_async_progress.insert(remote_async_request);
+  if (m_async_progress.count(request) == 0) {
+    m_async_progress.insert(request);
     FunctionContext *ctx = new FunctionContext(
-      boost::bind(&ImageWatcher::notify_async_progress,
-                 this, remote_async_request, offset, total));
+      boost::bind(&ImageWatcher::notify_async_progress, this, request, offset,
+                 total));
     m_finisher->queue(ctx);
   }
 }
@@ -741,32 +762,24 @@ void ImageWatcher::handle_header_update() {
 
 void ImageWatcher::handle_acquired_lock() {
   ldout(m_image_ctx.cct, 10) << "image exclusively locked announcement" << dendl;
-  FunctionContext *ctx = new FunctionContext(
-    boost::bind(&ImageWatcher::cancel_async_requests, this));
-  m_finisher->queue(ctx);
+  RWLock::RLocker l(m_image_ctx.owner_lock);
+  if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
+    schedule_cancel_async_requests();
+  }
 }
 
 void ImageWatcher::handle_released_lock() {
   ldout(m_image_ctx.cct, 10) << "exclusive lock released" << dendl;
   RWLock::RLocker l(m_image_ctx.owner_lock);
   if (m_lock_owner_state == LOCK_OWNER_STATE_NOT_LOCKED) {
-   FunctionContext *ctx = new FunctionContext(
-      boost::bind(&ImageWatcher::cancel_async_requests, this));
-    m_finisher->queue(ctx);
-
-    Mutex::Locker l(m_aio_request_lock);
-    if (!m_aio_requests.empty()) {
-      ldout(m_image_ctx.cct, 20) << "queuing lock request" << dendl;
-      FunctionContext *req_ctx = new FunctionContext(
-       boost::bind(&ImageWatcher::finalize_request_lock, this));
-      m_finisher->queue(req_ctx);
-    }
+    schedule_cancel_async_requests();
+    schedule_retry_aio_requests(false);
   }
 }
 
 void ImageWatcher::handle_request_lock(bufferlist *out) {
   RWLock::WLocker l(m_image_ctx.owner_lock);
-  if (is_lock_owner()) {
+  if (m_lock_owner_state == LOCK_OWNER_STATE_LOCKED) {
     m_lock_owner_state = LOCK_OWNER_STATE_RELEASING;
 
     // need to send something back so the client can detect a missing leader
@@ -790,7 +803,7 @@ void ImageWatcher::handle_async_progress(bufferlist::iterator iter) {
   ::decode(offset, iter);
   ::decode(total, iter);
   if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
-      request.handle == m_watch_handle) {
+      request.handle == reinterpret_cast<uint64_t>(this)) {
     RWLock::RLocker l(m_async_request_lock);
     std::map<uint64_t, AsyncRequest>::iterator iter =
       m_async_requests.find(request.request_id);
@@ -810,7 +823,7 @@ void ImageWatcher::handle_async_complete(bufferlist::iterator iter) {
   int r;
   ::decode(r, iter);
   if (request.gid == m_image_ctx.md_ctx.get_instance_id() &&
-      request.handle == m_watch_handle) {
+      request.handle == reinterpret_cast<uint64_t>(this)) {
     Context *ctx = NULL;
     {
       RWLock::RLocker l(m_async_request_lock);
@@ -834,14 +847,22 @@ void ImageWatcher::handle_flatten(bufferlist::iterator iter, bufferlist *out) {
     RemoteAsyncRequest request;
     ::decode(request, iter);
 
-    RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
-                                                               request);
-    RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
+    int r = 0;
+    RWLock::WLocker l(m_async_request_lock);
+    if (m_async_pending.count(request) == 0) {
+      RemoteProgressContext *prog_ctx =
+       new RemoteProgressContext(*this, request);
+      RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
 
-    ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl;
-    int r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
-    if (r < 0) {
-      delete ctx;
+      ldout(m_image_ctx.cct, 10) << "remote flatten request: " << request << dendl;
+      r = librbd::async_flatten(&m_image_ctx, ctx, *prog_ctx);
+      if (r < 0) {
+       delete ctx;
+       lderr(m_image_ctx.cct) << "remove flatten request failed: "
+                              << cpp_strerror(r) << dendl;
+      } else {
+       m_async_pending.insert(request);
+      }
     }
 
     ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
@@ -859,15 +880,23 @@ void ImageWatcher::handle_resize(bufferlist::iterator iter, bufferlist *out) {
     RemoteAsyncRequest request;
     ::decode(request, iter);
 
-    RemoteProgressContext *prog_ctx = new RemoteProgressContext(*this,
-                                                               request);
-    RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
-
-    ldout(m_image_ctx.cct, 10) << "remote resize request: " << request
-                              << " " << size << dendl;
-    int r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
-    if (r < 0) {
-      delete ctx;
+    int r = 0;
+    RWLock::WLocker l(m_async_request_lock);
+    if (m_async_pending.count(request) == 0) {
+      RemoteProgressContext *prog_ctx =
+       new RemoteProgressContext(*this, request);
+      RemoteContext *ctx = new RemoteContext(*this, request, prog_ctx);
+
+      ldout(m_image_ctx.cct, 10) << "remote resize request: " << request
+                                << " " << size << dendl;
+      r = librbd::async_resize(&m_image_ctx, ctx, size, *prog_ctx);
+      if (r < 0) {
+       lderr(m_image_ctx.cct) << "remove resize request failed: "
+                              << cpp_strerror(r) << dendl;
+       delete ctx;
+      } else {
+       m_async_pending.insert(request);
+      }
     }
 
     ENCODE_START(NOTIFY_VERSION, NOTIFY_VERSION, *out);
@@ -916,6 +945,12 @@ void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
     return;
   }
 
+  bool loopback;
+  {
+    RWLock::RLocker l(m_watch_lock);
+    loopback = (m_watch_handle == handle);
+  }
+
   bufferlist::iterator iter = bl.begin();
   try {
     DECODE_START(NOTIFY_VERSION, iter);
@@ -923,51 +958,54 @@ void ImageWatcher::handle_notify(uint64_t notify_id, uint64_t handle,
     ::decode(op, iter);
 
     bufferlist out;
-    switch (op) {
-    // client ops
-    case NOTIFY_OP_ACQUIRED_LOCK:
-      acknowledge_notify(notify_id, handle, out);
-      handle_acquired_lock();
-      break;
-    case NOTIFY_OP_RELEASED_LOCK:
-      acknowledge_notify(notify_id, handle, out);
-      handle_released_lock();
-      break;
-    case NOTIFY_OP_HEADER_UPDATE:
-      acknowledge_notify(notify_id, handle, out);
-      handle_header_update();
-      break;
-    case NOTIFY_OP_ASYNC_PROGRESS:
-      acknowledge_notify(notify_id, handle, out);
-      handle_async_progress(iter);
-      break;
-    case NOTIFY_OP_ASYNC_COMPLETE:
-      acknowledge_notify(notify_id, handle, out);
-      handle_async_complete(iter);
-      break;
-
-    // lock owner-only ops
-    case NOTIFY_OP_REQUEST_LOCK:
-      handle_request_lock(&out);
-      acknowledge_notify(notify_id, handle, out);
-      break;
-    case NOTIFY_OP_FLATTEN:
-      handle_flatten(iter, &out);
-      acknowledge_notify(notify_id, handle, out);
-      break;
-    case NOTIFY_OP_RESIZE:
-      handle_resize(iter, &out);
+    if (loopback && op != NOTIFY_OP_HEADER_UPDATE) {
       acknowledge_notify(notify_id, handle, out);
-      break;
-    case NOTIFY_OP_SNAP_CREATE:
-      handle_snap_create(iter, &out);
-      acknowledge_notify(notify_id, handle, out);
-      break;
-
-    default:
-      handle_unknown_op(&out);
-      acknowledge_notify(notify_id, handle, out);
-      break;
+    } else {
+      switch (op) {
+      // client ops
+      case NOTIFY_OP_ACQUIRED_LOCK:
+        acknowledge_notify(notify_id, handle, out);
+        handle_acquired_lock();
+        break;
+      case NOTIFY_OP_RELEASED_LOCK:
+        acknowledge_notify(notify_id, handle, out);
+        handle_released_lock();
+        break;
+      case NOTIFY_OP_HEADER_UPDATE:
+        acknowledge_notify(notify_id, handle, out);
+        handle_header_update();
+        break;
+      case NOTIFY_OP_ASYNC_PROGRESS:
+        acknowledge_notify(notify_id, handle, out);
+        handle_async_progress(iter);
+        break;
+      case NOTIFY_OP_ASYNC_COMPLETE:
+        acknowledge_notify(notify_id, handle, out);
+        handle_async_complete(iter);
+        break;
+
+      // lock owner-only ops
+      case NOTIFY_OP_REQUEST_LOCK:
+        handle_request_lock(&out);
+        acknowledge_notify(notify_id, handle, out);
+        break;
+      case NOTIFY_OP_FLATTEN:
+        handle_flatten(iter, &out);
+        acknowledge_notify(notify_id, handle, out);
+        break;
+      case NOTIFY_OP_RESIZE:
+        handle_resize(iter, &out);
+        acknowledge_notify(notify_id, handle, out);
+        break;
+      case NOTIFY_OP_SNAP_CREATE:
+        handle_snap_create(iter, &out);
+        acknowledge_notify(notify_id, handle, out);
+        break;
+      default:
+        handle_unknown_op(&out);
+        acknowledge_notify(notify_id, handle, out);
+        break;
+      }
     }
     DECODE_FINISH(iter);
   } catch (const buffer::error &err) {
@@ -1040,7 +1078,6 @@ void ImageWatcher::reregister_watch() {
     }
   }
 
-  Mutex::Locker l(m_timer_lock);
   retry_aio_requests();
 }
 
@@ -1056,10 +1093,7 @@ void ImageWatcher::WatchCtx::handle_error(uint64_t handle, int err) {
 }
 
 void ImageWatcher::RemoteContext::finish(int r) {
-    FunctionContext *ctx = new FunctionContext(
-      boost::bind(&ImageWatcher::notify_async_complete,
-                 &m_image_watcher, m_remote_async_request, r));
-    m_image_watcher.m_finisher->queue(ctx);
+  m_image_watcher.schedule_async_complete(m_remote_async_request, r);
 }
 
 }
index 11e03715ae3f4347c3a0f401ba4060722de02650..c274c969ab0140e8a5804e8d920c4017ccb0caec 100644 (file)
@@ -63,13 +63,9 @@ namespace librbd {
 
     void assert_header_locked(librados::ObjectWriteOperation *op);
 
-    int notify_async_progress(const RemoteAsyncRequest &remote_async_request,
-                             uint64_t offset, uint64_t total);
-    int notify_async_complete(const RemoteAsyncRequest &remote_async_request,
-                             int r);
-
-    int notify_flatten(ProgressContext &prog_ctx);
-    int notify_resize(uint64_t size, ProgressContext &prog_ctx);
+    int notify_flatten(uint64_t request_id, ProgressContext &prog_ctx);
+    int notify_resize(uint64_t request_id, uint64_t size,
+                     ProgressContext &prog_ctx);
     int notify_snap_create(const std::string &snap_name);
 
     static void notify_header_update(librados::IoCtx &io_ctx,
@@ -115,8 +111,8 @@ namespace librbd {
       }
 
       virtual int update_progress(uint64_t offset, uint64_t total) {
-       m_image_watcher.schedule_update_progress(
-         m_remote_async_request, offset, total);
+       m_image_watcher.schedule_async_progress(m_remote_async_request, offset,
+                                               total);
         return 0;
       }
 
@@ -162,8 +158,8 @@ namespace librbd {
     SafeTimer *m_timer;
 
     RWLock m_async_request_lock;
-    uint64_t m_async_request_id;
     std::map<uint64_t, AsyncRequest> m_async_requests;
+    std::set<RemoteAsyncRequest> m_async_pending;
     std::set<RemoteAsyncRequest> m_async_progress;
 
     Mutex m_aio_request_lock;
@@ -181,14 +177,15 @@ namespace librbd {
     void finalize_request_lock();
     void finalize_header_update();
 
-    void schedule_retry_aio_requests();
+    void schedule_retry_aio_requests(bool use_timer);
     void cancel_retry_aio_requests();
     void finalize_retry_aio_requests();
     void retry_aio_requests();
 
+    void schedule_cancel_async_requests();
     void cancel_async_requests();
 
-    uint64_t encode_async_request(bufferlist &bl);
+    void encode_async_request(uint64_t request_id, bufferlist &bl);
     static int decode_response_code(bufferlist &bl);
 
     void notify_released_lock();
@@ -199,8 +196,14 @@ namespace librbd {
                             ProgressContext& prog_ctx);
     void notify_request_leadership();
 
-    void schedule_update_progress(const RemoteAsyncRequest &remote_async_request,
-                                 uint64_t offset, uint64_t total);
+    void schedule_async_progress(const RemoteAsyncRequest &remote_async_request,
+                                uint64_t offset, uint64_t total);
+    int notify_async_progress(const RemoteAsyncRequest &remote_async_request,
+                             uint64_t offset, uint64_t total);
+    void schedule_async_complete(const RemoteAsyncRequest &remote_async_request,
+                                int r);
+    int notify_async_complete(const RemoteAsyncRequest &remote_async_request,
+                             int r);
 
     void handle_header_update();
     void handle_acquired_lock();
index 2049553cbfa509ecba91e2eb58f225a46af723fd..0765a3ac0e61b531386585bc386036bd9a1598ce 100644 (file)
@@ -1601,6 +1601,7 @@ reprotect_and_return_err:
     ldout(cct, 20) << "resize " << ictx << " " << ictx->size << " -> "
                   << size << dendl;
 
+    uint64_t request_id = ictx->async_request_seq.inc();
     int r;
     do {
       C_SaferCond *ctx;
@@ -1614,7 +1615,7 @@ reprotect_and_return_err:
            break;
          }
 
-         r = ictx->image_watcher->notify_resize(size, prog_ctx);
+         r = ictx->image_watcher->notify_resize(request_id, size, prog_ctx);
          if (r != -ETIMEDOUT && r != -ERESTART) {
            return r;
          }
@@ -2409,6 +2410,7 @@ reprotect_and_return_err:
       return -EROFS;
     }
 
+    uint64_t request_id = ictx->async_request_seq.inc();
     int r;
     do {
       C_SaferCond *ctx;
@@ -2422,7 +2424,7 @@ reprotect_and_return_err:
            break;
          }
 
-          r = ictx->image_watcher->notify_flatten(prog_ctx);
+          r = ictx->image_watcher->notify_flatten(request_id, prog_ctx);
           if (r != -ETIMEDOUT && r != -ERESTART) {
             return r;
           }