]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls/fifo: Fix race condition on `_prepare_new_head`
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 2 Nov 2022 02:49:12 +0000 (22:49 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Tue, 10 Jan 2023 00:20:14 +0000 (19:20 -0500)
First, make `_prepare_new_head` take the new head part number, so two
calls racing from the same push will attempt to create the same head.

Also remove the neorados FIFO since it doesn't have all the bug fixes
in the legacy version and will be rewritten in terms of `async_compose`
anyway.

Fixes: https://tracker.ceph.com/issues/57562
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
(cherry picked from commit bf222906b01cbb9e8d55ae4685004a22cddcc138)
Conflicts:
src/rgw/cls_fifo_legacy.cc
 - Upstream C++20
 - Upstream DPP
Fixes: https://tracker.ceph.com/issues/58403
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/cls_fifo_legacy.cc
src/rgw/cls_fifo_legacy.h

index 76ca015b901b4a54b8b062c0e7dee1a7b4093224..6c1357b201b5b021facac3db7a028a742bbcbdd7 100644 (file)
@@ -421,8 +421,8 @@ int FIFO::apply_update(const DoutPrefixProvider *dpp,
                 << " entering: tid=" << tid << dendl;
   std::unique_lock l(m);
   if (objv != info->version) {
-    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " version mismatch, canceling: tid=" << tid << dendl;
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " version mismatch, canceling: tid=" << tid << dendl;
     return -ECANCELED;
   }
 
@@ -438,7 +438,7 @@ int FIFO::_update_meta(const DoutPrefixProvider *dpp, const fifo::update& update
                 << " entering: tid=" << tid << dendl;
   lr::ObjectWriteOperation op;
   bool canceled = false;
-  update_meta(&op, info.version, update);
+  update_meta(&op, version, update);
   auto r = rgw_rados_operate(dpp, ioctx, oid, &op, y);
   if (r >= 0 || r == -ECANCELED) {
     canceled = (r == -ECANCELED);
@@ -711,15 +711,16 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti
   return r;
 }
 
-int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y)
+int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp,
+                           std::int64_t new_part_num, bool is_head,
+                           std::uint64_t tid, optional_yield y)
 {
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
   std::unique_lock l(m);
-  std::vector<fifo::journal_entry> jentries{{
-      fifo::journal_entry::Op::create, info.max_push_part_num + 1
-    }};
-  if (info.journal.contains(jentries.front())) {
+  std::vector<fifo::journal_entry> jentries{{ fifo::journal_entry::Op::create, new_part_num }};
+  if (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) &&
+      (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}))) {
     l.unlock();
     ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
                  << " new part journaled, but not processed: tid="
@@ -731,13 +732,12 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
     }
     return r;
   }
-  std::int64_t new_head_part_num = info.head_part_num;
   auto version = info.version;
 
   if (is_head) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " needs new head: tid=" << tid << dendl;
-    jentries.push_back({ fifo::journal_entry::Op::set_head, jentries.front().part_num });
+    jentries.push_back({ fifo::journal_entry::Op::set_head, new_part_num });
   }
   l.unlock();
 
@@ -751,10 +751,11 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
     r = _update_meta(dpp, u, version, &canceled, tid, y);
     if (r >= 0 && canceled) {
       std::unique_lock l(m);
-      auto found = (info.journal.contains({fifo::journal_entry::Op::create, jentries.front().part_num}) ||
-                   info.journal.contains({fifo::journal_entry::Op::set_head, jentries.front().part_num}));
-      if ((info.max_push_part_num >= jentries.front().part_num &&
-          info.head_part_num >= new_head_part_num)) {
+      version = info.version;
+      auto found = (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) ||
+                   info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}));
+      if ((info.max_push_part_num >= new_part_num &&
+          info.head_part_num >= new_part_num)) {
        ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                       << " raced, but journaled and processed: i=" << i
                       << " tid=" << tid << dendl;
@@ -788,21 +789,22 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
   return r;
 }
 
-int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y)
+int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp,
+                           std::int64_t new_head_part_num,
+                           std::uint64_t tid, optional_yield y)
 {
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " entering: tid=" << tid << dendl;
+                    << " entering: tid=" << tid << dendl;
   std::unique_lock l(m);
-  std::int64_t new_head_num = info.head_part_num + 1;
   auto max_push_part_num = info.max_push_part_num;
   auto version = info.version;
   l.unlock();
 
   int r = 0;
-  if (max_push_part_num < new_head_num) {
+  if (max_push_part_num < new_head_part_num) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " need new part: tid=" << tid << dendl;
-    r = _prepare_new_part(dpp, true, tid, y);
+    r = _prepare_new_part(dpp, new_head_part_num, true, tid, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " _prepare_new_part failed: r=" << r
@@ -810,7 +812,7 @@ int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, op
       return r;
     }
     std::unique_lock l(m);
-    if (info.max_push_part_num < new_head_num) {
+    if (info.max_push_part_num < new_head_part_num) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " inconsistency, push part less than head part: "
                 << " tid=" << tid << dendl;
@@ -820,51 +822,71 @@ int FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, op
     return 0;
   }
 
+  fifo::journal_entry jentry;
+  jentry.op = fifo::journal_entry::Op::set_head;
+  jentry.part_num = new_head_part_num;
+
+  r = 0;
   bool canceled = true;
   for (auto i = 0; canceled && i < MAX_RACE_RETRIES; ++i) {
+    canceled = false;
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " updating head: i=" << i << " tid=" << tid << dendl;
-    auto u = fifo::update{}.head_part_num(new_head_num);
+                  << " updating metadata: i=" << i << " tid=" << tid << dendl;
+    auto u = fifo::update{}.journal_entries_add({{ jentry }});
     r = _update_meta(dpp, u, version, &canceled, tid, y);
+    if (r >= 0 && canceled) {
+      std::unique_lock l(m);
+      auto found = (info.journal.contains({fifo::journal_entry::Op::create, new_head_part_num}) ||
+                   info.journal.contains({fifo::journal_entry::Op::set_head, new_head_part_num}));
+      version = info.version;
+      if ((info.head_part_num >= new_head_part_num)) {
+       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " raced, but journaled and processed: i=" << i
+                      << " tid=" << tid << dendl;
+       return 0;
+      }
+      if (found) {
+       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " raced, journaled but not processed: i=" << i
+                      << " tid=" << tid << dendl;
+       canceled = false;
+      }
+      l.unlock();
+    }
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " _update_meta failed: update=" << u << " r=" << r
                 << " tid=" << tid << dendl;
       return r;
     }
-    std::unique_lock l(m);
-    auto head_part_num = info.head_part_num;
-    version = info.version;
-    l.unlock();
-    if (canceled && (head_part_num >= new_head_num)) {
-      ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                    << " raced, but completed by the other caller: i=" << i
-                    << " tid=" << tid << dendl;
-      canceled = false;
-    }
   }
   if (canceled) {
     ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
               << " canceled too many times, giving up: tid=" << tid << dendl;
     return -ECANCELED;
   }
-  return 0;
+  r = process_journal(dpp, tid, y);
+  if (r < 0) {
+    ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
+              << " process_journal failed: r=" << r << " tid=" << tid << dendl;
+  }
+  return r;
 }
 
 struct NewPartPreparer : public Completion<NewPartPreparer> {
   FIFO* f;
   std::vector<fifo::journal_entry> jentries;
   int i = 0;
-  std::int64_t new_head_part_num;
+  std::int64_t new_part_num;
   bool canceled = false;
   uint64_t tid;
 
   NewPartPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
                  std::vector<fifo::journal_entry> jentries,
-                 std::int64_t new_head_part_num,
+                 std::int64_t new_part_num,
                  std::uint64_t tid)
     : Completion(dpp, super), f(f), jentries(std::move(jentries)),
-      new_head_part_num(new_head_part_num), tid(tid) {}
+      new_part_num(new_part_num), tid(tid) {}
 
   void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
@@ -879,14 +901,14 @@ struct NewPartPreparer : public Completion<NewPartPreparer> {
 
     if (canceled) {
       std::unique_lock l(f->m);
-      auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, jentries.front().part_num}) ||
-                   f->info.journal.contains({fifo::journal_entry::Op::set_head, jentries.front().part_num}));
+      auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) ||
+                   f->info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}));
       auto max_push_part_num = f->info.max_push_part_num;
       auto head_part_num = f->info.head_part_num;
       auto version = f->info.version;
       l.unlock();
-      if ((max_push_part_num >= jentries.front().part_num &&
-          head_part_num >= new_head_part_num)) {
+      if ((max_push_part_num >= new_part_num &&
+          head_part_num >= new_part_num)) {
        ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                          << " raced, but journaled and processed: i=" << i
                          << " tid=" << tid << dendl;
@@ -916,15 +938,13 @@ struct NewPartPreparer : public Completion<NewPartPreparer> {
   }
 };
 
-void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid,
-                            lr::AioCompletion* c)
+void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num,
+                            bool is_head, std::uint64_t tid, lr::AioCompletion* c)
 {
   std::unique_lock l(m);
-  std::vector<fifo::journal_entry> jentries{{
-      fifo::journal_entry::Op::create, info.max_push_part_num + 1
-    }};
-  if (info.journal.contains({fifo::journal_entry::Op::create, jentries.front().part_num}) &&
-      (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, jentries.front().part_num}))) {
+  std::vector<fifo::journal_entry> jentries{{fifo::journal_entry::Op::create, new_part_num}};
+  if (info.journal.contains({fifo::journal_entry::Op::create, new_part_num}) &&
+      (!is_head || info.journal.contains({fifo::journal_entry::Op::set_head, new_part_num}))) {
     l.unlock();
     ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
                  << " new part journaled, but not processed: tid="
@@ -932,16 +952,15 @@ void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::u
     process_journal(dpp, tid, c);
     return;
   }
-  std::int64_t new_head_part_num = info.head_part_num;
   auto version = info.version;
 
   if (is_head) {
-    jentries.push_back({ fifo::journal_entry::Op::set_head, jentries.front().part_num });
+    jentries.push_back({ fifo::journal_entry::Op::set_head, new_part_num });
   }
   l.unlock();
 
   auto n = std::make_unique<NewPartPreparer>(dpp, this, c, jentries,
-                                            new_head_part_num, tid);
+                                            new_part_num, tid);
   auto np = n.get();
   _update_meta(dpp, fifo::update{}.journal_entries_add(jentries), version,
               &np->canceled, tid, NewPartPreparer::call(std::move(n)));
@@ -951,14 +970,15 @@ struct NewHeadPreparer : public Completion<NewHeadPreparer> {
   FIFO* f;
   int i = 0;
   bool newpart;
-  std::int64_t new_head_num;
+  std::int64_t new_head_part_num;
   bool canceled = false;
   std::uint64_t tid;
 
   NewHeadPreparer(const DoutPrefixProvider *dpp, FIFO* f, lr::AioCompletion* super,
-                 bool newpart, std::int64_t new_head_num, std::uint64_t tid)
-    : Completion(dpp, super), f(f), newpart(newpart), new_head_num(new_head_num),
-      tid(tid) {}
+                 bool newpart, std::int64_t new_head_part_num,
+                 std::uint64_t tid)
+    : Completion(dpp, super), f(f), newpart(newpart),
+      new_head_part_num(new_head_part_num), tid(tid) {}
 
   void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
     if (newpart)
@@ -976,7 +996,7 @@ struct NewHeadPreparer : public Completion<NewHeadPreparer> {
       return;
     }
     std::unique_lock l(f->m);
-    if (f->info.max_push_part_num < new_head_num) {
+    if (f->info.max_push_part_num < new_head_part_num) {
       l.unlock();
       lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
                    << " _prepare_new_part failed: r=" << r
@@ -989,67 +1009,84 @@ struct NewHeadPreparer : public Completion<NewHeadPreparer> {
   }
 
   void handle_update(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
-    std::unique_lock l(f->m);
-    auto head_part_num = f->info.head_part_num;
-    auto version = f->info.version;
-    l.unlock();
-
+    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " entering: tid=" << tid << dendl;
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                   << " _update_meta failed: r=" << r
+                   << " _update_meta failed:  r=" << r
                    << " tid=" << tid << dendl;
       complete(std::move(p), r);
       return;
     }
+
     if (canceled) {
+      std::unique_lock l(f->m);
+      auto found = (f->info.journal.contains({fifo::journal_entry::Op::create, new_head_part_num }) ||
+                   f->info.journal.contains({fifo::journal_entry::Op::set_head, new_head_part_num }));
+      auto head_part_num = f->info.head_part_num;
+      auto version = f->info.version;
+
+      l.unlock();
+      if ((head_part_num >= new_head_part_num)) {
+       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                         << " raced, but journaled and processed: i=" << i
+                         << " tid=" << tid << dendl;
+       complete(std::move(p), 0);
+       return;
+      }
       if (i >= MAX_RACE_RETRIES) {
-       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                     << " canceled too many times, giving up: tid=" << tid << dendl;
        complete(std::move(p), -ECANCELED);
        return;
       }
-
-      // Raced, but there's still work to do!
-      if (head_part_num < new_head_num) {
-       canceled = false;
+      if (!found) {
        ++i;
-       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                         << " updating head: i=" << i << " tid=" << tid << dendl;
-       f->_update_meta(dpp, fifo::update{}.head_part_num(new_head_num),
-                       version, &this->canceled, tid, call(std::move(p)));
+       fifo::journal_entry jentry;
+       jentry.op = fifo::journal_entry::Op::set_head;
+       jentry.part_num = new_head_part_num;
+       f->_update_meta(dpp, fifo::update{}
+                       .journal_entries_add({{jentry}}),
+                        version, &canceled, tid, call(std::move(p)));
        return;
+      } else {
+       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                         << " raced, journaled but not processed: i=" << i
+                         << " tid=" << tid << dendl;
+       canceled = false;
       }
+      // Fall through. We still need to process the journal.
     }
-    ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " succeeded : i=" << i << " tid=" << tid << dendl;
-    complete(std::move(p), 0);
+    f->process_journal(dpp, tid, super());
     return;
   }
 };
 
-void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c)
+void FIFO::_prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
+                            std::uint64_t tid, lr::AioCompletion* c)
 {
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
   std::unique_lock l(m);
-  int64_t new_head_num = info.head_part_num + 1;
   auto max_push_part_num = info.max_push_part_num;
   auto version = info.version;
   l.unlock();
 
-  if (max_push_part_num < new_head_num) {
+  if (max_push_part_num < new_head_part_num) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " need new part: tid=" << tid << dendl;
-    auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_num,
+    auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, true, new_head_part_num,
                                               tid);
-    _prepare_new_part(dpp, true, tid, NewHeadPreparer::call(std::move(n)));
+    _prepare_new_part(dpp, new_head_part_num, true, tid,
+                     NewHeadPreparer::call(std::move(n)));
   } else {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " updating head: tid=" << tid << dendl;
-    auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_num,
+    auto n = std::make_unique<NewHeadPreparer>(dpp, this, c, false, new_head_part_num,
                                               tid);
     auto np = n.get();
-    _update_meta(dpp, fifo::update{}.head_part_num(new_head_num), version,
+    fifo::journal_entry jentry;
+    jentry.op = fifo::journal_entry::Op::set_head;
+    jentry.part_num = new_head_part_num;
+    _update_meta(dpp, fifo::update{}.journal_entries_add({{jentry}}), version,
                 &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
   }
 }
@@ -1280,6 +1317,7 @@ int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_
   auto tid = ++next_tid;
   auto max_entry_size = info.params.max_entry_size;
   auto need_new_head = info.need_new_head();
+  auto head_part_num = info.head_part_num;
   l.unlock();
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
@@ -1302,7 +1340,7 @@ int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_
   if (need_new_head) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " need new head tid=" << tid << dendl;
-    r = _prepare_new_head(dpp, tid, y);
+    r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " _prepare_new_head failed: r=" << r
@@ -1324,6 +1362,7 @@ int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_
                   << " batch=" << batch.size() << " retries=" << retries
                   << " tid=" << tid << dendl;
     std::unique_lock l(m);
+    head_part_num = info.head_part_num;
     auto max_part_size = info.params.max_part_size;
     auto overhead = part_entry_overhead;
     l.unlock();
@@ -1350,7 +1389,7 @@ int FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_
       ++retries;
       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                     << " need new head tid=" << tid << dendl;
-      r = _prepare_new_head(dpp, tid, y);
+      r = _prepare_new_head(dpp, head_part_num + 1, tid, y);
       if (r < 0) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " prepare_new_head failed: r=" << r
@@ -1392,6 +1431,7 @@ struct Pusher : public Completion<Pusher> {
   std::deque<cb::list> remaining;
   std::deque<cb::list> batch;
   int i = 0;
+  std::int64_t head_part_num;
   std::uint64_t tid;
   bool new_heading = false;
 
@@ -1399,6 +1439,7 @@ struct Pusher : public Completion<Pusher> {
     std::unique_lock l(f->m);
     auto max_part_size = f->info.params.max_part_size;
     auto part_entry_overhead = f->part_entry_overhead;
+    head_part_num = f->info.head_part_num;
     l.unlock();
 
     ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
@@ -1448,7 +1489,7 @@ struct Pusher : public Completion<Pusher> {
 
   void new_head(const DoutPrefixProvider *dpp, Ptr&& p) {
     new_heading = true;
-    f->_prepare_new_head(dpp, tid, call(std::move(p)));
+    f->_prepare_new_head(dpp, head_part_num + 1, tid, call(std::move(p)));
   }
 
   void handle(const DoutPrefixProvider *dpp, Ptr&& p, int r) {
@@ -1505,9 +1546,10 @@ struct Pusher : public Completion<Pusher> {
   }
 
   Pusher(const DoutPrefixProvider *dpp, FIFO* f, std::deque<cb::list>&& remaining,
-        std::uint64_t tid, lr::AioCompletion* super)
+        std::int64_t head_part_num, std::uint64_t tid,
+        lr::AioCompletion* super)
     : Completion(dpp, super), f(f), remaining(std::move(remaining)),
-      tid(tid) {}
+      head_part_num(head_part_num), tid(tid) {}
 };
 
 void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data_bufs,
@@ -1517,11 +1559,12 @@ void FIFO::push(const DoutPrefixProvider *dpp, const std::vector<cb::list>& data
   auto tid = ++next_tid;
   auto max_entry_size = info.params.max_entry_size;
   auto need_new_head = info.need_new_head();
+  auto head_part_num = info.head_part_num;
   l.unlock();
   ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
   auto p = std::make_unique<Pusher>(dpp, this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
-                                   tid, c);
+                                   head_part_num, tid, c);
   // Validate sizes
   for (const auto& bl : data_bufs) {
     if (bl.length() > max_entry_size) {
index 509bd092e62d1eec3e07cafb0f712bb5fa8ebef0..172ee1e9fa8f6c7a000bfcc01a5cf7e2ca281492 100644 (file)
@@ -144,10 +144,11 @@ class FIFO {
                  optional_yield y);
   int process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
   void process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c);
-  int _prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, optional_yield y);
-  void _prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::uint64_t tid, lr::AioCompletion* c);
-  int _prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, optional_yield y);
-  void _prepare_new_head(const DoutPrefixProvider *dpp, std::uint64_t tid, lr::AioCompletion* c);
+  int _prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, bool is_head, std::uint64_t tid, optional_yield y);
+  void _prepare_new_part(const DoutPrefixProvider *dpp, std::int64_t new_part_num, bool is_head, std::uint64_t tid, lr::AioCompletion* c);
+  int _prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num,
+                       std::uint64_t tid, optional_yield y);
+  void _prepare_new_head(const DoutPrefixProvider *dpp, std::int64_t new_head_part_num, std::uint64_t tid, lr::AioCompletion* c);
   int push_entries(const DoutPrefixProvider *dpp, const std::deque<cb::list>& data_bufs,
                   std::uint64_t tid, optional_yield y);
   void push_entries(const std::deque<cb::list>& data_bufs,