]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
cls/fifo: Journal is flat_set, not multimap
authorAdam C. Emerson <aemerson@redhat.com>
Wed, 30 Nov 2022 22:43:15 +0000 (17:43 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 9 Jan 2023 21:53:51 +0000 (16:53 -0500)
We don't really need the overhead and complexity of a multimap.

Fixes: https://tracker.ceph.com/issues/57562
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/cls/fifo/cls_fifo_types.h
src/rgw/driver/rados/cls_fifo_legacy.cc

index 89ba1cc7102f01352ad07b1d97358ff8ee02d893..2b9fec26af9b1671c4b7571133020394a0dea147 100644 (file)
@@ -23,6 +23,8 @@
 #include <string>
 #include <vector>
 
+#include <boost/container/flat_set.hpp>
+
 #undef FMT_HEADER_ONLY
 #define FMT_HEADER_ONLY 1
 #include <fmt/format.h>
@@ -117,7 +119,7 @@ inline std::ostream& operator <<(std::ostream& m, const data_params& d) {
 
 struct journal_entry {
   enum class Op {
-    unknown  = 0,
+    unknown  = -1,
     create   = 1,
     set_head = 2,
     remove   = 3,
@@ -125,11 +127,25 @@ struct journal_entry {
 
   std::int64_t part_num{-1};
 
+  bool valid() const {
+    using enum Op;
+    switch (op) {
+    case create: [[fallthrough]];
+    case set_head: [[fallthrough]];
+    case remove:
+      return part_num >= 0;
+
+    default:
+      return false;
+    }
+  }
+
   journal_entry() = default;
   journal_entry(Op op, std::int64_t part_num)
     : op(op), part_num(part_num) {}
 
   void encode(ceph::buffer::list& bl) const {
+    ceph_assert(valid());
     ENCODE_START(1, 1, bl);
     encode((int)op, bl);
     encode(part_num, bl);
@@ -308,8 +324,39 @@ struct info {
   std::int64_t min_push_part_num{0};
   std::int64_t max_push_part_num{-1};
 
-  std::multimap<int64_t, journal_entry> journal;
+  boost::container::flat_set<journal_entry> journal;
+  static_assert(journal_entry::Op::create < journal_entry::Op::set_head);
+
+  // So we can get rid of the multimap without breaking compatibility
+  void encode_journal(bufferlist& bl) const {
+    using ceph::encode;
+    assert(journal.size() <= std::numeric_limits<uint32_t>::max());
+    uint32_t n = static_cast<uint32_t>(journal.size());
+    encode(n, bl);
+    for (const auto& entry : journal) {
+      encode(entry.part_num, bl);
+      encode(entry, bl);
+    }
+  }
 
+  void decode_journal( bufferlist::const_iterator& p) {
+    using enum journal_entry::Op;
+    using ceph::decode;
+    uint32_t n;
+    decode(n, p);
+    journal.clear();
+    while (n--) {
+      decltype(journal_entry::part_num) dummy;
+      decode(dummy, p);
+      journal_entry e;
+      decode(e, p);
+      if (!e.valid()) {
+       throw ceph::buffer::malformed_input();
+      } else {
+       journal.insert(std::move(e));
+      }
+    }
+  }
   bool need_new_head() const {
     return (head_part_num < min_push_part_num);
   }
@@ -332,7 +379,7 @@ struct info {
     std::map<int64_t, std::string> tags;
     encode(tags, bl);
     encode(head_tag, bl);
-    encode(journal, bl);
+    encode_journal(bl);
     ENCODE_FINISH(bl);
   }
   void decode(ceph::buffer::list::const_iterator& bl) {
@@ -349,7 +396,7 @@ struct info {
     std::map<int64_t, std::string> tags;
     decode(tags, bl);
     decode(head_tag, bl);
-    decode(journal, bl);
+    decode_journal(bl);
     DECODE_FINISH(bl);
   }
   void dump(ceph::Formatter* f) const;
@@ -379,19 +426,17 @@ struct info {
     }
 
     for (const auto& entry : update.journal_entries_add()) {
-      if (std::find_if(journal.begin(), journal.end(),
-                      [&entry](const auto &x) { return x.second == entry; })
-         != journal.end()) {
-       continue;
-      } else {
-       journal.emplace(entry.part_num, entry);
+      auto [iter, inserted] = journal.insert(entry);
+      if (inserted) {
        changed = true;
       }
     }
 
     for (const auto& entry : update.journal_entries_rm()) {
-      journal.erase(entry.part_num);
-      changed = true;
+      auto count = journal.erase(entry);
+      if (count > 0) {
+       changed = true;
+      }
     }
 
     if (update.head_part_num() && (head_part_num != *update.head_part_num())) {
index 9946cfa413dfd0b90138c79c2de624ddd9a64d56..82d6c88a3b38fb63f74aaa6a6a95b271ca6a7d41 100644 (file)
@@ -13,6 +13,7 @@
  *
  */
 
+#include <algorithm>
 #include <cstdint>
 #include <numeric>
 #include <optional>
@@ -602,7 +603,7 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti
   l.unlock();
 
   int r = 0;
-  for (auto& [n, entry] : tmpjournal) {
+  for (auto& entry : tmpjournal) {
     ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " processing entry: entry=" << entry << " tid=" << tid
                   << dendl;
@@ -689,13 +690,9 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti
                     << " update canceled, retrying: i=" << i << " tid="
                     << tid << dendl;
       for (auto& e : processed) {
-       auto jiter = info.journal.find(e.part_num);
-       /* journal entry was already processed */
-       if (jiter == info.journal.end() ||
-           !(jiter->second == e)) {
-         continue;
+       if (info.journal.contains(e)) {
+         new_processed.push_back(e);
        }
-       new_processed.push_back(e);
       }
       processed = std::move(new_processed);
     }
@@ -721,7 +718,7 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
   std::vector<fifo::journal_entry> jentries{{
       create, info.max_push_part_num + 1
     }};
-  if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+  if (info.journal.contains(jentries.front())) {
     l.unlock();
     ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
                  << " new part journaled, but not processed: tid="
@@ -753,8 +750,8 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui
     r = _update_meta(dpp, u, version, &canceled, tid, y);
     if (r >= 0 && canceled) {
       std::unique_lock l(m);
-      auto found = (info.journal.find(jentries.front().part_num) !=
-                   info.journal.end());
+      auto found = (info.journal.contains({create, jentries.front().part_num}) ||
+                   info.journal.contains({set_head, jentries.front().part_num}));
       if ((info.max_push_part_num >= jentries.front().part_num &&
           info.head_part_num >= new_head_part_num)) {
        ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
@@ -880,12 +877,13 @@ struct NewPartPreparer : public Completion<NewPartPreparer> {
     }
 
     if (canceled) {
+      using enum fifo::journal_entry::Op;
       std::unique_lock l(f->m);
-      auto iter = f->info.journal.find(jentries.front().part_num);
+      auto found = (f->info.journal.contains({create, jentries.front().part_num}) ||
+                   f->info.journal.contains({set_head, jentries.front().part_num}));
       auto max_push_part_num = f->info.max_push_part_num;
       auto head_part_num = f->info.head_part_num;
       auto version = f->info.version;
-      auto found = (iter != f->info.journal.end());
       l.unlock();
       if ((max_push_part_num >= jentries.front().part_num &&
           head_part_num >= new_head_part_num)) {
@@ -926,7 +924,8 @@ void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::u
   std::vector<fifo::journal_entry> jentries{{
       create, info.max_push_part_num + 1
     }};
-  if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+  if (info.journal.contains({create, jentries.front().part_num}) &&
+      (!is_head || info.journal.contains({set_head, jentries.front().part_num}))) {
     l.unlock();
     ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
                  << " new part journaled, but not processed: tid="
@@ -2014,8 +2013,8 @@ private:
   FIFO* const fifo;
 
   std::vector<fifo::journal_entry> processed;
-  std::multimap<std::int64_t, fifo::journal_entry> journal;
-  std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
+  decltype(fifo->info.journal) journal;
+  decltype(journal)::iterator iter;
   std::int64_t new_tail;
   std::int64_t new_head;
   std::int64_t new_max;
@@ -2173,13 +2172,9 @@ public:
       std::vector<fifo::journal_entry> new_processed;
       std::unique_lock l(fifo->m);
       for (auto& e : processed) {
-       auto jiter = fifo->info.journal.find(e.part_num);
-       /* journal entry was already processed */
-       if (jiter == fifo->info.journal.end() ||
-           !(jiter->second == e)) {
-         continue;
+       if (fifo->info.journal.contains(e)) {
+         new_processed.push_back(e);
        }
-       new_processed.push_back(e);
       }
       processed = std::move(new_processed);
     }
@@ -2231,7 +2226,7 @@ public:
       ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                           << " processing entry: entry=" << *iter
                           << " tid=" << tid << dendl;
-      const auto entry = iter->second;
+      const auto entry = *iter;
       switch (entry.op) {
        using enum fifo::journal_entry::Op;
       case create:
@@ -2264,7 +2259,7 @@ public:
                         << " entering: tid=" << tid << dendl;
     switch (state) {
     case entry_callback:
-      finish_je(dpp, std::move(p), r, iter->second);
+      finish_je(dpp, std::move(p), r, *iter);
       return;
     case pp_callback:
       auto c = canceled;