class MOSDSubOp : public Message {
- static const int HEAD_VERSION = 10;
+ static const int HEAD_VERSION = 11;
static const int COMPAT_VERSION = 1;
public:
// piggybacked osd/og state
eversion_t pg_trim_to; // primary->replica: trim to here
+ eversion_t pg_trim_rollback_to; // primary->replica: trim rollback
+ // info to here
osd_peer_stat_t peer_stat;
map<string,bufferlist> attrset;
if (header.version >= 10) {
::decode(updated_hit_set_history, p);
}
+ if (header.version >= 11) {
+ ::decode(pg_trim_rollback_to, p);
+ } else {
+ pg_trim_rollback_to = pg_trim_to;
+ }
}
virtual void encode_payload(uint64_t features) {
::encode(from, payload);
::encode(pgid.shard, payload);
::encode(updated_hit_set_history, payload);
+ ::encode(pg_trim_rollback_to, payload);
}
MOSDSubOp()
op.log_entries,
op.updated_hit_set_history,
op.trim_to,
+ op.trim_rollback_to,
!(op.t.empty()),
localt);
localt->append(op.t);
const eversion_t &at_version,
PGTransaction *_t,
const eversion_t &trim_to,
+ const eversion_t &trim_rollback_to,
vector<pg_log_entry_t> &log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
op->hoid = hoid;
op->version = at_version;
op->trim_to = trim_to;
+ op->trim_rollback_to = trim_rollback_to;
op->log_entries.swap(log_entries);
std::swap(op->updated_hit_set_history, hset_history);
op->on_local_applied_sync = on_local_applied_sync;
should_send ? iter->second : ObjectStore::Transaction(),
op->version,
op->trim_to,
+ op->trim_rollback_to,
op->log_entries,
op->updated_hit_set_history,
op->temp_added,
const eversion_t &at_version,
PGTransaction *t,
const eversion_t &trim_to,
+ const eversion_t &trim_rollback_to,
vector<pg_log_entry_t> &log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
hobject_t hoid;
eversion_t version;
eversion_t trim_to;
+ eversion_t trim_rollback_to;
vector<pg_log_entry_t> log_entries;
boost::optional<pg_hit_set_history_t> updated_hit_set_history;
Context *on_local_applied_sync;
void ECSubWrite::encode(bufferlist &bl) const
{
- ENCODE_START(2, 1, bl);
+ ENCODE_START(3, 1, bl);
::encode(from, bl);
::encode(tid, bl);
::encode(reqid, bl);
::encode(temp_added, bl);
::encode(temp_removed, bl);
::encode(updated_hit_set_history, bl);
+ ::encode(trim_rollback_to, bl);
ENCODE_FINISH(bl);
}
void ECSubWrite::decode(bufferlist::iterator &bl)
{
- DECODE_START(2, bl);
+ DECODE_START(3, bl);
::decode(from, bl);
::decode(tid, bl);
::decode(reqid, bl);
if (struct_v >= 2) {
::decode(updated_hit_set_history, bl);
}
+ if (struct_v >= 3) {
+ ::decode(trim_rollback_to, bl);
+ } else {
+ trim_rollback_to = trim_to;
+ }
DECODE_FINISH(bl);
}
lhs << "ECSubWrite(tid=" << rhs.tid
<< ", reqid=" << rhs.reqid
<< ", at_version=" << rhs.at_version
- << ", trim_to=" << rhs.trim_to;
+ << ", trim_to=" << rhs.trim_to
+ << ", trim_rollback_to=" << rhs.trim_rollback_to;
if (rhs.updated_hit_set_history)
lhs << ", has_updated_hit_set_history";
return lhs << ")";
f->dump_stream("reqid") << reqid;
f->dump_stream("at_version") << at_version;
f->dump_stream("trim_to") << trim_to;
+ f->dump_stream("trim_rollback_to") << trim_rollback_to;
f->dump_stream("has_updated_hit_set_history")
<< static_cast<bool>(updated_hit_set_history);
}
o.back()->reqid = osd_reqid_t(entity_name_t::CLIENT(123), 1, 45678);
o.back()->at_version = eversion_t(10, 300);
o.back()->trim_to = eversion_t(5, 42);
+ o.push_back(new ECSubWrite());
+ o.back()->tid = 9;
+ o.back()->reqid = osd_reqid_t(entity_name_t::CLIENT(123), 1, 45678);
+ o.back()->at_version = eversion_t(10, 300);
+ o.back()->trim_to = eversion_t(5, 42);
+ o.back()->trim_rollback_to = eversion_t(8, 250);
}
void ECSubWriteReply::encode(bufferlist &bl) const
ObjectStore::Transaction t;
eversion_t at_version;
eversion_t trim_to;
+ eversion_t trim_rollback_to;
vector<pg_log_entry_t> log_entries;
set<hobject_t> temp_added;
set<hobject_t> temp_removed;
const ObjectStore::Transaction &t,
eversion_t at_version,
eversion_t trim_to,
+ eversion_t trim_rollback_to,
vector<pg_log_entry_t> log_entries,
boost::optional<pg_hit_set_history_t> updated_hit_set_history,
const set<hobject_t> &temp_added,
: from(from), tid(tid), reqid(reqid),
soid(soid), stats(stats), t(t),
at_version(at_version),
- trim_to(trim_to), log_entries(log_entries),
+ trim_to(trim_to), trim_rollback_to(trim_rollback_to),
+ log_entries(log_entries),
temp_added(temp_added),
temp_removed(temp_removed),
updated_hit_set_history(updated_hit_set_history) {}
void PG::append_log(
- vector<pg_log_entry_t>& logv, eversion_t trim_to, ObjectStore::Transaction &t,
+ vector<pg_log_entry_t>& logv,
+ eversion_t trim_to,
+ eversion_t trim_rollback_to,
+ ObjectStore::Transaction &t,
bool transaction_applied)
{
if (transaction_applied)
p->offset = 0;
add_log_entry(*p, keys[p->get_key_name()]);
}
- if (!transaction_applied)
- pg_log.clear_can_rollback_to();
+
+ PGLogEntryHandler handler;
+ if (!transaction_applied) {
+ pg_log.clear_can_rollback_to(&handler);
+ } else if (trim_rollback_to > pg_log.get_rollback_trimmed_to()) {
+ pg_log.trim_rollback_info(
+ trim_rollback_to,
+ &handler);
+ }
dout(10) << "append_log adding " << keys.size() << " keys" << dendl;
t.omap_setkeys(coll_t::META_COLL, log_oid, keys);
- PGLogEntryHandler handler;
+
pg_log.trim(&handler, trim_to, info);
+
+ dout(10) << __func__ << ": trimming to " << trim_rollback_to
+ << " entries " << handler.to_trim << dendl;
handler.apply(this, &t);
// update the local pg, pg log
void add_log_entry(pg_log_entry_t& e, bufferlist& log_bl);
void append_log(
- vector<pg_log_entry_t>& logv, eversion_t trim_to, ObjectStore::Transaction &t,
+ vector<pg_log_entry_t>& logv,
+ eversion_t trim_to,
+ eversion_t trim_rollback_to,
+ ObjectStore::Transaction &t,
bool transaction_applied = true);
bool check_log_for_corruption(ObjectStore *store);
void trim_peers();
vector<pg_log_entry_t> &logv,
boost::optional<pg_hit_set_history_t> &hset_history,
const eversion_t &trim_to,
+ const eversion_t &trim_rollback_to,
bool transaction_applied,
ObjectStore::Transaction *t) = 0;
const eversion_t &at_version, ///< [in] version
PGTransaction *t, ///< [in] trans to execute
const eversion_t &trim_to, ///< [in] trim log to here
+ const eversion_t &trim_rollback_to, ///< [in] trim rollback info to here
vector<pg_log_entry_t> &log_entries, ///< [in] log entries for t
/// [in] hitset history (if updated with this transaction)
boost::optional<pg_hit_set_history_t> &hset_history,
eversion_t trim_to,
pg_info_t &info);
- void clear_can_rollback_to() {
+ void trim_rollback_info(
+ eversion_t trim_rollback_to,
+ LogEntryHandler *h) {
+ if (trim_rollback_to > log.can_rollback_to)
+ log.can_rollback_to = trim_rollback_to;
+ log.advance_rollback_info_trimmed_to(
+ trim_rollback_to,
+ h);
+ }
+
+ void clear_can_rollback_to(LogEntryHandler *h) {
log.can_rollback_to = log.head;
+ log.advance_rollback_info_trimmed_to(
+ log.head,
+ h);
}
//////////////////// get or set log & missing ////////////////////
const eversion_t &at_version,
PGTransaction *_t,
const eversion_t &trim_to,
+ const eversion_t &trim_rollback_to,
vector<pg_log_entry_t> &log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
tid,
reqid,
trim_to,
+ trim_rollback_to,
t->get_temp_added().size() ? *(t->get_temp_added().begin()) : hobject_t(),
t->get_temp_cleared().size() ?
*(t->get_temp_cleared().begin()) :hobject_t(),
}
clear_temp_objs(t->get_temp_cleared());
- parent->log_operation(log_entries, hset_history, trim_to, true, &local_t);
+ parent->log_operation(
+ log_entries,
+ hset_history,
+ trim_to,
+ trim_rollback_to,
+ true,
+ &local_t);
local_t.append(*op_t);
local_t.swap(*op_t);
const eversion_t &at_version,
PGTransaction *t,
const eversion_t &trim_to,
+ const eversion_t &trim_rollback_to,
vector<pg_log_entry_t> &log_entries,
boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
ceph_tid_t tid,
osd_reqid_t reqid,
eversion_t pg_trim_to,
+ eversion_t pg_trim_rollback_to,
hobject_t new_temp_oid,
hobject_t discard_temp_oid,
vector<pg_log_entry_t> &log_entries,
repop->ctx->at_version,
repop->ctx->op_t,
pg_trim_to,
+ min_last_complete_ondisk,
repop->ctx->log,
repop->ctx->updated_hset_history,
onapplied_sync,
ceph_tid_t tid,
osd_reqid_t reqid,
eversion_t pg_trim_to,
+ eversion_t pg_trim_rollback_to,
hobject_t new_temp_oid,
hobject_t discard_temp_oid,
vector<pg_log_entry_t> &log_entries,
wr->pg_stats = get_info().stats;
wr->pg_trim_to = pg_trim_to;
+ wr->pg_trim_rollback_to = pg_trim_rollback_to;
wr->new_temp_oid = new_temp_oid;
wr->discard_temp_oid = discard_temp_oid;
log,
m->updated_hit_set_history,
m->pg_trim_to,
+ m->pg_trim_rollback_to,
update_snaps,
&(rm->localt));
vector<pg_log_entry_t> &logv,
boost::optional<pg_hit_set_history_t> &hset_history,
const eversion_t &trim_to,
+ const eversion_t &trim_rollback_to,
bool transaction_applied,
ObjectStore::Transaction *t) {
if (hset_history) {
info.hit_set = *hset_history;
dirty_info = true;
}
- append_log(logv, trim_to, *t, transaction_applied);
+ append_log(logv, trim_to, trim_rollback_to, *t, transaction_applied);
}
void op_applied(