class MOSDSubOp : public Message {
- static const int HEAD_VERSION = 9;
+ static const int HEAD_VERSION = 10;
static const int COMPAT_VERSION = 1;
public:
hobject_t new_temp_oid; ///< new temp object that we must now start tracking
hobject_t discard_temp_oid; ///< previously used temp object that we can now stop tracking
+ /// non-empty if this transaction involves a hit_set history update
+ boost::optional<pg_hit_set_history_t> updated_hit_set_history;
+
int get_cost() const {
if (ops.size() == 1 && ops[0].op.op == CEPH_OSD_OP_PULL)
return ops[0].op.extent.length;
ghobject_t::NO_SHARD);
pgid.shard = ghobject_t::NO_SHARD;
}
+ if (header.version >= 10) {
+ ::decode(updated_hit_set_history, p);
+ }
}
virtual void encode_payload(uint64_t features) {
::encode(discard_temp_oid, payload);
::encode(from, payload);
::encode(pgid.shard, payload);
+ ::encode(updated_hit_set_history, payload);
}
MOSDSubOp()
out << " v " << version
<< " snapset=" << snapset << " snapc=" << snapc;
if (!data_subset.empty()) out << " subset " << data_subset;
+ if (updated_hit_set_history)
+ out << ", has_updated_hit_set_history";
out << ")";
}
};
clear_temp_objs(op.temp_removed);
get_parent()->log_operation(
op.log_entries,
+ op.updated_hit_set_history,
op.trim_to,
!(op.t.empty()),
localt);
PGTransaction *_t,
const eversion_t &trim_to,
vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
Context *on_all_applied,
Context *on_all_commit,
op->version = at_version;
op->trim_to = trim_to;
op->log_entries.swap(log_entries);
+ op->updated_hit_set_history.swap(hset_history);
op->on_local_applied_sync = on_local_applied_sync;
op->on_all_applied = on_all_applied;
op->on_all_commit = on_all_commit;
op->version,
op->trim_to,
op->log_entries,
+ op->updated_hit_set_history,
op->temp_added,
op->temp_cleared);
if (*i == get_parent()->whoami_shard()) {
PGTransaction *t,
const eversion_t &trim_to,
vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
Context *on_all_applied,
Context *on_all_commit,
eversion_t version;
eversion_t trim_to;
vector<pg_log_entry_t> log_entries;
+ boost::optional<pg_hit_set_history_t> updated_hit_set_history;
Context *on_local_applied_sync;
Context *on_all_applied;
Context *on_all_commit;
void ECSubWrite::encode(bufferlist &bl) const
{
- ENCODE_START(1, 1, bl);
+ ENCODE_START(2, 1, bl);
::encode(from, bl);
::encode(tid, bl);
::encode(reqid, bl);
::encode(log_entries, bl);
::encode(temp_added, bl);
::encode(temp_removed, bl);
+ ::encode(updated_hit_set_history, bl);
ENCODE_FINISH(bl);
}
void ECSubWrite::decode(bufferlist::iterator &bl)
{
- DECODE_START(1, bl);
+ DECODE_START(2, bl);
::decode(from, bl);
::decode(tid, bl);
::decode(reqid, bl);
::decode(log_entries, bl);
::decode(temp_added, bl);
::decode(temp_removed, bl);
+ if (struct_v >= 2) {
+ ::decode(updated_hit_set_history, bl);
+ }
DECODE_FINISH(bl);
}
std::ostream &operator<<(
std::ostream &lhs, const ECSubWrite &rhs)
{
- return lhs
- << "ECSubWrite(tid=" << rhs.tid
- << ", reqid=" << rhs.reqid
- << ", at_version=" << rhs.at_version
- << ", trim_to=" << rhs.trim_to << ")";
+ lhs << "ECSubWrite(tid=" << rhs.tid
+ << ", reqid=" << rhs.reqid
+ << ", at_version=" << rhs.at_version
+ << ", trim_to=" << rhs.trim_to;
+ if (rhs.updated_hit_set_history)
+ lhs << ", has_updated_hit_set_history";
+ return lhs << ")";
}
void ECSubWrite::dump(Formatter *f) const
f->dump_stream("reqid") << reqid;
f->dump_stream("at_version") << at_version;
f->dump_stream("trim_to") << trim_to;
+ f->dump_stream("has_updated_hit_set_history")
+ << static_cast<bool>(updated_hit_set_history);
}
void ECSubWrite::generate_test_instances(list<ECSubWrite*> &o)
vector<pg_log_entry_t> log_entries;
set<hobject_t> temp_added;
set<hobject_t> temp_removed;
+ boost::optional<pg_hit_set_history_t> updated_hit_set_history;
ECSubWrite() {}
ECSubWrite(
pg_shard_t from,
eversion_t at_version,
eversion_t trim_to,
vector<pg_log_entry_t> log_entries,
+ boost::optional<pg_hit_set_history_t> updated_hit_set_history,
const set<hobject_t> &temp_added,
const set<hobject_t> &temp_removed)
: from(from), tid(tid), reqid(reqid),
at_version(at_version),
trim_to(trim_to), log_entries(log_entries),
temp_added(temp_added),
- temp_removed(temp_removed) {}
+ temp_removed(temp_removed),
+ updated_hit_set_history(updated_hit_set_history) {}
void encode(bufferlist &bl) const;
void decode(bufferlist::iterator &bl);
void dump(Formatter *f) const;
virtual void log_operation(
vector<pg_log_entry_t> &logv,
+ boost::optional<pg_hit_set_history_t> &hset_history,
const eversion_t &trim_to,
bool transaction_applied,
ObjectStore::Transaction *t) = 0;
PGTransaction *t, ///< [in] trans to execute
const eversion_t &trim_to, ///< [in] trim log to here
vector<pg_log_entry_t> &log_entries, ///< [in] log entries for t
+ /// [in] hitset history (if updated with this transaction)
+ boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync, ///< [in] called when applied locally
Context *on_all_applied, ///< [in] called when all acked
Context *on_all_commit, ///< [in] called when all commit
PGTransaction *_t,
const eversion_t &trim_to,
vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
Context *on_all_acked,
Context *on_all_commit,
t->get_temp_cleared().size() ?
*(t->get_temp_cleared().begin()) :hobject_t(),
log_entries,
+ hset_history,
&op,
op_t);
}
clear_temp_objs(t->get_temp_cleared());
- parent->log_operation(log_entries, trim_to, true, &local_t);
+ parent->log_operation(log_entries, hset_history, trim_to, true, &local_t);
local_t.append(*op_t);
local_t.swap(*op_t);
PGTransaction *t,
const eversion_t &trim_to,
vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
Context *on_local_applied_sync,
Context *on_all_applied,
Context *on_all_commit,
hobject_t new_temp_oid,
hobject_t discard_temp_oid,
vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_history,
InProgressOp *op,
ObjectStore::Transaction *op_t);
void op_applied(InProgressOp *op);
repop->ctx->op_t,
pg_trim_to,
repop->ctx->log,
+ repop->ctx->updated_hset_history,
onapplied_sync,
on_all_applied,
on_all_commit,
hobject_t new_temp_oid,
hobject_t discard_temp_oid,
vector<pg_log_entry_t> &log_entries,
+ boost::optional<pg_hit_set_history_t> &hset_hist,
InProgressOp *op,
ObjectStore::Transaction *op_t)
{
wr->new_temp_oid = new_temp_oid;
wr->discard_temp_oid = discard_temp_oid;
+ wr->updated_hit_set_history = hset_hist;
get_parent()->send_message_osd_cluster(
peer.osd, wr, get_osdmap()->get_epoch());
parent->update_stats(m->pg_stats);
parent->log_operation(
log,
+ m->updated_hit_set_history,
m->pg_trim_to,
update_snaps,
&(rm->localt));
repop->on_applied = new C_HitSetFlushing(this, flush_time);
OpContext *ctx = repop->ctx;
ctx->at_version = get_next_version();
+ ctx->updated_hset_history = info.hit_set;
+ pg_hit_set_history_t &updated_hit_set_hist = *(ctx->updated_hset_history);
- if (info.hit_set.current_last_stamp != utime_t()) {
+ if (updated_hit_set_hist.current_last_stamp != utime_t()) {
// FIXME: we cheat slightly here by bundling in a remove on a object
// other the RepGather object. we aren't carrying an ObjectContext for
// the deleted object over this period.
hobject_t old_obj =
- get_hit_set_current_object(info.hit_set.current_last_stamp);
+ get_hit_set_current_object(updated_hit_set_hist.current_last_stamp);
ctx->log.push_back(
pg_log_entry_t(pg_log_entry_t::DELETE,
old_obj,
ctx->at_version,
- info.hit_set.current_last_update,
+ updated_hit_set_hist.current_last_update,
0,
osd_reqid_t(),
ctx->mtime));
ctx->delta_stats.num_bytes -= st.st_size;
}
- info.hit_set.current_last_update = info.last_update; // *after* above remove!
- info.hit_set.current_info.version = ctx->at_version;
+ updated_hit_set_hist.current_last_update = info.last_update; // *after* above remove!
+ updated_hit_set_hist.current_info.version = ctx->at_version;
- info.hit_set.history.push_back(info.hit_set.current_info);
+ updated_hit_set_hist.history.push_back(updated_hit_set_hist.current_info);
hit_set_create();
- info.hit_set.current_info = pg_hit_set_info_t();
- info.hit_set.current_last_stamp = utime_t();
+ updated_hit_set_hist.current_info = pg_hit_set_info_t();
+ updated_hit_set_hist.current_last_stamp = utime_t();
// fabricate an object_info_t and SnapSet
obc->obs.oi.version = ctx->at_version;
void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
{
- for (unsigned num = info.hit_set.history.size(); num > max; --num) {
- list<pg_hit_set_info_t>::iterator p = info.hit_set.history.begin();
- assert(p != info.hit_set.history.end());
+ assert(repop->ctx->updated_hset_history);
+ pg_hit_set_history_t &updated_hit_set_hist =
+ *(repop->ctx->updated_hset_history);
+ for (unsigned num = updated_hit_set_hist.history.size(); num > max; --num) {
+ list<pg_hit_set_info_t>::iterator p = updated_hit_set_hist.history.begin();
+ assert(p != updated_hit_set_hist.history.end());
hobject_t oid = get_hit_set_archive_object(p->begin, p->end);
assert(!is_degraded_object(oid));
}
if (agent_state)
agent_state->remove_oldest_hit_set();
- info.hit_set.history.pop_front();
+ updated_hit_set_hist.history.pop_front();
struct stat st;
int r = osd->store->stat(
}
void log_operation(
vector<pg_log_entry_t> &logv,
+ boost::optional<pg_hit_set_history_t> &hset_history,
const eversion_t &trim_to,
bool transaction_applied,
ObjectStore::Transaction *t) {
+ if (hset_history) {
+ info.hit_set = *hset_history;
+ dirty_info = true;
+ }
append_log(logv, trim_to, *t, transaction_applied);
}
PGBackend::PGTransaction *op_t;
vector<pg_log_entry_t> log;
+ boost::optional<pg_hit_set_history_t> updated_hset_history;
interval_set<uint64_t> modified_ranges;
ObjectContextRef obc;