}
}
-ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
+ReplicatedPG::OpContextUPtr ReplicatedPG::trim_object(const hobject_t &coid)
{
// load clone info
bufferlist bl;
}
}
- RepGather *repop = simple_repop_create(obc);
- OpContext *ctx = repop->ctx;
+ OpContextUPtr ctx = simple_opc_create(obc);
ctx->snapset_obc = snapset_obc;
ctx->lock_to_release = OpContext::W_LOCK;
ctx->release_snapset_obc = true;
ctx->at_version = get_next_version();
+
PGBackend::PGTransaction *t = ctx->op_t;
if (new_snaps.empty()) {
coi.version = ctx->at_version;
bl.clear();
::encode(coi, bl);
- setattr_maybe_cache(ctx->obc, ctx, t, OI_ATTR, bl);
+ setattr_maybe_cache(ctx->obc, ctx.get(), t, OI_ATTR, bl);
ctx->log.push_back(
pg_log_entry_t(
bl.clear();
::encode(ctx->snapset_obc->obs.oi, bl);
attrs[OI_ATTR].claim(bl);
- setattrs_maybe_cache(ctx->snapset_obc, ctx, t, attrs);
+ setattrs_maybe_cache(ctx->snapset_obc, ctx.get(), t, attrs);
if (pool.info.require_rollback()) {
set<string> changing;
}
}
- return repop;
+ return ctx;
}
void ReplicatedPG::snap_trimmer(epoch_t queued)
dout(20) << __func__ << " using temp " << cop->results.temp_oid << dendl;
}
ObjectContextRef tempobc = get_object_context(cop->results.temp_oid, true);
- RepGather *repop = simple_repop_create(tempobc);
+ OpContextUPtr ctx = simple_opc_create(tempobc);
if (cop->temp_cursor.is_initial()) {
- repop->ctx->new_temp_oid = cop->results.temp_oid;
+ ctx->new_temp_oid = cop->results.temp_oid;
}
- _write_copy_chunk(cop, repop->ctx->op_t);
- simple_repop_submit(repop);
+ _write_copy_chunk(cop, ctx->op_t);
+ simple_opc_submit(std::move(ctx));
dout(10) << __func__ << " fetching more" << dendl;
_copy_some(cobc, cop);
return;
dout(10) << __func__ << " abort; will clean up partial work" << dendl;
ObjectContextRef tempobc = get_object_context(results->temp_oid, false);
assert(tempobc);
- RepGather *repop = simple_repop_create(tempobc);
- repop->ctx->op_t->remove(results->temp_oid);
- simple_repop_submit(repop);
+ OpContextUPtr ctx = simple_opc_create(tempobc);
+ ctx->op_t->remove(results->temp_oid);
+ simple_opc_submit(std::move(ctx));
results->started_temp_obj = false;
}
hobject_t head(soid.get_head());
ObjectContextRef obc = get_object_context(head, false);
assert(obc);
- RepGather *repop = simple_repop_create(obc);
- OpContext *tctx = repop->ctx;
+
+ OpContextUPtr tctx = simple_opc_create(obc);
tctx->at_version = get_next_version();
filter_snapc(tctx->new_snapset.snaps);
vector<snapid_t> new_clones;
tctx->lock_to_release = OpContext::W_LOCK;
dout(20) << __func__ << " took lock on obc, " << obc->rwstate << dendl;
- finish_ctx(tctx, pg_log_entry_t::PROMOTE);
+ finish_ctx(tctx.get(), pg_log_entry_t::PROMOTE);
- simple_repop_submit(repop);
+ simple_opc_submit(std::move(tctx));
return;
}
return;
}
- RepGather *repop = simple_repop_create(obc);
- OpContext *tctx = repop->ctx;
+ OpContextUPtr tctx = simple_opc_create(obc);
tctx->at_version = get_next_version();
++tctx->delta_stats.num_objects;
tctx->lock_to_release = OpContext::W_LOCK;
dout(20) << __func__ << " took lock on obc, " << obc->rwstate << dendl;
- finish_ctx(tctx, pg_log_entry_t::PROMOTE);
+ finish_ctx(tctx.get(), pg_log_entry_t::PROMOTE);
- simple_repop_submit(repop);
+ simple_opc_submit(std::move(tctx));
osd->logger->inc(l_osd_tier_promote);
}
dout(10) << __func__ << " clearing DIRTY flag for " << oid << dendl;
- RepGather *repop = simple_repop_create(fop->obc);
- OpContext *ctx = repop->ctx;
+ OpContextUPtr ctx = simple_opc_create(fop->obc);
ctx->on_finish = fop->on_flush;
fop->on_flush = NULL;
ctx->new_obs.oi.clear_flag(object_info_t::FLAG_DIRTY);
--ctx->delta_stats.num_objects_dirty;
- finish_ctx(ctx, pg_log_entry_t::CLEAN);
+ finish_ctx(ctx.get(), pg_log_entry_t::CLEAN);
osd->logger->inc(l_osd_tier_clean);
requeue_ops(ls);
}
- simple_repop_submit(repop);
+ simple_opc_submit(std::move(ctx));
flush_ops.erase(oid);
repop->all_applied = true;
if (!repop->rep_aborted) {
eval_repop(repop);
- if (repop->on_applied) {
- repop->on_applied->complete(0);
- repop->on_applied = NULL;
- }
}
}
// ondisk?
if (repop->all_committed) {
+ for (auto p = repop->on_committed.begin();
+ p != repop->on_committed.end();
+ repop->on_committed.erase(p++)) {
+ (*p)();
+ }
+
if (repop->ctx->op && !repop->log_op_stat) {
log_op_stats(repop->ctx);
repop->log_op_stat = true;
// applied?
if (repop->all_applied) {
+ dout(10) << " applied: " << *repop << " " << dendl;
+ for (auto p = repop->on_applied.begin();
+ p != repop->on_applied.end();
+ repop->on_applied.erase(p++)) {
+ (*p)();
+ }
// send dup acks, in order
if (waiting_for_ack.count(repop->v)) {
calc_min_last_complete_ondisk();
- // kick snap_trimmer if necessary
- if (repop->queue_snap_trimmer) {
- queue_snap_trim();
+ for (auto p = repop->on_success.begin();
+ p != repop->on_success.end();
+ repop->on_success.erase(p++)) {
+ (*p)();
}
dout(10) << " removing " << *repop << dendl;
repop->ctx->op_t = NULL;
}
-ReplicatedPG::RepGather *ReplicatedPG::new_repop(OpContext *ctx, ObjectContextRef obc,
- ceph_tid_t rep_tid)
+ReplicatedPG::RepGather *ReplicatedPG::new_repop(
+ OpContext *ctx, ObjectContextRef obc,
+ ceph_tid_t rep_tid)
{
if (ctx->op)
dout(10) << "new_repop rep_tid " << rep_tid << " on " << *ctx->op->get_req() << dendl;
dout(20) << " clone_obc " << *repop->ctx->clone_obc << dendl;
if (repop->ctx->snapset_obc)
dout(20) << " snapset_obc " << *repop->ctx->snapset_obc << dendl;
+
+ for (auto p = repop->on_finish.begin();
+ p != repop->on_finish.end();
+ repop->on_finish.erase(p++)) {
+ (*p)();
+ }
+
release_op_ctx_locks(repop->ctx);
repop->ctx->finish(0); // FIXME: return value here is sloppy
repop->put();
osd->logger->dec(l_osd_op_wip);
}
// Patch hunk: simple_repop_create() is renamed to simple_opc_create() and
// changes what it hands back to the caller.
//   old ('-' lines): allocate an OpContext with raw new, wrap it in a
//                    RepGather via new_repop(), and return the RepGather*.
//   new ('+' lines): return the OpContext itself in an OpContextUPtr; the
//                    new_repop() call no longer happens in this function.
// NOTE(review): OpContextUPtr is presumably a std::unique_ptr<OpContext>;
// the typedef is not visible in this hunk -- confirm.
-ReplicatedPG::RepGather *ReplicatedPG::simple_repop_create(ObjectContextRef obc)
+ReplicatedPG::OpContextUPtr ReplicatedPG::simple_opc_create(ObjectContextRef obc)
{
dout(20) << __func__ << " " << obc->obs.oi.soid << dendl;
// The context is built with an empty op vector and an empty OpRequestRef.
vector<OSDOp> ops;
// A fresh tid from the OSD is used as the tid of the request id below.
ceph_tid_t rep_tid = osd->get_tid();
osd_reqid_t reqid(osd->get_cluster_msgr_name(), 0, rep_tid);
- OpContext *ctx = new OpContext(OpRequestRef(), reqid, ops, obc, this);
+ OpContextUPtr ctx(new OpContext(OpRequestRef(), reqid, ops, obc, this));
// Attach a transaction obtained from the backend and stamp the current time.
ctx->op_t = pgbackend->get_transaction();
ctx->mtime = ceph_clock_now(g_ceph_context);
- RepGather *repop = new_repop(ctx, obc, rep_tid);
- return repop;
+ return ctx;
}
// Patch hunk: simple_repop_submit(RepGather*) becomes
// simple_opc_submit(OpContextUPtr).
//   old ('-' line): the caller passed in an already-created RepGather.
//   new ('+' lines): the RepGather is created here via new_repop(), using
//                    the context's own obc and reqid.tid, and is then
//                    issued and evaluated.
// NOTE(review): ctx is taken by value and only ctx.get() is handed to
// new_repop(); nothing visible in this hunk releases or transfers ownership
// before ctx goes out of scope, while other hunks of this patch still
// dereference repop->ctx. Confirm the context's lifetime is handled outside
// the lines shown here -- TODO confirm.
-void ReplicatedPG::simple_repop_submit(RepGather *repop)
+void ReplicatedPG::simple_opc_submit(OpContextUPtr ctx)
{
+ RepGather *repop = new_repop(ctx.get(), ctx->obc, ctx->reqid.tid);
dout(20) << __func__ << " " << repop << dendl;
issue_repop(repop);
eval_repop(repop);
return;
}
- RepGather *repop = simple_repop_create(obc);
- OpContext *ctx = repop->ctx;
+ OpContextUPtr ctx = simple_opc_create(obc);
ctx->at_version = get_next_version();
object_info_t& oi = ctx->new_obs.oi;
0,
osd_reqid_t(), ctx->mtime));
- oi.prior_version = repop->obc->obs.oi.version;
+ oi.prior_version = obc->obs.oi.version;
oi.version = ctx->at_version;
bufferlist bl;
::encode(oi, bl);
- setattr_maybe_cache(obc, repop->ctx, t, OI_ATTR, bl);
+ setattr_maybe_cache(obc, ctx.get(), t, OI_ATTR, bl);
if (pool.info.require_rollback()) {
map<string, boost::optional<bufferlist> > to_set;
}
// no ctx->delta_stats
-
- // obc ref swallowed by repop!
- simple_repop_submit(repop);
+ simple_opc_submit(std::move(ctx));
// apply new object state.
ctx->obc->obs = ctx->new_obs;
repop_queue.pop_front();
dout(10) << " canceling repop tid " << repop->rep_tid << dendl;
repop->rep_aborted = true;
- if (repop->on_applied) {
- delete repop->on_applied;
- repop->on_applied = NULL;
- }
+ repop->on_applied.clear();
+ repop->on_committed.clear();
+ repop->on_success.clear();
if (requeue) {
if (repop->ctx->op) {
ObjectContextRef obc = get_object_context(oid, false);
assert(obc);
- RepGather *repop = simple_repop_create(obc);
- OpContext *ctx = repop->ctx;
+ OpContextUPtr ctx = simple_opc_create(obc);
ctx->at_version = get_next_version();
ctx->updated_hset_history = info.hit_set;
utime_t now = ceph_clock_now(cct);
ctx->mtime = now;
- hit_set_trim(repop, 0);
- apply_ctx_stats(ctx);
- simple_repop_submit(repop);
+ hit_set_trim(ctx, 0);
+ apply_ctx_stats(ctx.get());
+ simple_opc_submit(std::move(ctx));
}
info.hit_set = pg_hit_set_history_t();
return true;
}
-struct C_HitSetFlushing : public Context {
- ReplicatedPGRef pg;
- time_t hit_set_name;
- C_HitSetFlushing(ReplicatedPG *p, time_t n) : pg(p), hit_set_name(n) { }
- void finish(int r) {
- pg->hit_set_flushing.erase(hit_set_name);
- }
-};
-
void ReplicatedPG::hit_set_persist()
{
dout(10) << __func__ << dendl;
unsigned max = pool.info.hit_set_count;
utime_t now = ceph_clock_now(cct);
- RepGather *repop;
hobject_t oid;
time_t flush_time = 0;
flush_time = new_hset.begin;
ObjectContextRef obc = get_object_context(oid, true);
- repop = simple_repop_create(obc);
- if (flush_time != 0)
- repop->on_applied = new C_HitSetFlushing(this, flush_time);
- OpContext *ctx = repop->ctx;
+ OpContextUPtr ctx = simple_opc_create(obc);
+ if (flush_time != 0) {
+ ReplicatedPGRef pg(this);
+ ctx->register_on_applied(
+ [pg, flush_time]() {
+ pg->hit_set_flushing.erase(flush_time);
+ });
+ }
+
ctx->at_version = get_next_version();
ctx->updated_hset_history = info.hit_set;
pg_hit_set_history_t &updated_hit_set_hist = *(ctx->updated_hset_history);
map <string, bufferlist> attrs;
attrs[OI_ATTR].claim(boi);
attrs[SS_ATTR].claim(bss);
- setattrs_maybe_cache(ctx->obc, ctx, ctx->op_t, attrs);
+ setattrs_maybe_cache(ctx->obc, ctx.get(), ctx->op_t, attrs);
ctx->log.push_back(
pg_log_entry_t(
pg_log_entry_t::MODIFY,
ctx->log.back().mod_desc.mark_unrollbackable();
}
- hit_set_trim(repop, max);
+ hit_set_trim(ctx, max);
- apply_ctx_stats(ctx);
- simple_repop_submit(repop);
+ apply_ctx_stats(ctx.get());
+ simple_opc_submit(std::move(ctx));
}
-void ReplicatedPG::hit_set_trim(RepGather *repop, unsigned max)
+void ReplicatedPG::hit_set_trim(OpContextUPtr &ctx, unsigned max)
{
- assert(repop->ctx->updated_hset_history);
+ assert(ctx->updated_hset_history);
pg_hit_set_history_t &updated_hit_set_hist =
- *(repop->ctx->updated_hset_history);
+ *(ctx->updated_hset_history);
for (unsigned num = updated_hit_set_hist.history.size(); num > max; --num) {
list<pg_hit_set_info_t>::iterator p = updated_hit_set_hist.history.begin();
assert(p != updated_hit_set_hist.history.end());
assert(!is_degraded_or_backfilling_object(oid));
dout(20) << __func__ << " removing " << oid << dendl;
- ++repop->ctx->at_version.version;
- repop->ctx->log.push_back(
+ ++ctx->at_version.version;
+ ctx->log.push_back(
pg_log_entry_t(pg_log_entry_t::DELETE,
oid,
- repop->ctx->at_version,
+ ctx->at_version,
p->version,
0,
osd_reqid_t(),
- repop->ctx->mtime));
+ ctx->mtime));
if (pool.info.require_rollback()) {
- if (repop->ctx->log.back().mod_desc.rmobject(
- repop->ctx->at_version.version)) {
- repop->ctx->op_t->stash(oid, repop->ctx->at_version.version);
+ if (ctx->log.back().mod_desc.rmobject(
+ ctx->at_version.version)) {
+ ctx->op_t->stash(oid, ctx->at_version.version);
} else {
- repop->ctx->op_t->remove(oid);
+ ctx->op_t->remove(oid);
}
} else {
- repop->ctx->op_t->remove(oid);
- repop->ctx->log.back().mod_desc.mark_unrollbackable();
+ ctx->op_t->remove(oid);
+ ctx->log.back().mod_desc.mark_unrollbackable();
}
updated_hit_set_hist.history.pop_front();
ObjectContextRef obc = get_object_context(oid, false);
assert(obc);
- --repop->ctx->delta_stats.num_objects;
- --repop->ctx->delta_stats.num_objects_hit_set_archive;
- repop->ctx->delta_stats.num_bytes -= obc->obs.oi.size;
- repop->ctx->delta_stats.num_bytes_hit_set_archive -= obc->obs.oi.size;
+ --ctx->delta_stats.num_objects;
+ --ctx->delta_stats.num_objects_hit_set_archive;
+ ctx->delta_stats.num_bytes -= obc->obs.oi.size;
+ ctx->delta_stats.num_bytes_hit_set_archive -= obc->obs.oi.size;
}
}
}
dout(10) << __func__ << " evicting " << obc->obs.oi << dendl;
- RepGather *repop = simple_repop_create(obc);
- OpContext *ctx = repop->ctx;
+ OpContextUPtr ctx = simple_opc_create(obc);
Context *on_evict = new C_AgentEvictStartStop(this);
ctx->on_finish = on_evict;
ctx->lock_to_release = OpContext::W_LOCK;
ctx->at_version = get_next_version();
assert(ctx->new_obs.exists);
- int r = _delete_oid(ctx, true);
+ int r = _delete_oid(ctx.get(), true);
if (obc->obs.oi.is_omap())
ctx->delta_stats.num_objects_omap--;
ctx->delta_stats.num_evict++;
if (obc->obs.oi.is_dirty())
--ctx->delta_stats.num_objects_dirty;
assert(r == 0);
- finish_ctx(ctx, pg_log_entry_t::DELETE, false);
- simple_repop_submit(repop);
+ finish_ctx(ctx.get(), pg_log_entry_t::DELETE, false);
+ simple_opc_submit(std::move(ctx));
osd->logger->inc(l_osd_tier_evict);
osd->logger->inc(l_osd_agent_evict);
return true;
dout(10) << __func__ << " recording digests for " << p->first << dendl;
ObjectContextRef obc = get_object_context(p->first, false);
assert(obc);
- RepGather *repop = simple_repop_create(obc);
- OpContext *ctx = repop->ctx;
+ OpContextUPtr ctx = simple_opc_create(obc);
ctx->at_version = get_next_version();
ctx->mtime = utime_t(); // do not update mtime
ctx->new_obs.oi.set_data_digest(p->second.first);
ctx->new_obs.oi.set_omap_digest(p->second.second);
- finish_ctx(ctx, pg_log_entry_t::MODIFY, true, true);
+ finish_ctx(ctx.get(), pg_log_entry_t::MODIFY, true, true);
ctx->on_finish = new C_ScrubDigestUpdated(this);
- simple_repop_submit(repop);
+ simple_opc_submit(std::move(ctx));
++scrubber.num_digest_updates_pending;
}
// Destructor of the snap-trimmer state machine.
//   old ('-' lines): called put() on every RepGather still held in `repops`
//                    and erased it from the set.
//   new ('+' line):  just empties `in_flight`; no put() calls are made.
// NOTE(review): elsewhere in this patch in_flight is inserted into and
// erased from with hobject_t values, so it appears to hold object ids rather
// than RepGather pointers -- confirm against the declaration.
ReplicatedPG::SnapTrimmer::~SnapTrimmer()
{
- while (!repops.empty()) {
- (*repops.begin())->put();
- repops.erase(repops.begin());
- }
+ in_flight.clear();
}
void ReplicatedPG::SnapTrimmer::log_enter(const char *state_name)
// State-exit hook for TrimmingObjects: logs the exit through the enclosing
// SnapTrimmer, then discards the trimmer's record of outstanding work.
//   old ('-' lines): walked `repops`, calling put() on each RepGather while
//                    erasing it.
//   new ('+' line):  clears the SnapTrimmer's `in_flight` container instead.
void ReplicatedPG::TrimmingObjects::exit()
{
context< SnapTrimmer >().log_exit(state_name, enter_time);
- // Clean up repops in case of reset
- set<RepGather *> &repops = context<SnapTrimmer>().repops;
- for (set<RepGather *>::iterator i = repops.begin();
- i != repops.end();
- repops.erase(i++)) {
- (*i)->put();
- }
+ context<SnapTrimmer>().in_flight.clear();
}
boost::statechart::result ReplicatedPG::TrimmingObjects::react(const SnapTrim&)
{
dout(10) << "TrimmingObjects react" << dendl;
- ReplicatedPG *pg = context< SnapTrimmer >().pg;
+ ReplicatedPGRef pg = context< SnapTrimmer >().pg;
snapid_t snap_to_trim = context<SnapTrimmer>().snap_to_trim;
- set<RepGather *> &repops = context<SnapTrimmer>().repops;
+ auto &in_flight = context<SnapTrimmer>().in_flight;
dout(10) << "TrimmingObjects: trimming snap " << snap_to_trim << dendl;
- for (set<RepGather *>::iterator i = repops.begin();
- i != repops.end();
- ) {
- if ((*i)->all_applied && (*i)->all_committed) {
- (*i)->put();
- repops.erase(i++);
- } else {
- ++i;
- }
- }
-
- while (repops.size() < g_conf->osd_pg_max_concurrent_snap_trims) {
+ while (in_flight.size() < g_conf->osd_pg_max_concurrent_snap_trims) {
// Get next
hobject_t old_pos = pos;
int r = pg->snap_mapper.get_next_object_to_trim(snap_to_trim, &pos);
}
dout(10) << "TrimmingObjects react trimming " << pos << dendl;
- RepGather *repop = pg->trim_object(pos);
- if (!repop) {
+ OpContextUPtr ctx = pg->trim_object(pos);
+ if (!ctx) {
dout(10) << __func__ << " could not get write lock on obj "
<< pos << dendl;
pos = old_pos;
return discard_event();
}
- assert(repop);
- repop->queue_snap_trimmer = true;
+ assert(ctx);
+ hobject_t to_remove = pos;
+ ctx->register_on_success(
+ [pg, to_remove, &in_flight]() {
+ in_flight.erase(to_remove);
+ pg->queue_snap_trim();
+ });
- pg->apply_ctx_stats(repop->ctx);
+ pg->apply_ctx_stats(ctx.get());
- repops.insert(repop->get());
- pg->simple_repop_submit(repop);
+ in_flight.insert(pos);
+ pg->simple_opc_submit(std::move(ctx));
}
return discard_event();
}
// State-exit hook for WaitingOnReplicas: logs the exit through the enclosing
// SnapTrimmer, then discards the trimmer's record of outstanding work.
//   old ('-' lines): walked `repops`, calling put() on each RepGather while
//                    erasing it.
//   new ('+' line):  clears the SnapTrimmer's `in_flight` container instead.
void ReplicatedPG::WaitingOnReplicas::exit()
{
context< SnapTrimmer >().log_exit(state_name, enter_time);
-
- // Clean up repops in case of reset
- set<RepGather *> &repops = context<SnapTrimmer>().repops;
- for (set<RepGather *>::iterator i = repops.begin();
- i != repops.end();
- repops.erase(i++)) {
- (*i)->put();
- }
+ context<SnapTrimmer>().in_flight.clear();
}
boost::statechart::result ReplicatedPG::WaitingOnReplicas::react(const SnapTrim&)
{
- // Have all the repops applied?
+ // Have all the trims finished?
dout(10) << "Waiting on Replicas react" << dendl;
ReplicatedPG *pg = context< SnapTrimmer >().pg;
- set<RepGather *> &repops = context<SnapTrimmer>().repops;
- for (set<RepGather *>::iterator i = repops.begin();
- i != repops.end();
- repops.erase(i++)) {
- if (!(*i)->all_applied || !(*i)->all_committed) {
- return discard_event();
- } else {
- (*i)->put();
- }
+ if (!context<SnapTrimmer>().in_flight.empty()) {
+ return discard_event();
}
snapid_t &sn = context<SnapTrimmer>().snap_to_trim;