void PGLog::IndexedLog::trim(
CephContext* cct,
eversion_t s,
- set<eversion_t> *trimmed,
+ set<string> *trimmed,
set<string>* trimmed_dups,
eversion_t *write_from_dups)
{
break;
lgeneric_subdout(cct, osd, 20) << "trim " << e << dendl;
if (trimmed)
- trimmed->emplace(e.version);
+ trimmed->insert(e.get_key_name());
unindex(e); // remove from index,
eversion_t::max(),
eversion_t(),
eversion_t(),
- set<eversion_t>(),
+ set<string>(),
set<string>(),
missing,
true, require_rollback, false,
eversion_t dirty_to,
eversion_t dirty_from,
eversion_t writeout_from,
- set<eversion_t> &&trimmed,
+ set<string> &&trimmed,
set<string> &&trimmed_dups,
const pg_missing_tracker_t &missing,
bool touch_log,
) {
set<string> to_remove;
to_remove.swap(trimmed_dups);
- for (auto& t : trimmed) {
- string key = t.get_key_name();
+ for (auto& key : trimmed) {
if (log_keys_debug) {
auto it = log_keys_debug->find(key);
ceph_assert(it != log_keys_debug->end());
log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
- std::set<eversion_t> trimmed;
+ std::set<std::string> trimmed;
std::set<std::string> trimmed_dups;
eversion_t write_from_dups = eversion_t::max();
SetUp(15);
- std::set<eversion_t> trimmed2;
+ std::set<std::string> trimmed2;
std::set<std::string> trimmed_dups2;
eversion_t write_from_dups2 = eversion_t::max();
log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
- std::set<eversion_t> trimmed;
+ std::set<std::string> trimmed;
std::set<std::string> trimmed_dups;
eversion_t write_from_dups = eversion_t::max();
log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
- std::set<eversion_t> trimmed;
+ std::set<std::string> trimmed;
std::set<std::string> trimmed_dups;
eversion_t write_from_dups = eversion_t::max();
log.add(mk_ple_mod(mk_obj(4), mk_evt(21, 165), mk_evt(26, 160)));
log.add(mk_ple_dt_rb(mk_obj(5), mk_evt(21, 167), mk_evt(31, 166)));
- std::set<eversion_t> trimmed;
+ std::set<std::string> trimmed;
std::set<std::string> trimmed_dups;
eversion_t write_from_dups = eversion_t::max();
return 0;
}
+bool need_trim_dups(ObjectStore *store, ObjectStore::CollectionHandle ch,
+ const ghobject_t &oid) {
+ size_t count_dups = 0;
+ ObjectMap::ObjectMapIterator p = store->get_omap_iterator(ch, oid);
+ if (!p)
+ return false;
+ for (p->seek_to_first(); p->valid(); p->next()) {
+ if (p->key().substr(0, 4) == string("dup_"))
+ if (++count_dups > g_ceph_context->_conf->osd_pg_log_dups_tracked)
+ return true;
+ }
+ return false;
+}
+
int do_trim_pg_log(ObjectStore *store, const coll_t &coll,
pg_info_t &info, const spg_t &pgid,
epoch_t map_epoch,
cerr << "Log bounds are: " << "(" << info.log_tail << ","
<< info.last_update << "]" << std::endl;
-
- uint64_t max_entries = g_ceph_context->_conf->osd_max_pg_log_entries;
- if (info.last_update.version - info.log_tail.version <= max_entries) {
- cerr << "Log not larger than osd_max_pg_log_entries " << max_entries << std::endl;
+ uint64_t max_entries = std::min<uint64_t>(g_ceph_context->_conf->osd_max_pg_log_entries,
+ (info.last_update.version) ? (info.last_update.version - 1)
+ : g_ceph_context->_conf->osd_max_pg_log_entries
+ );
+ if (info.last_update.version - info.log_tail.version <= max_entries &&
+ !need_trim_dups(store, ch, oid)) {
+ cerr << "Log not larger than osd_max_pg_log_entries " << max_entries << " and no duplicate entries to trim" << std::endl;
return 0;
}
+ PGLog::IndexedLog pg_log;
+ pg_log.head = info.last_update;
+ pg_log.skip_can_rollback_to_to_head();
+
ceph_assert(info.last_update.version > max_entries);
- version_t trim_to = info.last_update.version - max_entries;
+ eversion_t trim_to(info.last_update.epoch, info.last_update.version - max_entries);
+
size_t trim_at_once = g_ceph_context->_conf->osd_pg_log_trim_max;
+ size_t tracked_dups = g_ceph_context->_conf->osd_pg_log_dups_tracked;
+ std::deque<pg_log_dup_t> latest_dups;
+ g_ceph_context->_conf->osd_pg_log_dups_tracked = 0; // Fake dup trimming all
+
eversion_t new_tail;
bool done = false;
while (!done) {
// gather keys so we can delete them in a batch without
// affecting the iterator
- set<string> keys_to_trim;
{
ObjectMap::ObjectMapIterator p = store->get_omap_iterator(ch, oid);
if (!p)
continue;
if (p->key().substr(0, 7) == string("missing"))
continue;
- if (p->key().substr(0, 4) == string("dup_"))
- continue;
-
bufferlist bl = p->value();
auto bp = bl.cbegin();
- pg_log_entry_t e;
- try {
- e.decode_with_checksum(bp);
- } catch (const buffer::error &e) {
- cerr << "Error reading pg log entry: " << e << std::endl;
- }
- if (debug) {
- cerr << "read entry " << e << std::endl;
+
+ if (p->key().substr(0, 4) == string("dup_")) {
+ pg_log_dup_t dup;
+ try {
+ dup.decode(bp);
+ latest_dups.push_back(dup);
+ if (latest_dups.size() > tracked_dups) {
+ pg_log.dups.push_back(latest_dups.front());
+ latest_dups.pop_front();
+ }
+ if (debug) {
+ cerr << "read entry " << dup << std::endl;
+ }
+ } catch (buffer::error& e) {
+ cerr << "error reading log dup: " << p->key() << std::endl;
+ return -EFAULT;
+ }
+ } else {
+ pg_log_entry_t e;
+ try {
+ e.decode_with_checksum(bp);
+ pg_log.log.push_back(e);
+ if (debug) {
+ cerr << "read entry " << e << std::endl;
+ }
+ } catch (const buffer::error &e) {
+ cerr << "Error reading pg log entry: " << e << std::endl;
+ }
}
- if (e.version.version > trim_to) {
- done = true;
- break;
+ // We have enough pg logs entries to trim
+ if ((pg_log.log.size() + pg_log.dups.size()) >= trim_at_once) {
+ break;
}
- keys_to_trim.insert(p->key());
- new_tail = e.version;
- if (keys_to_trim.size() >= trim_at_once)
- break;
}
-
if (!p->valid())
done = true;
+
+ pg_log.index();
+
} // deconstruct ObjectMapIterator
- // delete the keys
- if (!dry_run && !keys_to_trim.empty()) {
- cout << "Removing keys " << *keys_to_trim.begin() << " - " << *keys_to_trim.rbegin() << std::endl;
- ObjectStore::Transaction t;
- t.omap_rmkeys(coll, oid, keys_to_trim);
- store->queue_transaction(ch, std::move(t));
- ch->flush();
+ if (!dry_run && ((pg_log.log.size() > 0) || (pg_log.dups.size() > 0))) {
+ std::set<std::string> trimmed;
+ std::set<std::string> trimmed_dups;
+ eversion_t write_from_dups = eversion_t::max();
+
+ pg_log.trim(g_ceph_context, trim_to, &trimmed, &trimmed_dups, &write_from_dups);
+ // Is there any trimming to do?
+ if (!trimmed.empty() || !trimmed_dups.empty()) {
+ new_tail = pg_log.tail;
+
+ ObjectStore::Transaction t;
+ if (!trimmed.empty()) {
+ cerr << "Removing keys " << *trimmed.begin() << " - " << *trimmed.rbegin() << std::endl;
+ t.omap_rmkeys(coll, oid, trimmed);
+ }
+ if (!trimmed_dups.empty()) {
+ cerr << "Removing Dups " << *trimmed_dups.begin() << " - " << *trimmed_dups.rbegin() << std::endl;
+ t.omap_rmkeys(coll, oid, trimmed_dups);
+ }
+ store->queue_transaction(ch, std::move(t));
+ ch->flush();
+ pg_log.log.clear();
+ pg_log.dups.clear();
+ }
}
}
// update pg info with new tail
if (!dry_run && new_tail != eversion_t()) {
info.log_tail = new_tail;
+
ObjectStore::Transaction t;
int ret = write_info(t, map_epoch, info, past_intervals);
if (ret)
ch->flush();
}
+ g_ceph_context->_conf->osd_pg_log_dups_tracked = tracked_dups; //Restore original
// compact the db since we just removed a bunch of data
cerr << "Finished trimming, now compacting..." << std::endl;
if (!dry_run)