void PGLog::IndexedLog::trim(
CephContext* cct,
eversion_t s,
- set<eversion_t> *trimmed)
+ set<eversion_t> *trimmed,
+ set<string> *trimmed_dups)
{
if (complete_to != log.end() &&
complete_to->version <= s) {
unindex(e); // remove from index,
+ // add to dup list
+ if (e.version.version + 1000 > s.version) {
+ dirty_dups = true;
+ dups.push_back(pg_log_dup_t(e));
+ dup_index[e.reqid] = &(dups.back());
+ for (const auto& extra : e.extra_reqids) {
+ dups.push_back(pg_log_dup_t(e.version, extra.second,
+ extra.first, e.return_code));
+ dup_index[extra->first] = &(dups.back());
+ }
+ }
+
if (rollback_info_trimmed_to_riter == log.rend() ||
e.version == rollback_info_trimmed_to_riter->version) {
log.pop_front();
}
}
+ while (!dups.empty()) {
+ auto &e = *dups.begin();
+ if (e.version.version + 1000 > s.version)
+ break;
+ generic_dout(20) << "trim dup " << e << dendl;
+ if (trimmed_dups)
+ trimmed_dups->insert(e.get_key_name());
+ dup_index.erase(e.reqid);
+ dups.pop_front();
+ }
+
// raise tail?
if (tail < s)
tail = s;
assert(trim_to <= info.last_complete);
dout(10) << "trim " << log << " to " << trim_to << dendl;
- log.trim(cct, trim_to, &trimmed);
+ log.trim(cct, trim_to, &trimmed, &trimmed_dups);
info.log_tail = log.tail;
}
}
<< ", dirty_from: " << dirty_from
<< ", writeout_from: " << writeout_from
<< ", trimmed: " << trimmed
+ << ", trimmed_dups: " << trimmed_dups
<< ", clear_divergent_priors: " << clear_divergent_priors
<< dendl;
_write_log_and_missing(
dirty_from,
writeout_from,
trimmed,
+ trimmed_dups,
missing,
!touched_log,
require_rollback,
eversion_t dirty_from,
eversion_t writeout_from,
const set<eversion_t> &trimmed,
+ const set<string> &trimmed_dups,
bool dirty_divergent_priors,
bool touch_log,
bool require_rollback,
set<string> *log_keys_debug
)
{
- set<string> to_remove;
+ set<string> to_remove(trimmed_dups);
for (set<eversion_t>::const_iterator i = trimmed.begin();
i != trimmed.end();
++i) {
(*km)[p->get_key_name()].claim(bl);
}
+ if (dirty_dups) {
+ pg_log_dup_t min;
+ t.omap_rmkeyrange(
+ coll, log_oid,
+ min.get_key_name(), log.dups.begin()->get_key_name());
+ for (const auto& entry : log.dups) {
+ bufferlist bl;
+ ::encode(entry, bl);
+ (*km)[entry.get_key_name()].claim(bl);
+ }
+ }
+
if (log_keys_debug) {
for (map<string, bufferlist>::iterator i = (*km).begin();
i != (*km).end();
eversion_t dirty_from,
eversion_t writeout_from,
const set<eversion_t> &trimmed,
+ const set<string> &trimmed_dups,
const pg_missing_tracker_t &missing,
bool touch_log,
bool require_rollback,
bool *rebuilt_missing_with_deletes, // in/out param
set<string> *log_keys_debug
) {
- set<string> to_remove;
+ set<string> to_remove(trimmed_dups);
for (set<eversion_t>::const_iterator i = trimmed.begin();
i != trimmed.end();
++i) {
(*km)[p->get_key_name()].claim(bl);
}
+ if (dirty_dups) {
+ pg_log_dup_t min;
+ t.omap_rmkeyrange(
+ coll, log_oid,
+ min.get_key_name(), log.dups.begin()->get_key_name());
+ for (const auto& entry : log.dups) {
+ bufferlist bl;
+ ::encode(entry, bl);
+ (*km)[entry.get_key_name()].claim(bl);
+ }
+ }
+
if (log_keys_debug) {
for (map<string, bufferlist>::iterator i = (*km).begin();
i != (*km).end();
mutable ceph::unordered_map<hobject_t,pg_log_entry_t*> objects; // ptrs into log. be careful!
mutable ceph::unordered_map<osd_reqid_t,pg_log_entry_t*> caller_ops;
mutable ceph::unordered_multimap<osd_reqid_t,pg_log_entry_t*> extra_caller_ops;
+ mutable ceph::unordered_map<osd_reqid_t, pg_log_dup_t*> dup_index;
// recovery pointers
list<pg_log_entry_t>::iterator complete_to; // not inclusive of referenced item
objects.clear();
caller_ops.clear();
extra_caller_ops.clear();
+ dup_index.clear();
indexed_data = 0;
}
void unindex(pg_log_entry_t& e) {
void trim(
CephContext* cct,
eversion_t s,
- set<eversion_t> *trimmed);
+ set<eversion_t> *trimmed,
+ set<string> *trimmed_dups);
ostream& print(ostream& out) const;
};
eversion_t dirty_from; ///< must clear/writeout all keys >= dirty_from
eversion_t writeout_from; ///< must writout keys >= writeout_from
set<eversion_t> trimmed; ///< must clear keys in trimmed
+ set<string> trimmed_dups; ///< must clear keys in trimmed_dups
CephContext *cct;
bool pg_log_debug;
/// Log is clean on [dirty_to, dirty_from)
bool touched_log;
bool clear_divergent_priors;
+ bool dirty_dups; /// log.dups is updated
bool rebuilt_missing_with_deletes = false;
void mark_dirty_to(eversion_t to) {
(writeout_from != eversion_t::max()) ||
!(trimmed.empty()) ||
!missing.is_clean() ||
+ !(trimmed_dups.empty()) ||
rebuilt_missing_with_deletes;
}
void mark_log_for_rewrite() {
dirty_from = eversion_t::max();
touched_log = true;
trimmed.clear();
+ trimmed_dups.clear();
writeout_from = eversion_t::max();
check();
missing.flush();
+ dirty_dups = false;
}
public:
// cppcheck-suppress noExplicitConstructor
eversion_t dirty_from,
eversion_t writeout_from,
const set<eversion_t> &trimmed,
+ const set<string> &trimmed_dups,
bool dirty_divergent_priors,
bool touch_log,
bool require_rollback,
eversion_t dirty_from,
eversion_t writeout_from,
const set<eversion_t> &trimmed,
+ const set<string> &trimmed_dups,
const pg_missing_tracker_t &missing,
bool touch_log,
bool require_rollback,
bool has_divergent_priors = false;
missing.may_include_deletes = false;
list<pg_log_entry_t> entries;
+ list<pg_log_dup_t> dups;
if (p) {
for (p->seek_to_first(); p->valid() ; p->next(false)) {
// non-log pgmeta_oid keys are prefixed with _; skip those
assert(missing.may_include_deletes);
}
missing.add(oid, item.need, item.have, item.is_delete());
+ } else if (p->key().substr(0, 4) == string("dup_")) {
+ pg_log_dup_t dup;
+ ::decode(dup, bp);
+ if (!dups.empty()) {
+ assert(dups.back().version < dup.version);
+ }
+ dups.push_back(dup);
} else {
pg_log_entry_t e;
e.decode_with_checksum(bp);
info.log_tail,
on_disk_can_rollback_to,
on_disk_rollback_info_trimmed_to,
- std::move(entries));
+ std::move(entries),
+ std::move(dups));
if (has_divergent_priors || debug_verify_stored_missing) {
// build missing
return out;
}
+// -- pg_log_dup_t --
+
+string pg_log_dup_t::get_key_name() const
+{
+ return "dup_" + version.get_key_name();
+}
+
+void pg_log_dup_t::encode(bufferlist &bl) const
+{
+ ENCODE_START(1, 1, bl);
+ ::encode(reqid, bl);
+ ::encode(version, bl);
+ ::encode(user_version, bl);
+ ::encode(return_code, bl);
+ ENCODE_FINISH(bl);
+}
+
+void pg_log_dup_t::decode(bufferlist::iterator &bl)
+{
+ DECODE_START(1, bl);
+ ::decode(reqid, bl);
+ ::decode(version, bl);
+ ::decode(user_version, bl);
+ ::decode(return_code, bl);
+ DECODE_FINISH(bl);
+}
+
+void pg_log_dup_t::dump(Formatter *f) const
+{
+ f->dump_stream("reqid") << reqid;
+ f->dump_stream("version") << version;
+ f->dump_stream("user_version") << user_version;
+ f->dump_stream("return_code") << return_code;
+}
+
+void pg_log_dup_t::generate_test_instances(list<pg_log_dup_t*>& o)
+{
+ o.push_back(new pg_log_dup_t());
+ o.push_back(new pg_log_dup_t(osd_reqid_t(entity_name_t::CLIENT(777), 8, 999),
+ eversion_t(1,2), 1, 0);
+ o.push_back(new pg_log_dup_t(osd_reqid_t(entity_name_t::CLIENT(777), 8, 999),
+ eversion_t(1,2), 2, -ENOENT);
+}
+
+ostream& operator<<(ostream& out, const pg_log_dup_t& e)
+{
+ out << e.reqid << " v" << e.version << " uv" << e.user_version
+ << " rc=" << e.return_code;
+ return out;
+}
+
// -- pg_log_t --
void pg_log_t::encode(bufferlist& bl) const
{
- ENCODE_START(6, 3, bl);
+ ENCODE_START(7, 3, bl);
::encode(head, bl);
::encode(tail, bl);
::encode(log, bl);
::encode(can_rollback_to, bl);
::encode(rollback_info_trimmed_to, bl);
+ ::encode(dups, bl);
ENCODE_FINISH(bl);
}
void pg_log_t::decode(bufferlist::iterator &bl, int64_t pool)
{
- DECODE_START_LEGACY_COMPAT_LEN(6, 3, 3, bl);
+ DECODE_START_LEGACY_COMPAT_LEN(7, 3, 3, bl);
::decode(head, bl);
::decode(tail, bl);
if (struct_v < 2) {
::decode(rollback_info_trimmed_to, bl);
else
rollback_info_trimmed_to = tail;
+
+ if (struct_v >= 7)
+ ::decode(dups, bl);
+
DECODE_FINISH(bl);
// handle hobject_t format change
f->close_section();
}
f->close_section();
+ f->open_array_section("dups");
+ for (const auto& entry : dups) {
+ f->open_object_section("entry");
+ entry.dump(f);
+ f->close_section();
+ }
+ f->close_section();
}
void pg_log_t::generate_test_instances(list<pg_log_t*>& o)
}
}
-ostream& pg_log_t::print(ostream& out) const
+ostream& pg_log_t::print(ostream& out) const
{
out << *this << std::endl;
for (list<pg_log_entry_t>::const_iterator p = log.begin();
p != log.end();
- ++p)
+ ++p)
out << *p << std::endl;
+ for (const auto& entry : dups) {
+ out << " dup entry: " << entry << std::endl;
+ }
return out;
}
ostream& operator<<(ostream& out, const pg_log_entry_t& e);
+struct pg_log_dup_t {
+ osd_reqid_t reqid; // caller+tid to uniquely identify request
+ eversion_t version;
+ version_t user_version; // the user version for this entry
+ int32_t return_code; // only stored for ERRORs for dup detection
+ pg_log_dup_t()
+ : user_version(0), return_code(0) {}
+ pg_log_dup_t(const pg_log_entry_t &entry) explicit
+ : reqid(entry.reqid), version(entry.version),
+ user_version(entry.user_version), return_code(entry.return_code)
+ {}
+ pg_log_dup_t(const eversion_t& v, version_t uv,
+ const osd_reqid_t& rid, int return_code)
+ : reqid(rid), version(v), user_version(uv),
+ return_code(return_code)
+ {}
+ string get_key_name() const;
+ void encode(bufferlist &bl) const;
+ void decode(bufferlist::iterator &bl);
+ void dump(Formatter *f) const;
+ static void generate_test_instances(list<pg_log_dup_t*>& o);
+};
+WRITE_CLASS_ENCODER(pg_log_dup_t)
/**
* pg_log_t - incremental log of recent pg changes.
public:
mempool::osd_pglog::list<pg_log_entry_t> log; // the actual log.
+ mempool::osd_pglog::list<pg_log_dup_t> dups; // entries just for dup op detection
pg_log_t() = default;
pg_log_t(const eversion_t &last_update,
const eversion_t &log_tail,
const eversion_t &can_rollback_to,
const eversion_t &rollback_info_trimmed_to,
- mempool::osd_pglog::list<pg_log_entry_t> &&entries)
+ mempool::osd_pglog::list<pg_log_entry_t> &&entries,
+ mempool::osd_pglog::list<pg_log_dup_t> &&dup_entries)
: head(last_update), tail(log_tail), can_rollback_to(can_rollback_to),
rollback_info_trimmed_to(rollback_info_trimmed_to),
- log(std::move(entries)) {}
+ log(std::move(entries)), dups(std::move(dup_entries)) {}
pg_log_t(const eversion_t &last_update,
const eversion_t &log_tail,
const eversion_t &can_rollback_to,
const eversion_t &rollback_info_trimmed_to,
- const std::list<pg_log_entry_t> &entries)
+ const std::list<pg_log_entry_t> &entries,
+ const std::list<pg_log_dup_t> &dup_entries)
: head(last_update), tail(log_tail), can_rollback_to(can_rollback_to),
rollback_info_trimmed_to(rollback_info_trimmed_to) {
for (auto &&entry: entries) {
log.push_back(entry);
}
+ for (auto &&entry: dup_entries) {
+ dups.push_back(entry);
+ }
}
void clear() {
eversion_t z;
rollback_info_trimmed_to = can_rollback_to = head = tail = z;
log.clear();
+ dups.clear();
}
eversion_t get_rollback_info_trimmed_to() const {
oldlog.erase(i++);
}
+ // osd_reqid is unique, so it doesn't matter if there are extra
+ // dup entries in each pg. To avoid storing oid with the dup
+ // entries, just copy the whole list.
+ auto childdups(dups);
+
return pg_log_t(
head,
tail,
can_rollback_to,
rollback_info_trimmed_to,
- std::move(childlog));
+ std::move(childlog),
+ std::move(childdups));
}
mempool::osd_pglog::list<pg_log_entry_t> rewind_from_head(eversion_t newhead) {