]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
cls/fifo: Fix race condition on `_prepare_new_head`
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 2 Nov 2022 02:49:12 +0000 (22:49 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 9 Jan 2023 22:27:34 +0000 (17:27 -0500)
First, make `_prepare_new_head` take the new head part number, so two
calls racing from the same push will attempt to create the same head.

Also remove the neorados FIFO since it doesn't have all the bug fixes
in the legacy version and will be rewritten in terms of `async_compose`
anyway.

Fixes: https://tracker.ceph.com/issues/57562
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/driver/rados/cls_fifo_legacy.cc
src/rgw/driver/rados/cls_fifo_legacy.h

index 82d6c88a3b38fb63f74aaa6a6a95b271ca6a7d41..0272e9abdab3d253f17620507e5b9d2b85d68344 100644 (file)
@@ -419,7 +419,7 @@ int FIFO::apply_update(const DoutPrefixProvider *dpp,
   std::unique_lock l(m);
   if (objv != info->version) {
     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " version mismatch, canceling: tid=" << tid << dendl;
+                      << " version mismatch, canceling: tid=" << tid << dendl;
     return -ECANCELED;
   }
 
@@ -435,7 +435,7 @@ int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update
                 << " entering: tid=" << tid << dendl;
   lr::ObjectWriteOperation op;
   bool canceled = false;
-  update_meta(&op, info.version, update);
+  update_meta(&op, version, update);
   auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
   if (r >= 0 || r == -ECANCELED) {
     canceled = (r == -ECANCELED);
@@ -709,16 +709,17 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti
   return r;
 }
 
-int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y)
+int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp,
+                           std::int64_t new_part_num, bool is_head,
+                           std::uint64_t tid, optional_yield y)
 {
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
   std::unique_lock l(m);
   using enum fifo::journal_entry::Op;
-  std::vector<fifo::journal_entry> jentries{{
-      create, info.max_push_part_num + 1
-    }};
-  if (info.journal.contains(jentries.front())) {
+  std::vector<fifo::journal_entry> jentries{{ create, new_part_num }};
+  if (info.journal.contains({create, new_part_num}) &&
+      (!is_head || info.journal.contains({set_head, new_part_num}))) {
     l.unlock();
     ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
                  << " new part journaled, but not processed: tid="
@@ -730,13 +731,12 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
     }
     return r;
   }
-  std::int64_t new_head_part_num = info.head_part_num;
   auto version = info.version;
 
   if (is_head) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " needs new head: tid=" << tid << dendl;
-    jentries.push_back({ set_head, jentries.front().part_num });
+    jentries.push_back({ set_head, new_part_num });
   }
   l.unlock();
 
@@ -750,10 +750,11 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
     r = _update_meta(dpp, u, version, &canceled, tid, y);
     if (r >= 0 && canceled) {
       std::unique_lock l(m);
-      auto found = (info.journal.contains({create, jentries.front().part_num}) ||
-                   info.journal.contains({set_head, jentries.front().part_num}));
-      if ((info.max_push_part_num >= jentries.front().part_num &&
-          info.head_part_num >= new_head_part_num)) {
+      version = info.version;
+      auto found = (info.journal.contains({create, new_part_num}) ||
+                   info.journal.contains({set_head, new_part_num}));
+      if ((info.max_push_part_num >= new_part_num &&
+          info.head_part_num >= new_part_num)) {
        ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                       << " raced, but journaled and processed: i=" << i
                       << " tid=" << tid << dendl;
@@ -787,21 +788,22 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
   return r;
 }
 
-int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
+int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp,
+                           std::int64_t new_head_part_num,
+                           std::uint64_t tid, optional_yield y)
 {
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " entering: tid=" << tid << dendl;
+                    << " entering: tid=" << tid << dendl;
   std::unique_lock l(m);
-  std::int64_t new_head_num = info.head_part_num + 1;
   auto max_push_part_num = info.max_push_part_num;
   auto version = info.version;
   l.unlock();
 
   int r = 0;
-  if (max_push_part_num < new_head_num) {
+  if (max_push_part_num < new_head_part_num) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " need new part: tid=" << tid << dendl;
-    r = _prepare_new_part(dpp, true, tid, y);
+    r = _prepare_new_part(dpp, new_head_part_num, true, tid, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " _prepare_new_part failed: r=" << r
@@ -809,7 +811,7 @@ int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, op
       return r;
     }
     std::unique_lock l(m);
-    if (info.max_push_part_num < new_head_num) {
+    if (info.max_push_part_num < new_head_part_num) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " inconsistency, push part less than head part: "
                 << " tid=" << tid << dendl;
@@ -819,51 +821,72 @@ int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, op
     return 0;
   }
 
+  using enum fifo::journal_entry::Op;
+  fifo::journal_entry jentry;
+  jentry.op = set_head;
+  jentry.part_num = new_head_part_num;
+
+  r = 0;
   bool canceled = true;
   for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+    canceled = false;
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " updating head: i=" << i << " tid=" << tid << dendl;
-    auto u = fifo::update{}.head_part_num(new_head_num);
+                  << " updating metadata: i=" << i << " tid=" << tid << dendl;
+    auto u = fifo::update{}.journal_entries_add({{ jentry }});
     r = _update_meta(dpp, u, version, &canceled, tid, y);
+    if (r >= 0 && canceled) {
+      std::unique_lock l(m);
+      auto found = (info.journal.contains({create, new_head_part_num}) ||
+                   info.journal.contains({set_head, new_head_part_num}));
+      version = info.version;
+      if ((info.head_part_num >= new_head_part_num)) {
+       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " raced, but journaled and processed: i=" << i
+                      << " tid=" << tid << dendl;
+       return 0;
+      }
+      if (found) {
+       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " raced, journaled but not processed: i=" << i
+                      << " tid=" << tid << dendl;
+       canceled = false;
+      }
+      l.unlock();
+    }
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " _update_meta failed: update=" << u << " r=" << r
                 << " tid=" << tid << dendl;
       return r;
     }
-    std::unique_lock l(m);
-    auto head_part_num = info.head_part_num;
-    version = info.version;
-    l.unlock();
-    if (canceled && (head_part_num >= new_head_num)) {
-      ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                    << " raced, but completed by the other caller: i=" << i
-                    << " tid=" << tid << dendl;
-      canceled = false;
-    }
   }
   if (canceled) {
     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
               << " canceled too many times, giving up: tid=" << tid << dendl;
     return -ECANCELED;
   }
-  return 0;
+  r = process_journal(dpp, tid, y);
+  if (r < 0) {
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+              << " process_journal failed: r=" << r << " tid=" << tid << dendl;
+  }
+  return r;
 }
 
 struct NewPartPreparer : public Completion<NewPartPreparer> {
   FIFO* f;
   std::vector<fifo::journal_entry> jentries;
   int i = 0;
-  std::int64_t new_head_part_num;
+  std::int64_t new_part_num;
   bool canceled = false;
   uint64_t tid;
 
   NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
                  std::vector<fifo::journal_entry> jentries,
-                 std::int64_t new_head_part_num,
+                 std::int64_t new_part_num,
                  std::uint64_t tid)
     : Completion(dpp, super), f(f), jentries(std::move(jentries)),
-      new_head_part_num(new_head_part_num), tid(tid) {}
+      new_part_num(new_part_num), tid(tid) {}
 
   void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
@@ -879,14 +902,14 @@ struct NewPartPreparer : public Completion<NewPartPreparer> {
     if (canceled) {
       using enum fifo::journal_entry::Op;
       std::unique_lock l(f->m);
-      auto found = (f->info.journal.contains({create, jentries.front().part_num}) ||
-                   f->info.journal.contains({set_head, jentries.front().part_num}));
+      auto found = (f->info.journal.contains({create, new_part_num}) ||
+                   f->info.journal.contains({set_head, new_part_num}));
       auto max_push_part_num = f->info.max_push_part_num;
       auto head_part_num = f->info.head_part_num;
       auto version = f->info.version;
       l.unlock();
-      if ((max_push_part_num >= jentries.front().part_num &&
-          head_part_num >= new_head_part_num)) {
+      if ((max_push_part_num >= new_part_num &&
+          head_part_num >= new_part_num)) {
        ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                          << " raced, but journaled and processed: i=" << i
                          << " tid=" << tid << dendl;
@@ -916,16 +939,14 @@ struct NewPartPreparer : public Completion<NewPartPreparer> {
   }
 };
 
-void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid,
-                            lr::AioCompletion* c)
+void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num,
+                            bool is_head, std::uint64_t tid, lr::AioCompletion* c)
 {
   std::unique_lock l(m);
   using enum fifo::journal_entry::Op;
-  std::vector<fifo::journal_entry> jentries{{
-      create, info.max_push_part_num + 1
-    }};
-  if (info.journal.contains({create, jentries.front().part_num}) &&
-      (!is_head || info.journal.contains({set_head, jentries.front().part_num}))) {
+  std::vector<fifo::journal_entry> jentries{{create, new_part_num}};
+  if (info.journal.contains({create, new_part_num}) &&
+      (!is_head || info.journal.contains({set_head, new_part_num}))) {
     l.unlock();
     ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
                  << " new part journaled, but not processed: tid="
@@ -933,16 +954,15 @@ void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::u
     process_journal(dpp, tid, c);
     return;
   }
-  std::int64_t new_head_part_num = info.head_part_num;
   auto version = info.version;
 
   if (is_head) {
-    jentries.push_back({ set_head, jentries.front().part_num });
+    jentries.push_back({ set_head, new_part_num });
   }
   l.unlock();
 
   auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
-                                            new_head_part_num, tid);
+                                            new_part_num, tid);
   auto np = n.get();
   _update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
               &np->canceled, tid, NewPartPreparer::call(std::move(n)));
@@ -952,14 +972,15 @@ struct NewHeadPreparer : public Completion<NewHeadPreparer> {
   FIFO* f;
   int i = 0;
   bool newpart;
-  std::int64_t new_head_num;
+  std::int64_t new_head_part_num;
   bool canceled = false;
   std::uint64_t tid;
 
   NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
-                 bool newpart, std::int64_t new_head_num, std::uint64_t tid)
-    : Completion(dpp, super), f(f), newpart(newpart), new_head_num(new_head_num),
-      tid(tid) {}
+                 bool newpart, std::int64_t new_head_part_num,
+                 std::uint64_t tid)
+    : Completion(dpp, super), f(f), newpart(newpart),
+      new_head_part_num(new_head_part_num), tid(tid) {}
 
   void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
     if (newpart)
@@ -977,7 +998,7 @@ struct NewHeadPreparer : public Completion<NewHeadPreparer> {
       return;
     }
     std::unique_lock l(f->m);
-    if (f->info.max_push_part_num < new_head_num) {
+    if (f->info.max_push_part_num < new_head_part_num) {
       l.unlock();
       lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
                    << " _prepare_new_part failed: r=" << r
@@ -990,67 +1011,86 @@ struct NewHeadPreparer : public Completion<NewHeadPreparer> {
   }
 
   void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
-    std::unique_lock l(f->m);
-    auto head_part_num = f->info.head_part_num;
-    auto version = f->info.version;
-    l.unlock();
-
+    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " entering: tid=" << tid << dendl;
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                   << " _update_meta failed: r=" << r
+                   << " _update_meta failed:  r=" << r
                    << " tid=" << tid << dendl;
       complete(std::move(p), r);
       return;
     }
+
     if (canceled) {
+      using enum fifo::journal_entry::Op;
+      std::unique_lock l(f->m);
+      auto found = (f->info.journal.contains({create, new_head_part_num }) ||
+                   f->info.journal.contains({set_head, new_head_part_num }));
+      auto head_part_num = f->info.head_part_num;
+      auto version = f->info.version;
+
+      l.unlock();
+      if ((head_part_num >= new_head_part_num)) {
+       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                         << " raced, but journaled and processed: i=" << i
+                         << " tid=" << tid << dendl;
+       complete(std::move(p), 0);
+       return;
+      }
       if (i >= MAX_RACE_RETRIES) {
-       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                     << " canceled too many times, giving up: tid=" << tid << dendl;
        complete(std::move(p), -ECANCELED);
        return;
       }
-
-      // Raced, but there's still work to do!
-      if (head_part_num < new_head_num) {
-       canceled = false;
+      if (!found) {
        ++i;
-       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                         << " updating head: i=" << i << " tid=" << tid << dendl;
-       f->_update_meta(dpp, fifo::update{}.head_part_num(new_head_num),
-                       version, &this->canceled, tid, call(std::move(p)));
+       fifo::journal_entry jentry;
+       jentry.op = set_head;
+       jentry.part_num = new_head_part_num;
+       f->_update_meta(dpp, fifo::update{}
+                       .journal_entries_add({{jentry}}),
+                        version, &canceled, tid, call(std::move(p)));
        return;
+      } else {
+       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                         << " raced, journaled but not processed: i=" << i
+                         << " tid=" << tid << dendl;
+       canceled = false;
       }
+      // Fall through. We still need to process the journal.
     }
-    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " succeeded : i=" << i << " tid=" << tid << dendl;
-    complete(std::move(p), 0);
+    f->process_journal(dpp, tid, super());
     return;
   }
 };
 
-void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
+void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
+                            std::uint64_t tid, lr::AioCompletion* c)
 {
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
   std::unique_lock l(m);
-  int64_t new_head_num = info.head_part_num + 1;
   auto max_push_part_num = info.max_push_part_num;
   auto version = info.version;
   l.unlock();
 
-  if (max_push_part_num < new_head_num) {
+  if (max_push_part_num < new_head_part_num) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " need new part: tid=" << tid << dendl;
-    auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_num,
+    auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_part_num,
                                               tid);
-    _prepare_new_part(dpp, true, tid, NewHeadPreparer::call(std::move(n)));
+    _prepare_new_part(dpp, new_head_part_num, true, tid,
+                     NewHeadPreparer::call(std::move(n)));
   } else {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " updating head: tid=" << tid << dendl;
-    auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_num,
+    auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_part_num,
                                               tid);
     auto np = n.get();
-    _update_meta(dpp, fifo::update{}.head_part_num(new_head_num), version,
+    using enum fifo::journal_entry::Op;
+    fifo::journal_entry jentry;
+    jentry.op = set_head;
+    jentry.part_num = new_head_part_num;
+    _update_meta(dpp, fifo::update{}.journal_entries_add({{jentry}}), version,
                 &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
   }
 }
@@ -1281,6 +1321,7 @@ int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_
   auto tid = ++next_tid;
   auto max_entry_size = info.params.max_entry_size;
   auto need_new_head = info.need_new_head();
+  auto head_part_num = info.head_part_num;
   l.unlock();
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
@@ -1303,7 +1344,7 @@ int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_
   if (need_new_head) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " need new head tid=" << tid << dendl;
-    r = _prepare_new_head(dpp, tid, y);
+    r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " _prepare_new_head failed: r=" << r
@@ -1325,6 +1366,7 @@ int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_
                   << " batch=" << batch.size() << " retries=" << retries
                   << " tid=" << tid << dendl;
     std::unique_lock l(m);
+    head_part_num = info.head_part_num;
     auto max_part_size = info.params.max_part_size;
     auto overhead = part_entry_overhead;
     l.unlock();
@@ -1351,7 +1393,7 @@ int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_
       ++retries;
       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                     << " need new head tid=" << tid << dendl;
-      r = _prepare_new_head(dpp, tid, y);
+      r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
       if (r < 0) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " prepare_new_head failed: r=" << r
@@ -1393,6 +1435,7 @@ struct Pusher : public Completion<Pusher> {
   std::deque<cb::list> remaining;
   std::deque<cb::list> batch;
   int i = 0;
+  std::int64_t head_part_num;
   std::uint64_t tid;
   bool new_heading = false;
 
@@ -1400,6 +1443,7 @@ struct Pusher : public Completion<Pusher> {
     std::unique_lock l(f->m);
     auto max_part_size = f->info.params.max_part_size;
     auto part_entry_overhead = f->part_entry_overhead;
+    head_part_num = f->info.head_part_num;
     l.unlock();
 
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
@@ -1449,7 +1493,7 @@ struct Pusher : public Completion<Pusher> {
 
   void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
     new_heading = true;
-    f->_prepare_new_head(dpp, tid, call(std::move(p)));
+    f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p)));
   }
 
   void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
@@ -1506,9 +1550,10 @@ struct Pusher : public Completion<Pusher> {
   }
 
   Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
-        std::uint64_t tid, lr::AioCompletion* super)
+        std::int64_t head_part_num, std::uint64_t tid,
+        lr::AioCompletion* super)
     : Completion(dpp, super), f(f), remaining(std::move(remaining)),
-      tid(tid) {}
+      head_part_num(head_part_num), tid(tid) {}
 };
 
 void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
@@ -1518,11 +1563,12 @@ void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data
   auto tid = ++next_tid;
   auto max_entry_size = info.params.max_entry_size;
   auto need_new_head = info.need_new_head();
+  auto head_part_num = info.head_part_num;
   l.unlock();
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
   auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
-                                   tid, c);
+                                   head_part_num, tid, c);
   // Validate sizes
   for (const auto& bl : data_bufs) {
     if (bl.length() > max_entry_size) {
index 96c393e43a793360435c3fa642ad1b614e8f19be..3ea54082d8877f32170cb1ca4d71b80210f2ba43 100644 (file)
@@ -142,10 +142,11 @@ class FIFO {
                  optional_yield y);
   int process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
   void process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c);
-  int _prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y);
-  void _prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, lr::AioCompletion* c);
-  int _prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
-  void _prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c);
+  int _prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, bool is_head, std::uint64_t tid, optional_yield y);
+  void _prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, bool is_head, std::uint64_t tid, lr::AioCompletion* c);
+  int _prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
+                       std::uint64_t tid, optional_yield y);
+  void _prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num, std::uint64_t tid, lr::AioCompletion* c);
   int push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs,
                   std::uint64_t tid, optional_yield y);
   void push_entries(const std::deque<cb::list>& data_bufs,