#include <string>
#include <vector>
+#include <boost/container/flat_set.hpp>
+
#undef FMT_HEADER_ONLY
#define FMT_HEADER_ONLY 1
#include <fmt/format.h>
struct journal_entry {
enum class Op {
- unknown = 0,
+ unknown = -1,
create = 1,
set_head = 2,
remove = 3,
std::int64_t part_num{-1};
+ bool valid() const {
+ using enum Op;
+ switch (op) {
+ case create: [[fallthrough]];
+ case set_head: [[fallthrough]];
+ case remove:
+ return part_num >= 0;
+
+ default:
+ return false;
+ }
+ }
+
journal_entry() = default;
journal_entry(Op op, std::int64_t part_num)
: op(op), part_num(part_num) {}
void encode(ceph::buffer::list& bl) const {
+ ceph_assert(valid());
ENCODE_START(1, 1, bl);
encode((int)op, bl);
encode(part_num, bl);
std::int64_t min_push_part_num{0};
std::int64_t max_push_part_num{-1};
- std::multimap<int64_t, journal_entry> journal;
+ boost::container::flat_set<journal_entry> journal;
+ static_assert(journal_entry::Op::create < journal_entry::Op::set_head);
+
+ // So we can get rid of the multimap without breaking compatibility
+ void encode_journal(bufferlist& bl) const {
+ using ceph::encode;
+ assert(journal.size() <= std::numeric_limits<uint32_t>::max());
+ uint32_t n = static_cast<uint32_t>(journal.size());
+ encode(n, bl);
+ for (const auto& entry : journal) {
+ encode(entry.part_num, bl);
+ encode(entry, bl);
+ }
+ }
+ void decode_journal( bufferlist::const_iterator& p) {
+ using enum journal_entry::Op;
+ using ceph::decode;
+ uint32_t n;
+ decode(n, p);
+ journal.clear();
+ while (n--) {
+ decltype(journal_entry::part_num) dummy;
+ decode(dummy, p);
+ journal_entry e;
+ decode(e, p);
+ if (!e.valid()) {
+ throw ceph::buffer::malformed_input();
+ } else {
+ journal.insert(std::move(e));
+ }
+ }
+ }
bool need_new_head() const {
return (head_part_num < min_push_part_num);
}
std::map<int64_t, std::string> tags;
encode(tags, bl);
encode(head_tag, bl);
- encode(journal, bl);
+ encode_journal(bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
std::map<int64_t, std::string> tags;
decode(tags, bl);
decode(head_tag, bl);
- decode(journal, bl);
+ decode_journal(bl);
DECODE_FINISH(bl);
}
void dump(ceph::Formatter* f) const;
}
for (const auto& entry : update.journal_entries_add()) {
- if (std::find_if(journal.begin(), journal.end(),
- [&entry](const auto &x) { return x.second == entry; })
- != journal.end()) {
- continue;
- } else {
- journal.emplace(entry.part_num, entry);
+ auto [iter, inserted] = journal.insert(entry);
+ if (inserted) {
changed = true;
}
}
for (const auto& entry : update.journal_entries_rm()) {
- journal.erase(entry.part_num);
- changed = true;
+ auto count = journal.erase(entry);
+ if (count > 0) {
+ changed = true;
+ }
}
if (update.head_part_num() && (head_part_num != *update.head_part_num())) {
*
*/
+#include <algorithm>
#include <cstdint>
#include <numeric>
#include <optional>
l.unlock();
int r = 0;
- for (auto& [n, entry] : tmpjournal) {
+ for (auto& entry : tmpjournal) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << entry << " tid=" << tid
<< dendl;
<< " update canceled, retrying: i=" << i << " tid="
<< tid << dendl;
for (auto& e : processed) {
- auto jiter = info.journal.find(e.part_num);
- /* journal entry was already processed */
- if (jiter == info.journal.end() ||
- !(jiter->second == e)) {
- continue;
+ if (info.journal.contains(e)) {
+ new_processed.push_back(e);
}
- new_processed.push_back(e);
}
processed = std::move(new_processed);
}
std::vector<fifo::journal_entry> jentries{{
create, info.max_push_part_num + 1
}};
- if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+ if (info.journal.contains(jentries.front())) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
r = _update_meta(dpp, u, version, &canceled, tid, y);
if (r >= 0 && canceled) {
std::unique_lock l(m);
- auto found = (info.journal.find(jentries.front().part_num) !=
- info.journal.end());
+ auto found = (info.journal.contains({create, jentries.front().part_num}) ||
+ info.journal.contains({set_head, jentries.front().part_num}));
if ((info.max_push_part_num >= jentries.front().part_num &&
info.head_part_num >= new_head_part_num)) {
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
}
if (canceled) {
+ using enum fifo::journal_entry::Op;
std::unique_lock l(f->m);
- auto iter = f->info.journal.find(jentries.front().part_num);
+ auto found = (f->info.journal.contains({create, jentries.front().part_num}) ||
+ f->info.journal.contains({set_head, jentries.front().part_num}));
auto max_push_part_num = f->info.max_push_part_num;
auto head_part_num = f->info.head_part_num;
auto version = f->info.version;
- auto found = (iter != f->info.journal.end());
l.unlock();
if ((max_push_part_num >= jentries.front().part_num &&
head_part_num >= new_head_part_num)) {
std::vector<fifo::journal_entry> jentries{{
create, info.max_push_part_num + 1
}};
- if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+ if (info.journal.contains({create, jentries.front().part_num}) &&
+ (!is_head || info.journal.contains({set_head, jentries.front().part_num}))) {
l.unlock();
ldpp_dout(dpp, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " new part journaled, but not processed: tid="
FIFO* const fifo;
std::vector<fifo::journal_entry> processed;
- std::multimap<std::int64_t, fifo::journal_entry> journal;
- std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
+ decltype(fifo->info.journal) journal;
+ decltype(journal)::iterator iter;
std::int64_t new_tail;
std::int64_t new_head;
std::int64_t new_max;
std::vector<fifo::journal_entry> new_processed;
std::unique_lock l(fifo->m);
for (auto& e : processed) {
- auto jiter = fifo->info.journal.find(e.part_num);
- /* journal entry was already processed */
- if (jiter == fifo->info.journal.end() ||
- !(jiter->second == e)) {
- continue;
+ if (fifo->info.journal.contains(e)) {
+ new_processed.push_back(e);
}
- new_processed.push_back(e);
}
processed = std::move(new_processed);
}
ldpp_dout(dpp, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " processing entry: entry=" << *iter
<< " tid=" << tid << dendl;
- const auto entry = iter->second;
+ const auto entry = *iter;
switch (entry.op) {
using enum fifo::journal_entry::Op;
case create:
<< " entering: tid=" << tid << dendl;
switch (state) {
case entry_callback:
- finish_je(dpp, std::move(p), r, iter->second);
+ finish_je(dpp, std::move(p), r, *iter);
return;
case pp_callback:
auto c = canceled;