From fcaa45d32d22653c6c8272a7a43af41bf3dc7e88 Mon Sep 17 00:00:00 2001 From: "Adam C. Emerson" Date: Wed, 30 Nov 2022 17:43:15 -0500 Subject: [PATCH] cls/fifo: Journal is flat_set, not multimap We don't really need the overhead and complexity of a multimap. Fixes: https://tracker.ceph.com/issues/57562 Signed-off-by: Adam C. Emerson --- src/cls/fifo/cls_fifo_types.h | 69 ++++++++++++++++++++----- src/rgw/driver/rados/cls_fifo_legacy.cc | 41 +++++++-------- 2 files changed, 75 insertions(+), 35 deletions(-) diff --git a/src/cls/fifo/cls_fifo_types.h b/src/cls/fifo/cls_fifo_types.h index 89ba1cc7102..2b9fec26af9 100644 --- a/src/cls/fifo/cls_fifo_types.h +++ b/src/cls/fifo/cls_fifo_types.h @@ -23,6 +23,8 @@ #include #include +#include + #undef FMT_HEADER_ONLY #define FMT_HEADER_ONLY 1 #include @@ -117,7 +119,7 @@ inline std::ostream& operator <<(std::ostream& m, const data_params& d) { struct journal_entry { enum class Op { - unknown = 0, + unknown = -1, create = 1, set_head = 2, remove = 3, @@ -125,11 +127,25 @@ struct journal_entry { std::int64_t part_num{-1}; + bool valid() const { + using enum Op; + switch (op) { + case create: [[fallthrough]]; + case set_head: [[fallthrough]]; + case remove: + return part_num >= 0; + + default: + return false; + } + } + journal_entry() = default; journal_entry(Op op, std::int64_t part_num) : op(op), part_num(part_num) {} void encode(ceph::buffer::list& bl) const { + ceph_assert(valid()); ENCODE_START(1, 1, bl); encode((int)op, bl); encode(part_num, bl); @@ -308,8 +324,39 @@ struct info { std::int64_t min_push_part_num{0}; std::int64_t max_push_part_num{-1}; - std::multimap journal; + boost::container::flat_set journal; + static_assert(journal_entry::Op::create < journal_entry::Op::set_head); + + // So we can get rid of the multimap without breaking compatibility + void encode_journal(bufferlist& bl) const { + using ceph::encode; + assert(journal.size() <= std::numeric_limits::max()); + uint32_t n = static_cast(journal.size()); + encode(n, bl); + for (const auto& entry : journal) { + encode(entry.part_num, bl); + encode(entry, bl); + } + } + void decode_journal( bufferlist::const_iterator& p) { + using enum journal_entry::Op; + using ceph::decode; + uint32_t n; + decode(n, p); + journal.clear(); + while (n--) { + decltype(journal_entry::part_num) dummy; + decode(dummy, p); + journal_entry e; + decode(e, p); + if (!e.valid()) { + throw ceph::buffer::malformed_input(); + } else { + journal.insert(std::move(e)); + } + } + } bool need_new_head() const { return (head_part_num < min_push_part_num); } @@ -332,7 +379,7 @@ struct info { std::map tags; encode(tags, bl); encode(head_tag, bl); - encode(journal, bl); + encode_journal(bl); ENCODE_FINISH(bl); } void decode(ceph::buffer::list::const_iterator& bl) { @@ -349,7 +396,7 @@ struct info { std::map tags; decode(tags, bl); decode(head_tag, bl); - decode(journal, bl); + decode_journal(bl); DECODE_FINISH(bl); } void dump(ceph::Formatter* f) const; @@ -379,19 +426,17 @@ struct info { } for (const auto& entry : update.journal_entries_add()) { - if (std::find_if(journal.begin(), journal.end(), - [&entry](const auto &x) { return x.second == entry; }) - != journal.end()) { - continue; - } else { - journal.emplace(entry.part_num, entry); + auto [iter, inserted] = journal.insert(entry); + if (inserted) { changed = true; } } for (const auto& entry : update.journal_entries_rm()) { - journal.erase(entry.part_num); - changed = true; + auto count = journal.erase(entry); + if (count > 0) { + changed = true; + } } if (update.head_part_num() && (head_part_num != *update.head_part_num())) { diff --git a/src/rgw/driver/rados/cls_fifo_legacy.cc b/src/rgw/driver/rados/cls_fifo_legacy.cc index 9946cfa413d..82d6c88a3b3 100644 --- a/src/rgw/driver/rados/cls_fifo_legacy.cc +++ b/src/rgw/driver/rados/cls_fifo_legacy.cc @@ -13,6 +13,7 @@ * */ +#include #include #include #include @@ -602,7 +603,7 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti l.unlock(); int r = 0; - for (auto& [n, entry] : tmpjournal) { + for (auto& entry : tmpjournal) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " processing entry: entry=" << entry << " tid=" << tid << dendl; @@ -689,13 +690,9 @@ int FIFO::process_journal(const DoutPrefixProvider *dpp, std::uint64_t tid, opti << " update canceled, retrying: i=" << i << " tid=" << tid << dendl; for (auto& e : processed) { - auto jiter = info.journal.find(e.part_num); - /* journal entry was already processed */ - if (jiter == info.journal.end() || - !(jiter->second == e)) { - continue; + if (info.journal.contains(e)) { + new_processed.push_back(e); } - new_processed.push_back(e); } processed = std::move(new_processed); } @@ -721,7 +718,7 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui std::vector jentries{{ create, info.max_push_part_num + 1 }}; - if (info.journal.find(jentries.front().part_num) != info.journal.end()) { + if (info.journal.contains(jentries.front())) { l.unlock(); ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " new part journaled, but not processed: tid=" @@ -753,8 +750,8 @@ int FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::ui r = _update_meta(dpp, u, version, &canceled, tid, y); if (r >= 0 && canceled) { std::unique_lock l(m); - auto found = (info.journal.find(jentries.front().part_num) != - info.journal.end()); + auto found = (info.journal.contains({create, jentries.front().part_num}) || + info.journal.contains({set_head, jentries.front().part_num})); if ((info.max_push_part_num >= jentries.front().part_num && info.head_part_num >= new_head_part_num)) { ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ @@ -880,12 +877,13 @@ struct NewPartPreparer : public Completion { } if (canceled) { + using enum fifo::journal_entry::Op; std::unique_lock l(f->m); - auto iter = f->info.journal.find(jentries.front().part_num); + auto found = (f->info.journal.contains({create, jentries.front().part_num}) || + f->info.journal.contains({set_head, jentries.front().part_num})); auto max_push_part_num = f->info.max_push_part_num; auto head_part_num = f->info.head_part_num; auto version = f->info.version; - auto found = (iter != f->info.journal.end()); l.unlock(); if ((max_push_part_num >= jentries.front().part_num && head_part_num >= new_head_part_num)) { @@ -926,7 +924,8 @@ void FIFO::_prepare_new_part(const DoutPrefixProvider *dpp, bool is_head, std::u std::vector jentries{{ create, info.max_push_part_num + 1 }}; - if (info.journal.find(jentries.front().part_num) != info.journal.end()) { + if (info.journal.contains({create, jentries.front().part_num}) && + (!is_head || info.journal.contains({set_head, jentries.front().part_num}))) { l.unlock(); ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " new part journaled, but not processed: tid=" @@ -2014,8 +2013,8 @@ private: FIFO* const fifo; std::vector processed; - std::multimap journal; - std::multimap::iterator iter; + decltype(fifo->info.journal) journal; + decltype(journal)::iterator iter; std::int64_t new_tail; std::int64_t new_head; std::int64_t new_max; @@ -2173,13 +2172,9 @@ public: std::vector new_processed; std::unique_lock l(fifo->m); for (auto& e : processed) { - auto jiter = fifo->info.journal.find(e.part_num); - /* journal entry was already processed */ - if (jiter == fifo->info.journal.end() || - !(jiter->second == e)) { - continue; + if (fifo->info.journal.contains(e)) { + new_processed.push_back(e); } - new_processed.push_back(e); } processed = std::move(new_processed); } @@ -2231,7 +2226,7 @@ public: ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__ << " processing entry: entry=" << *iter << " tid=" << tid << dendl; - const auto entry = iter->second; + const auto entry = *iter; switch (entry.op) { using enum fifo::journal_entry::Op; case create: @@ -2264,7 +2259,7 @@ public: << " entering: tid=" << tid << dendl; switch (state) { case entry_callback: - finish_je(dpp, std::move(p), r, iter->second); + finish_je(dpp, std::move(p), r, *iter); return; case pp_callback: auto c = canceled; -- 2.39.5