rados -p test rm foo
done
- # this demonstrates the problem - it should fail
- test_log_size $PGID 41 || return 1
+ # log should have been trimmed down to min_entries with one extra
+ test_log_size $PGID 21 || return 1
if [ "$which" = "test1" ];
then
else
PRIMARY=$(ceph pg $PGID query | jq '.info.stats.up_primary')
kill_daemons $dir TERM osd.$PRIMARY || return 1
- CEPH_ARGS="--osd-max-pg-log-entries=30 --osd-pg-log-trim-max=5" ceph-objectstore-tool --data-path $dir/$PRIMARY --pgid $PGID --op trim-pg-log || return 1
+
+ CEPH_ARGS="--osd-max-pg-log-entries=2" ceph-objectstore-tool --data-path $dir/$PRIMARY --pgid $PGID --op trim-pg-log || return 1
run_osd $dir $PRIMARY || return 1
wait_for_clean || return 1
- test_log_size $PGID 30 || return 1
+ test_log_size $PGID 2 || return 1
fi
}
class MOSDPGUpdateLogMissing : public MOSDFastDispatchOp {
- static const int HEAD_VERSION = 2;
+ static const int HEAD_VERSION = 3;
static const int COMPAT_VERSION = 1;
shard_id_t from;
ceph_tid_t rep_tid = 0;
mempool::osd_pglog::list<pg_log_entry_t> entries;
+ // piggybacked osd/pg state
+ eversion_t pg_trim_to; // primary->replica: trim to here
+ eversion_t pg_roll_forward_to; // primary->replica: trim rollback info to here
epoch_t get_epoch() const { return map_epoch; }
spg_t get_pgid() const { return pgid; }
shard_id_t from,
epoch_t epoch,
epoch_t min_epoch,
- ceph_tid_t rep_tid)
+ ceph_tid_t rep_tid,
+ eversion_t pg_trim_to,
+ eversion_t pg_roll_forward_to)
: MOSDFastDispatchOp(MSG_OSD_PG_UPDATE_LOG_MISSING, HEAD_VERSION,
COMPAT_VERSION),
map_epoch(epoch),
pgid(pgid),
from(from),
rep_tid(rep_tid),
- entries(entries) {}
+ entries(entries),
+ pg_trim_to(pg_trim_to),
+ pg_roll_forward_to(pg_roll_forward_to)
+ {}
private:
~MOSDPGUpdateLogMissing() override {}
out << "pg_update_log_missing(" << pgid << " epoch " << map_epoch
<< "/" << min_epoch
<< " rep_tid " << rep_tid
- << " entries " << entries << ")";
+ << " entries " << entries
+ << " trim_to " << pg_trim_to
+ << " roll_forward_to " << pg_roll_forward_to
+ << ")";
}
void encode_payload(uint64_t features) override {
encode(rep_tid, payload);
encode(entries, payload);
encode(min_epoch, payload);
+ encode(pg_trim_to, payload);
+ encode(pg_roll_forward_to, payload);
}
void decode_payload() override {
bufferlist::iterator p = payload.begin();
} else {
min_epoch = map_epoch;
}
+ if (header.version >= 3) {
+ decode(pg_trim_to, p);
+ decode(pg_roll_forward_to, p);
+ }
}
};
class MOSDPGUpdateLogMissingReply : public MOSDFastDispatchOp {
- static const int HEAD_VERSION = 2;
+ static const int HEAD_VERSION = 3;
static const int COMPAT_VERSION = 1;
spg_t pgid;
shard_id_t from;
ceph_tid_t rep_tid = 0;
+ // piggybacked osd state
+ eversion_t last_complete_ondisk;
epoch_t get_epoch() const { return map_epoch; }
spg_t get_pgid() const { return pgid; }
shard_id_t from,
epoch_t epoch,
epoch_t min_epoch,
- ceph_tid_t rep_tid)
+ ceph_tid_t rep_tid,
+ eversion_t last_complete_ondisk)
: MOSDFastDispatchOp(
MSG_OSD_PG_UPDATE_LOG_MISSING_REPLY,
HEAD_VERSION,
min_epoch(min_epoch),
pgid(pgid),
from(from),
- rep_tid(rep_tid)
+ rep_tid(rep_tid),
+ last_complete_ondisk(last_complete_ondisk)
{}
private:
void print(ostream& out) const override {
out << "pg_update_log_missing_reply(" << pgid << " epoch " << map_epoch
<< "/" << min_epoch
- << " rep_tid " << rep_tid << ")";
+ << " rep_tid " << rep_tid
+ << " lcod " << last_complete_ondisk << ")";
}
void encode_payload(uint64_t features) override {
encode(from, payload);
encode(rep_tid, payload);
encode(min_epoch, payload);
+ encode(last_complete_ondisk, payload);
}
void decode_payload() override {
bufferlist::iterator p = payload.begin();
} else {
min_epoch = map_epoch;
}
+ if (header.version >= 3) {
+ decode(last_complete_ondisk, p);
+ }
}
};
bool PG::append_log_entries_update_missing(
const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t)
+ ObjectStore::Transaction &t, boost::optional<eversion_t> trim_to,
+ boost::optional<eversion_t> roll_forward_to)
{
assert(!entries.empty());
assert(entries.begin()->version > info.last_update);
PGLogEntryHandler rollbacker{this, &t};
+ if (roll_forward_to) {
+ pg_log.roll_forward(&rollbacker);
+ }
bool invalidate_stats =
pg_log.append_new_log_entries(info.last_backfill,
info.last_backfill_bitwise,
entries,
&rollbacker);
+
+ if (roll_forward_to && entries.rbegin()->soid > info.last_backfill) {
+ pg_log.roll_forward(&rollbacker);
+ }
+ if (roll_forward_to && *roll_forward_to > pg_log.get_can_rollback_to()) {
+ pg_log.roll_forward_to(*roll_forward_to, &rollbacker);
+ last_rollback_info_trimmed_to_applied = *roll_forward_to;
+ }
+
info.last_update = pg_log.get_head();
if (pg_log.get_missing().num_missing() == 0) {
// advance last_complete since nothing else is missing!
info.last_complete = info.last_update;
}
-
info.stats.stats_invalid = info.stats.stats_invalid || invalidate_stats;
+
+ dout(20) << __func__ << "trim_to bool = " << bool(trim_to) << " trim_to = " << (trim_to ? *trim_to : eversion_t()) << dendl;
+ if (trim_to)
+ pg_log.trim(*trim_to, info);
dirty_info = true;
write_if_dirty(t);
return invalidate_stats;
void PG::merge_new_log_entries(
const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t)
+ ObjectStore::Transaction &t,
+ boost::optional<eversion_t> trim_to,
+ boost::optional<eversion_t> roll_forward_to)
{
dout(10) << __func__ << " " << entries << dendl;
assert(is_primary());
- bool rebuild_missing = append_log_entries_update_missing(entries, t);
+ bool rebuild_missing = append_log_entries_update_missing(entries, t, trim_to, roll_forward_to);
for (set<pg_shard_t>::const_iterator i = actingbackfill.begin();
i != actingbackfill.end();
++i) {
bool append_log_entries_update_missing(
const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t);
+ ObjectStore::Transaction &t,
+ boost::optional<eversion_t> trim_to,
+ boost::optional<eversion_t> roll_forward_to);
/**
* Merge entries updating missing as necessary on all
*/
void merge_new_log_entries(
const mempool::osd_pglog::list<pg_log_entry_t> &entries,
- ObjectStore::Transaction &t);
+ ObjectStore::Transaction &t,
+ boost::optional<eversion_t> trim_to,
+ boost::optional<eversion_t> roll_forward_to);
void reset_interval_flush();
void start_peering_interval(
[this, entries, repop, on_complete]() {
ObjectStore::Transaction t;
eversion_t old_last_update = info.last_update;
- merge_new_log_entries(entries, t);
+ merge_new_log_entries(entries, t, pg_trim_to, min_last_complete_ondisk);
set<pg_shard_t> waiting_on;
pg_whoami.shard,
get_osdmap()->get_epoch(),
last_peering_reset,
- repop->rep_tid);
+ repop->rep_tid,
+ pg_trim_to,
+ min_last_complete_ondisk);
osd->send_message_osd_cluster(
peer.osd, m, get_osdmap()->get_epoch());
waiting_on.insert(peer);
int r = osd->store->queue_transaction(ch, std::move(t), NULL);
assert(r == 0);
});
+
+ calc_trim_to();
}
void PrimaryLogPG::cancel_log_updates()
op->get_req());
assert(m->get_type() == MSG_OSD_PG_UPDATE_LOG_MISSING);
ObjectStore::Transaction t;
- append_log_entries_update_missing(m->entries, t);
+ boost::optional<eversion_t> op_trim_to, op_roll_forward_to;
+ if (m->pg_trim_to != eversion_t())
+ op_trim_to = m->pg_trim_to;
+ if (m->pg_roll_forward_to != eversion_t())
+ op_roll_forward_to = m->pg_roll_forward_to;
+
+ dout(20) << __func__ << " op_trim_to = " << op_trim_to << " op_roll_forward_to = " << op_roll_forward_to << dendl;
+
+ append_log_entries_update_missing(m->entries, t, op_trim_to, op_roll_forward_to);
+ eversion_t new_lcod = info.last_complete;
Context *complete = new FunctionContext(
[=](int) {
op->get_req());
lock();
if (!pg_has_reset_since(msg->get_epoch())) {
+ update_last_complete_ondisk(new_lcod);
MOSDPGUpdateLogMissingReply *reply =
new MOSDPGUpdateLogMissingReply(
spg_t(info.pgid.pgid, primary_shard().shard),
pg_whoami.shard,
msg->get_epoch(),
msg->min_epoch,
- msg->get_tid());
+ msg->get_tid(),
+ new_lcod);
reply->set_priority(CEPH_MSG_PRIO_HIGH);
msg->get_connection()->send_message(reply);
}
if (it != log_entry_update_waiting_on.end()) {
if (it->second.waiting_on.count(m->get_from())) {
it->second.waiting_on.erase(m->get_from());
+ if (m->last_complete_ondisk != eversion_t()) {
+ update_peer_last_complete_ondisk(m->get_from(), m->last_complete_ondisk);
+ }
} else {
osd->clog->error()
<< info.pgid << " got reply "