<< "the pool allows ec overwrites but is not stored in "
<< "bluestore, so deep scrubbing will not detect bitrot";
}
- PG::_create(*rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
- PG::_init(*rctx.transaction, pgid, pp);
+ PG::_create(rctx.transaction, pgid, pgid.get_split_bits(pp->get_pg_num()));
+ PG::_init(rctx.transaction, pgid, pp);
int role = startmap->calc_pg_role(whoami, acting, acting.size());
if (!pp->is_replicated() && role != pgid.shard) {
pg->set_dynamic_perf_stats_queries(m_perf_queries);
}
- pg->handle_initialize(&rctx);
- pg->handle_activate_map(&rctx);
+ pg->handle_initialize(rctx);
+ pg->handle_activate_map(rctx);
dispatch_context(rctx, pg.get(), osdmap, nullptr);
pg->lock();
dout(10) << __func__ << " " << *pg << dendl;
epoch_t e = pg->get_osdmap_epoch();
- pg->handle_initialize(&rctx);
+ pg->handle_initialize(rctx);
pg->queue_null(e, e);
dispatch_context_transaction(rctx, pg);
pg->unlock();
epoch_t osd_epoch,
PG *pg,
ThreadPool::TPHandle &handle,
- PG::PeeringCtx *rctx)
+ PG::PeeringCtx &rctx)
{
if (osd_epoch <= pg->get_osdmap_epoch()) {
return true;
<< " is merge source, target is " << parent
<< dendl;
pg->write_if_dirty(rctx);
- dispatch_context_transaction(*rctx, pg, &handle);
+ dispatch_context_transaction(rctx, pg, &handle);
pg->ch->flush();
pg->on_shutdown();
OSDShard *sdata = pg->osd_shard;
ret = true;
out:
if (!new_pgs.empty()) {
- rctx->transaction->register_on_applied(new C_FinishSplits(this, new_pgs));
+ rctx.transaction.register_on_applied(new C_FinishSplits(this, new_pgs));
}
return ret;
}
const set<spg_t> &childpgids, set<PGRef> *out_pgs,
OSDMapRef curmap,
OSDMapRef nextmap,
- PG::PeeringCtx *rctx)
+ PG::PeeringCtx &rctx)
{
unsigned pg_num = nextmap->get_pg_num(parent->pg_id.pool());
parent->update_snap_mapper_bits(parent->get_pgid().get_split_bits(pg_num));
split_bits,
i->ps(),
&child->get_pool().info,
- rctx->transaction);
+ rctx.transaction);
parent->split_into(
i->pgid,
child,
split_bits);
- child->finish_split_stats(*stat_iter, rctx->transaction);
+ child->finish_split_stats(*stat_iter, rctx.transaction);
child->unlock();
}
ceph_assert(stat_iter != updated_stats.end());
- parent->finish_split_stats(*stat_iter, rctx->transaction);
+ parent->finish_split_stats(*stat_iter, rctx.transaction);
}
/*
PG::PeeringCtx OSD::create_context()
{
- ObjectStore::Transaction *t = new ObjectStore::Transaction;
- map<int, map<spg_t,pg_query_t> > *query_map =
- new map<int, map<spg_t, pg_query_t> >;
- map<int,vector<pair<pg_notify_t, PastIntervals> > > *notify_list =
- new map<int, vector<pair<pg_notify_t, PastIntervals> > >;
- map<int,vector<pair<pg_notify_t, PastIntervals> > > *info_map =
- new map<int,vector<pair<pg_notify_t, PastIntervals> > >;
- PG::PeeringCtx rctx(query_map, info_map, notify_list, t);
- return rctx;
+ return PG::PeeringCtx();
}
void OSD::dispatch_context_transaction(PG::PeeringCtx &ctx, PG *pg,
ThreadPool::TPHandle *handle)
{
- if (!ctx.transaction->empty() || ctx.transaction->has_contexts()) {
+ if (!ctx.transaction.empty() || ctx.transaction.has_contexts()) {
int tr = store->queue_transaction(
pg->ch,
- std::move(*ctx.transaction), TrackedOpRef(), handle);
+ std::move(ctx.transaction), TrackedOpRef(), handle);
ceph_assert(tr == 0);
- delete (ctx.transaction);
- ctx.transaction = new ObjectStore::Transaction;
+ ctx.reset_transaction();
}
}
} else if (!is_active()) {
dout(20) << __func__ << " not active" << dendl;
} else {
- do_notifies(*ctx.notify_list, curmap);
- do_queries(*ctx.query_map, curmap);
- do_infos(*ctx.info_map, curmap);
+ do_notifies(ctx.notify_list, curmap);
+ do_queries(ctx.query_map, curmap);
+ do_infos(ctx.info_map, curmap);
}
- if ((!ctx.transaction->empty() || ctx.transaction->has_contexts()) && pg) {
+ if ((!ctx.transaction.empty() || ctx.transaction.has_contexts()) && pg) {
int tr = store->queue_transaction(
pg->ch,
- std::move(*ctx.transaction), TrackedOpRef(),
+ std::move(ctx.transaction), TrackedOpRef(),
handle);
ceph_assert(tr == 0);
}
- delete ctx.notify_list;
- delete ctx.query_map;
- delete ctx.info_map;
- delete ctx.transaction;
}
-void OSD::discard_context(PG::PeeringCtx& ctx)
-{
- delete ctx.notify_list;
- delete ctx.query_map;
- delete ctx.info_map;
- delete ctx.transaction;
-}
-
-
/** do_notifies
* Send an MOSDPGNotify to a primary, with a list of PGs that I have
* content for, and they are primary for.
if (do_unfound) {
PG::PeeringCtx rctx = create_context();
rctx.handle = &handle;
- pg->find_unfound(queued, &rctx);
+ pg->find_unfound(queued, rctx);
dispatch_context(rctx, pg, pg->get_osdmap());
}
}
derr << __func__ << " unrecognized pg-less event " << evt->get_desc() << dendl;
ceph_abort();
}
- } else if (advance_pg(curmap->get_epoch(), pg, handle, &rctx)) {
- pg->do_peering_event(evt, &rctx);
+ } else if (advance_pg(curmap->get_epoch(), pg, handle, rctx)) {
+ pg->do_peering_event(evt, rctx);
if (pg->is_deleted()) {
- // do not dispatch rctx; the final _delete_some already did it.
- discard_context(rctx);
pg->unlock();
return;
}
epoch_t advance_to,
PG *pg,
ThreadPool::TPHandle &handle,
- PG::PeeringCtx *rctx);
+ PG::PeeringCtx &rctx);
void consume_map();
void activate_map();
const set<spg_t> &childpgids, set<PGRef> *out_pgs,
OSDMapRef curmap,
OSDMapRef nextmap,
- PG::PeeringCtx *rctx);
+ PG::PeeringCtx &rctx);
void _finish_splits(set<PGRef>& pgs);
// == monitor interaction ==
recovery_state.start_split_stats(childpgs, out);
}
-void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction *t)
+void PG::finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction &t)
{
recovery_state.finish_split_stats(stats, t);
}
-void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx *rctx,
+void PG::merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
unsigned split_bits,
const pg_merge_meta_t& last_pg_merge_meta)
{
for (auto& i : sources) {
auto& source = i.second;
// wipe out source's pgmeta
- rctx->transaction->remove(source->coll, source->pgmeta_oid);
+ rctx.transaction.remove(source->coll, source->pgmeta_oid);
// merge (and destroy source collection)
- rctx->transaction->merge_collection(source->coll, coll, split_bits);
+ rctx.transaction.merge_collection(source->coll, coll, split_bits);
}
// merge_collection does this, but maybe all of our sources were missing.
- rctx->transaction->collection_set_bits(coll, split_bits);
+ rctx.transaction.collection_set_bits(coll, split_bits);
snap_mapper.update_bits(split_bits);
}
const pg_history_t& history,
const PastIntervals& pi,
bool backfill,
- ObjectStore::Transaction *t)
+ ObjectStore::Transaction &t)
{
recovery_state.init(
role, newup, new_up_primary, newacting,
recovery_state.set_role(-1);
}
- PG::PeeringCtx rctx(0, 0, 0, new ObjectStore::Transaction);
- handle_initialize(&rctx);
+ PG::PeeringCtx rctx;
+ handle_initialize(rctx);
// note: we don't activate here because we know the OSD will advance maps
// during boot.
- write_if_dirty(*rctx.transaction);
- store->queue_transaction(ch, std::move(*rctx.transaction));
- delete rctx.transaction;
+ write_if_dirty(rctx.transaction);
+ store->queue_transaction(ch, std::move(rctx.transaction));
}
void PG::update_snap_map(
};
typedef std::shared_ptr<FlushState> FlushStateRef;
-void PG::start_flush_on_transaction(ObjectStore::Transaction *t)
+void PG::start_flush_on_transaction(ObjectStore::Transaction &t)
{
// flush in progress ops
FlushStateRef flush_trigger (std::make_shared<FlushState>(
this, get_osdmap_epoch()));
- t->register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
- t->register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
+ t.register_on_applied(new ContainerContext<FlushStateRef>(flush_trigger));
+ t.register_on_commit(new ContainerContext<FlushStateRef>(flush_trigger));
}
bool PG::try_flush_or_schedule_async()
return true;
}
-void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx *rctx)
+void PG::do_peering_event(PGPeeringEventRef evt, PeeringCtx &rctx)
{
dout(10) << __func__ << ": " << evt->get_desc() << dendl;
ceph_assert(have_same_or_newer_map(evt->get_epoch_sent()));
if (old_peering_evt(evt)) {
dout(10) << "discard old " << evt->get_desc() << dendl;
} else {
- recovery_state.handle_event(evt, rctx);
+ recovery_state.handle_event(evt, &rctx);
}
// write_if_dirty regardless of path above to ensure we capture any work
// done by OSD::advance_pg().
- write_if_dirty(*rctx->transaction);
+ write_if_dirty(rctx.transaction);
}
void PG::queue_peering_event(PGPeeringEventRef evt)
NullEvt())));
}
-void PG::find_unfound(epoch_t queued, PeeringCtx *rctx)
+void PG::find_unfound(epoch_t queued, PeeringCtx &rctx)
{
/*
* if we couldn't start any recovery ops and things are still
* It may be that our initial locations were bad and we errored
* out while trying to pull.
*/
- recovery_state.discover_all_missing(*rctx->query_map);
- if (rctx->query_map->empty()) {
+ recovery_state.discover_all_missing(rctx.query_map);
+ if (rctx.query_map.empty()) {
string action;
if (state_test(PG_STATE_BACKFILLING)) {
auto evt = PGPeeringEventRef(
OSDMapRef osdmap, OSDMapRef lastmap,
vector<int>& newup, int up_primary,
vector<int>& newacting, int acting_primary,
- PeeringCtx *rctx)
+ PeeringCtx &rctx)
{
dout(10) << __func__ << ": " << osdmap->get_epoch() << dendl;
osd_shard->update_pg_epoch(pg_slot, osdmap->get_epoch());
rctx);
}
-void PG::handle_activate_map(PeeringCtx *rctx)
+void PG::handle_activate_map(PeeringCtx &rctx)
{
dout(10) << __func__ << ": " << get_osdmap()->get_epoch()
<< dendl;
requeue_map_waiters();
}
-void PG::handle_initialize(PeeringCtx *rctx)
+void PG::handle_initialize(PeeringCtx &rctx)
{
dout(10) << __func__ << dendl;
PeeringState::Initialize evt;
- recovery_state.handle_event(evt, rctx);
+ recovery_state.handle_event(evt, &rctx);
}
void PG::handle_query_state(Formatter *f)
delete this;
}
-void PG::do_delete_work(ObjectStore::Transaction *t)
+void PG::do_delete_work(ObjectStore::Transaction &t)
{
dout(10) << __func__ << dendl;
&next);
dout(20) << __func__ << " " << olist << dendl;
- OSDriver::OSTransaction _t(osdriver.get_transaction(t));
+ OSDriver::OSTransaction _t(osdriver.get_transaction(&t));
int64_t num = 0;
for (auto& oid : olist) {
if (oid == pgmeta_oid) {
if (r != 0 && r != -ENOENT) {
ceph_abort();
}
- t->remove(coll, oid);
+ t.remove(coll, oid);
++num;
}
if (num) {
dout(20) << __func__ << " deleting " << num << " objects" << dendl;
Context *fin = new C_DeleteMore(this, get_osdmap_epoch());
- t->register_on_commit(fin);
+ t.register_on_commit(fin);
} else {
dout(20) << __func__ << " finished" << dendl;
if (cct->_conf->osd_inject_failure_on_pg_removal) {
// are the SnapMapper ContainerContexts.
{
PGRef pgref(this);
- PGLog::clear_info_log(info.pgid, t);
- t->remove_collection(coll);
- t->register_on_commit(new ContainerContext<PGRef>(pgref));
- t->register_on_applied(new ContainerContext<PGRef>(pgref));
- osd->store->queue_transaction(ch, std::move(*t));
+ PGLog::clear_info_log(info.pgid, &t);
+ t.remove_collection(coll);
+ t.register_on_commit(new ContainerContext<PGRef>(pgref));
+ t.register_on_applied(new ContainerContext<PGRef>(pgref));
+ osd->store->queue_transaction(ch, std::move(t));
}
ch->flush();
if (!osd->try_finish_pg_delete(this, pool.info.get_pg_num())) {
dout(1) << __func__ << " raced with merge, reinstantiating" << dendl;
ch = osd->store->create_new_collection(coll);
- _create(*t,
+ _create(t,
info.pgid,
info.pgid.get_split_bits(pool.info.get_pg_num()));
- _init(*t, info.pgid, &pool.info);
+ _init(t, info.pgid, &pool.info);
recovery_state.reset_last_persisted();
} else {
recovery_state.set_delete_complete();
const pg_history_t& history,
const PastIntervals& pim,
bool backfill,
- ObjectStore::Transaction *t);
+ ObjectStore::Transaction &t);
/// read existing pg state off disk
void read_state(ObjectStore *store);
int split_bits,
int seed,
const pg_pool_t *pool,
- ObjectStore::Transaction *t) = 0;
+ ObjectStore::Transaction &t) = 0;
void split_into(pg_t child_pgid, PG *child, unsigned split_bits);
- void merge_from(map<spg_t,PGRef>& sources, PeeringCtx *rctx,
+ void merge_from(map<spg_t,PGRef>& sources, PeeringCtx &rctx,
unsigned split_bits,
const pg_merge_meta_t& last_pg_merge_meta);
- void finish_split_stats(const object_stat_sum_t& stats, ObjectStore::Transaction *t);
+ void finish_split_stats(const object_stat_sum_t& stats,
+ ObjectStore::Transaction &t);
void scrub(epoch_t queued, ThreadPool::TPHandle &handle);
void clear_primary_state() override;
epoch_t oldest_stored_osdmap() override;
- OstreamTemp &get_clog_error() override;
- OstreamTemp &get_clog_info() override;
- OstreamTemp &get_clog_debug() override;
+ OstreamTemp get_clog_error() override;
+ OstreamTemp get_clog_info() override;
+ OstreamTemp get_clog_debug() override;
void schedule_event_after(
PGPeeringEventRef event,
}
PGLog::LogEntryHandlerRef get_log_handler(
- ObjectStore::Transaction *t) override {
- return std::make_unique<PG::PGLogEntryHandler>(this, t);
+ ObjectStore::Transaction &t) override {
+ return std::make_unique<PG::PGLogEntryHandler>(this, &t);
}
- void do_delete_work(ObjectStore::Transaction *t) override;
+ void do_delete_work(ObjectStore::Transaction &t) override;
void clear_ready_to_merge() override;
void set_not_ready_to_merge_target(pg_t pgid, pg_t src) override;
void rebuild_missing_set_with_deletes(PGLog &pglog) override;
void queue_peering_event(PGPeeringEventRef evt);
- void do_peering_event(PGPeeringEventRef evt, PeeringCtx *rcx);
+ void do_peering_event(PGPeeringEventRef evt, PeeringCtx &rcx);
void queue_null(epoch_t msg_epoch, epoch_t query_epoch);
void queue_flushed(epoch_t started_at);
void handle_advance_map(
OSDMapRef osdmap, OSDMapRef lastmap,
vector<int>& newup, int up_primary,
vector<int>& newacting, int acting_primary,
- PeeringCtx *rctx);
- void handle_activate_map(PeeringCtx *rctx);
- void handle_initialize(PeeringCtx *rctx);
+ PeeringCtx &rctx);
+ void handle_activate_map(PeeringCtx &rctx);
+ void handle_initialize(PeeringCtx &rctx);
void handle_query_state(Formatter *f);
/**
uint64_t *ops_begun) = 0;
// more work after the above, but with a PeeringCtx
- void find_unfound(epoch_t queued, PeeringCtx *rctx);
+ void find_unfound(epoch_t queued, PeeringCtx &rctx);
virtual void get_watchers(std::list<obj_watch_item_t> *ls) = 0;
bool try_fast_info,
PerfCounters *logger = nullptr);
- void write_if_dirty(PeeringCtx *rctx) {
- write_if_dirty(*rctx->transaction);
+ void write_if_dirty(PeeringCtx &rctx) {
+ write_if_dirty(rctx.transaction);
}
protected:
void write_if_dirty(ObjectStore::Transaction& t) {
bool try_flush_or_schedule_async() override;
void start_flush_on_transaction(
- ObjectStore::Transaction *t) override;
+ ObjectStore::Transaction &t) override;
void update_history(const pg_history_t& history) {
recovery_state.update_history(history);
void PeeringState::PeeringMachine::send_query(
pg_shard_t to, const pg_query_t &query) {
ceph_assert(state->rctx);
- ceph_assert(state->rctx->query_map);
- (*state->rctx->query_map)[to.osd][
+ state->rctx->query_map[to.osd][
spg_t(context< PeeringMachine >().spgid.pgid, to.shard)] = query;
}
orig_ctx = new_ctx;
if (new_ctx) {
if (messages_pending_flush) {
- rctx = PeeringCtx(*messages_pending_flush, *new_ctx);
+ rctx.emplace(*messages_pending_flush, *new_ctx);
} else {
- rctx = *new_ctx;
+ rctx.emplace(*new_ctx);
}
rctx->start_time = ceph_clock_now();
}
ceph_assert(orig_ctx);
ceph_assert(rctx);
messages_pending_flush = BufferedRecoveryMessages();
- rctx = PeeringCtx(*messages_pending_flush, *orig_ctx);
+ rctx.emplace(*messages_pending_flush, *orig_ctx);
}
void PeeringState::clear_blocked_outgoing() {
ceph_assert(orig_ctx);
ceph_assert(rctx);
- rctx = PeeringCtx(*orig_ctx);
- rctx->accept_buffered_messages(*messages_pending_flush);
+ orig_ctx->accept_buffered_messages(*messages_pending_flush);
+ rctx.emplace(*orig_ctx);
messages_pending_flush = boost::optional<BufferedRecoveryMessages>();
}
}
machine.event_count++;
- rctx = boost::optional<PeeringCtx>();
+ rctx = std::nullopt;
orig_ctx = NULL;
}
OSDMapRef osdmap, OSDMapRef lastmap,
vector<int>& newup, int up_primary,
vector<int>& newacting, int acting_primary,
- PeeringCtx *rctx)
+ PeeringCtx &rctx)
{
ceph_assert(lastmap->get_epoch() == osdmap_ref->get_epoch());
ceph_assert(lastmap == osdmap_ref);
AdvMap evt(
osdmap, lastmap, newup, up_primary,
newacting, acting_primary);
- handle_event(evt, rctx);
+ handle_event(evt, &rctx);
if (pool.info.last_change == osdmap_ref->get_epoch()) {
pl->on_pool_change();
}
last_require_osd_release = osdmap->require_osd_release;
}
-void PeeringState::activate_map(PeeringCtx *rctx)
+void PeeringState::activate_map(PeeringCtx &rctx)
{
psdout(10) << __func__ << dendl;
ActMap evt;
- handle_event(evt, rctx);
+ handle_event(evt, &rctx);
if (osdmap_ref->get_epoch() - last_persisted_osdmap >
cct->_conf->osd_pg_epoch_persisted_max_stale) {
psdout(20) << __func__ << ": Dirtying info: last_persisted is "
<< last_persisted_osdmap
<< " while current is " << osdmap_ref->get_epoch() << dendl;
}
- write_if_dirty(*rctx->transaction);
+ write_if_dirty(rctx.transaction);
if (get_osdmap()->check_new_blacklist_entries()) {
pl->check_blacklisted_watchers();
const OSDMapRef lastmap,
const vector<int>& newup, int new_up_primary,
const vector<int>& newacting, int new_acting_primary,
- ObjectStore::Transaction *t)
+ ObjectStore::Transaction &t)
{
const OSDMapRef osdmap = get_osdmap();
bool PeeringState::search_for_missing(
const pg_info_t &oinfo, const pg_missing_t &omissing,
pg_shard_t from,
- PeeringCtx *ctx)
+ PeeringCtxWrapper &ctx)
{
uint64_t num_unfound_before = missing_loc.num_unfound();
bool found_missing = missing_loc.add_source_info(
- from, oinfo, omissing, ctx->handle);
+ from, oinfo, omissing, ctx.handle);
if (found_missing && num_unfound_before != missing_loc.num_unfound())
pl->publish_stats_to_osd();
// avoid doing this if the peer is empty. This is abit of paranoia
oinfo.last_update != eversion_t()) {
pg_info_t tinfo(oinfo);
tinfo.pgid.shard = pg_whoami.shard;
- (*(ctx->info_map))[from.osd].emplace_back(
+ ctx.info_map[from.osd].emplace_back(
pg_notify_t(
from.shard, pg_whoami.shard,
get_osdmap_epoch(),
map<int,
vector<
pair<pg_notify_t, PastIntervals> > > *activator_map,
- PeeringCtx *ctx)
+ PeeringCtxWrapper &ctx)
{
ceph_assert(!is_peered());
info.purged_snaps.swap(purged);
}
- ceph_assert(ctx);
// start up replicas
ceph_assert(!acting_recovery_backfill.empty());
// and covers vast majority of the use cases, like one OSD/host is down for
// a while for hardware repairing
if (complete_shards.size() + 1 == acting_recovery_backfill.size()) {
- missing_loc.add_batch_sources_info(complete_shards, ctx->handle);
+ missing_loc.add_batch_sources_info(complete_shards, ctx.handle);
} else {
missing_loc.add_source_info(pg_whoami, info, pg_log.get_missing(),
- ctx->handle);
+ ctx.handle);
for (set<pg_shard_t>::iterator i = acting_recovery_backfill.begin();
i != acting_recovery_backfill.end();
++i) {
*i,
peer_info[*i],
peer_missing[*i],
- ctx->handle);
+ ctx.handle);
}
}
for (map<pg_shard_t, pg_missing_t>::iterator i = peer_missing.begin();
pl->on_activate(std::move(to_trim));
}
if (acting.size() >= pool.info.min_size) {
- PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+ PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
pg_log.roll_forward(rollbacker.get());
}
}
ObjectStore::Transaction& t, pg_info_t &oinfo, pg_log_t &olog,
pg_shard_t from)
{
- PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+ PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
pg_log.merge_log(
oinfo, olog, from, info, rollbacker.get(), dirty_info, dirty_big_info);
}
void PeeringState::rewind_divergent_log(
ObjectStore::Transaction& t, eversion_t newhead)
{
- PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+ PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
pg_log.rewind_divergent_log(
newhead, info, rollbacker.get(), dirty_info, dirty_big_info);
}
pl->send_cluster_message(from.osd, mlog, get_osdmap_epoch(), true);
}
-void PeeringState::fulfill_query(const MQuery& query, PeeringCtx *rctx)
+void PeeringState::fulfill_query(const MQuery& query, PeeringCtxWrapper &rctx)
{
if (query.query.type == pg_query_t::INFO) {
pair<pg_shard_t, pg_info_t> notify_info;
update_history(query.query.history);
fulfill_info(query.from, query.query, notify_info);
- rctx->send_notify(
+ rctx.send_notify(
notify_info.first,
pg_notify_t(
notify_info.first.shard, pg_whoami.shard,
void PeeringState::merge_from(
map<spg_t,PeeringState *>& sources,
- PeeringCtx *rctx,
+ PeeringCtx &rctx,
unsigned split_bits,
const pg_merge_meta_t& last_pg_merge_meta)
{
}
}
- PGLog::LogEntryHandlerRef handler{pl->get_log_handler(rctx->transaction)};
+ PGLog::LogEntryHandlerRef handler{pl->get_log_handler(rctx.transaction)};
pg_log.roll_forward(handler.get());
info.last_complete = info.last_update; // to fake out trim()
// prepare log
PGLog::LogEntryHandlerRef handler{
- source->pl->get_log_handler(rctx->transaction)};
+ source->pl->get_log_handler(rctx.transaction)};
source->pg_log.roll_forward(handler.get());
source->info.last_complete = source->info.last_update; // to fake out trim()
source->pg_log.reset_recovery_pointers();
}
void PeeringState::finish_split_stats(
- const object_stat_sum_t& stats, ObjectStore::Transaction *t)
+ const object_stat_sum_t& stats, ObjectStore::Transaction &t)
{
info.stats.stats.sum = stats;
- write_if_dirty(*t);
+ write_if_dirty(t);
}
void PeeringState::update_blocked_by()
const pg_history_t& history,
const PastIntervals& pi,
bool backfill,
- ObjectStore::Transaction *t)
+ ObjectStore::Transaction &t)
{
psdout(10) << "init role " << role << " up "
<< newup << " acting " << newacting
dirty_info = true;
dirty_big_info = true;
- write_if_dirty(*t);
+ write_if_dirty(t);
}
void PeeringState::dump_peering_state(Formatter *f)
ceph_assert(!entries.empty());
ceph_assert(entries.begin()->version > info.last_update);
- PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(&t)};
+ PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
bool invalidate_stats =
pg_log.append_new_log_entries(
info.last_backfill,
}
psdout(10) << "append_log " << pg_log.get_log() << " " << logv << dendl;
- PGLog::LogEntryHandlerRef handler{pl->get_log_handler(&t)};
+ PGLog::LogEntryHandlerRef handler{pl->get_log_handler(t)};
if (!transaction_applied) {
/* We must be a backfill or async recovery peer, so it's ok if we apply
* out-of-turn since we won't be considered when
* write in question must be fully committed, so it's not valid
* to roll it back anyway (and we'll be rolled forward shortly
* anyway) */
- PGLog::LogEntryHandlerRef handler{pl->get_log_handler(&t)};
+ PGLog::LogEntryHandlerRef handler{pl->get_log_handler(t)};
pg_log.roll_forward_to(v, handler.get());
}
ps->try_mark_clean();
- context< PeeringMachine >().get_cur_transaction()->register_on_commit(
+ context< PeeringMachine >().get_cur_transaction().register_on_commit(
pl->on_clean());
}
ceph_assert(ps->is_primary());
psdout(10) << "In Active, about to call activate" << dendl;
ps->start_flush(context< PeeringMachine >().get_cur_transaction());
- ps->activate(*context< PeeringMachine >().get_cur_transaction(),
+ ps->activate(context< PeeringMachine >().get_cur_transaction(),
ps->get_osdmap_epoch(),
- *context< PeeringMachine >().get_query_map(),
- context< PeeringMachine >().get_info_map(),
+ context< PeeringMachine >().get_query_map(),
+ &context< PeeringMachine >().get_info_map(),
context< PeeringMachine >().get_recovery_ctx());
// everyone has to commit/ack before we are truly active
if (ps->have_unfound()) {
// object may have become unfound
- ps->discover_all_missing(*context< PeeringMachine >().get_query_map());
+ ps->discover_all_missing(context< PeeringMachine >().get_query_map());
}
uint64_t unfound = ps->missing_loc.num_unfound();
ps->proc_replica_info(
notevt.from, notevt.notify.info, notevt.notify.epoch_sent);
if (ps->have_unfound() || (ps->is_degraded() && ps->might_have_unfound.count(notevt.from))) {
- ps->discover_all_missing(*context< PeeringMachine >().get_query_map());
+ ps->discover_all_missing(context< PeeringMachine >().get_query_map());
}
}
return discard_event();
DECLARE_LOCALS
psdout(10) << "In ReplicaActive, about to call activate" << dendl;
map<int, map<spg_t, pg_query_t> > query_map;
- ps->activate(*context< PeeringMachine >().get_cur_transaction(),
- actevt.activation_epoch,
- query_map, NULL, NULL);
+ ps->activate(
+ context< PeeringMachine >().get_cur_transaction(),
+ actevt.activation_epoch,
+ query_map,
+ NULL,
+ context< PeeringMachine >().get_recovery_ctx());
psdout(10) << "Activate Finished" << dendl;
return discard_event();
}
boost::statechart::result PeeringState::ReplicaActive::react(const MInfoRec& infoevt)
{
DECLARE_LOCALS
- ps->proc_primary_info(*context<PeeringMachine>().get_cur_transaction(),
+ ps->proc_primary_info(context<PeeringMachine>().get_cur_transaction(),
infoevt.info);
return discard_event();
}
{
DECLARE_LOCALS
psdout(10) << "received log from " << logevt.from << dendl;
- ObjectStore::Transaction* t = context<PeeringMachine>().get_cur_transaction();
- ps->merge_log(*t, logevt.msg->info, logevt.msg->log, logevt.from);
+ ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
+ ps->merge_log(t, logevt.msg->info, logevt.msg->log, logevt.from);
ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
return discard_event();
MOSDPGLog *msg = logevt.msg.get();
psdout(10) << "got info+log from osd." << logevt.from << " " << msg->info << " " << msg->log << dendl;
- ObjectStore::Transaction* t = context<PeeringMachine>().get_cur_transaction();
+ ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
if (msg->info.last_backfill == hobject_t()) {
// restart backfill
ps->info = msg->info;
ps->pg_log.reset_backfill();
} else {
- ps->merge_log(*t, msg->info, msg->log, logevt.from);
+ ps->merge_log(t, msg->info, msg->log, logevt.from);
}
ceph_assert(ps->pg_log.get_head() == ps->info.last_update);
if (ps->info.last_update > infoevt.info.last_update) {
// rewind divergent log entries
- ObjectStore::Transaction* t = context<PeeringMachine>().get_cur_transaction();
- ps->rewind_divergent_log(*t, infoevt.info.last_update);
+ ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
+ ps->rewind_divergent_log(t, infoevt.info.last_update);
ps->info.stats = infoevt.info.stats;
ps->info.hit_set = infoevt.info.hit_set;
}
context< PeeringMachine >().log_enter(state_name);
DECLARE_LOCALS
ps->deleting = true;
- ObjectStore::Transaction* t = context<PeeringMachine>().get_cur_transaction();
+ ObjectStore::Transaction &t = context<PeeringMachine>().get_cur_transaction();
// clear log
PGLog::LogEntryHandlerRef rollbacker{pl->get_log_handler(t)};
psdout(10) << "leaving GetLog" << dendl;
if (msg) {
psdout(10) << "processing master log" << dendl;
- ps->proc_master_log(*context<PeeringMachine>().get_cur_transaction(),
+ ps->proc_master_log(context<PeeringMachine>().get_cur_transaction(),
msg->info, msg->log, msg->missing,
auth_log_shard);
}
virtual bool try_flush_or_schedule_async() = 0;
/// Arranges for a commit on t to call on_flushed() once flushed.
virtual void start_flush_on_transaction(
- ObjectStore::Transaction *t) = 0;
+ ObjectStore::Transaction &t) = 0;
/// Notification that all outstanding flushes for interval have completed
virtual void on_flushed() = 0;
// =================== Event notification ====================
virtual void on_pool_change() = 0;
virtual void on_role_change() = 0;
- virtual void on_change(ObjectStore::Transaction *t) = 0;
+ virtual void on_change(ObjectStore::Transaction &t) = 0;
virtual void on_activate(interval_set<snapid_t> to_trim) = 0;
virtual void on_activate_complete() = 0;
virtual void on_new_interval() = 0;
// ====================== PG deletion =======================
/// Notification of removal complete, t must be populated to complete removal
- virtual void on_removal(ObjectStore::Transaction *t) = 0;
+ virtual void on_removal(ObjectStore::Transaction &t) = 0;
/// Perform incremental removal work
- virtual void do_delete_work(ObjectStore::Transaction *t) = 0;
+ virtual void do_delete_work(ObjectStore::Transaction &t) = 0;
// ======================= PG Merge =========================
virtual void clear_ready_to_merge() = 0;
// ================== Peering log events ====================
/// Get handler for rolling forward/back log entries
virtual PGLog::LogEntryHandlerRef get_log_handler(
- ObjectStore::Transaction *t) = 0;
+ ObjectStore::Transaction &t) = 0;
// ============ On disk representation changes ==============
virtual void rebuild_missing_set_with_deletes(PGLog &pglog) = 0;
};
struct PeeringCtx {
- utime_t start_time;
- map<int, map<spg_t, pg_query_t> > *query_map;
- map<int, vector<pair<pg_notify_t, PastIntervals> > > *info_map;
- map<int, vector<pair<pg_notify_t, PastIntervals> > > *notify_list;
- ObjectStore::Transaction *transaction;
- HBHandle* handle;
- PeeringCtx(map<int, map<spg_t, pg_query_t> > *query_map,
- map<int,
- vector<pair<pg_notify_t, PastIntervals> > > *info_map,
- map<int,
- vector<pair<pg_notify_t, PastIntervals> > > *notify_list,
- ObjectStore::Transaction *transaction)
- : query_map(query_map), info_map(info_map),
- notify_list(notify_list),
- transaction(transaction),
- handle(NULL) {}
-
- PeeringCtx(BufferedRecoveryMessages &buf, PeeringCtx &rctx)
- : query_map(&(buf.query_map)),
- info_map(&(buf.info_map)),
- notify_list(&(buf.notify_list)),
- transaction(rctx.transaction),
- handle(rctx.handle) {}
+ map<int, map<spg_t, pg_query_t> > query_map;
+ map<int, vector<pair<pg_notify_t, PastIntervals> > > info_map;
+ map<int, vector<pair<pg_notify_t, PastIntervals> > > notify_list;
+ ObjectStore::Transaction transaction;
+ HBHandle* handle = nullptr;
+
+ PeeringCtx() = default;
+
+ void reset_transaction() {
+ transaction = ObjectStore::Transaction();
+ }
void accept_buffered_messages(BufferedRecoveryMessages &m) {
- ceph_assert(query_map);
- ceph_assert(info_map);
- ceph_assert(notify_list);
for (map<int, map<spg_t, pg_query_t> >::iterator i = m.query_map.begin();
i != m.query_map.end();
++i) {
- map<spg_t, pg_query_t> &omap = (*query_map)[i->first];
+ map<spg_t, pg_query_t> &omap = query_map[i->first];
for (map<spg_t, pg_query_t>::iterator j = i->second.begin();
j != i->second.end();
++j) {
i != m.info_map.end();
++i) {
vector<pair<pg_notify_t, PastIntervals> > &ovec =
- (*info_map)[i->first];
+ info_map[i->first];
ovec.reserve(ovec.size() + i->second.size());
ovec.insert(ovec.end(), i->second.begin(), i->second.end());
}
i != m.notify_list.end();
++i) {
vector<pair<pg_notify_t, PastIntervals> > &ovec =
- (*notify_list)[i->first];
+ notify_list[i->first];
ovec.reserve(ovec.size() + i->second.size());
ovec.insert(ovec.end(), i->second.begin(), i->second.end());
}
}
+ };
+
+private:
+ /**
+ * Wraps PeeringCtx to hide the difference between buffering messages to
+ * be sent after flush or immediately.
+ */
+ struct PeeringCtxWrapper {
+ utime_t start_time;
+ map<int, map<spg_t, pg_query_t> > &query_map;
+ map<int, vector<pair<pg_notify_t, PastIntervals> > > &info_map;
+ map<int, vector<pair<pg_notify_t, PastIntervals> > > ¬ify_list;
+ ObjectStore::Transaction &transaction;
+ HBHandle * const handle = nullptr;
+
+ PeeringCtxWrapper(PeeringCtx &wrapped) :
+ query_map(wrapped.query_map),
+ info_map(wrapped.info_map),
+ notify_list(wrapped.notify_list),
+ transaction(wrapped.transaction),
+ handle(wrapped.handle) {}
+
+ PeeringCtxWrapper(BufferedRecoveryMessages &buf, PeeringCtx &wrapped)
+ : query_map(buf.query_map),
+ info_map(buf.info_map),
+ notify_list(buf.notify_list),
+ transaction(wrapped.transaction),
+ handle(wrapped.handle) {}
+
+ PeeringCtxWrapper(PeeringCtxWrapper &&ctx) = default;
void send_notify(pg_shard_t to,
const pg_notify_t &info, const PastIntervals &pi) {
- ceph_assert(notify_list);
- (*notify_list)[to.osd].emplace_back(info, pi);
+ notify_list[to.osd].emplace_back(info, pi);
}
};
+public:
struct QueryState : boost::statechart::event< QueryState > {
Formatter *f;
event_count(0) {}
/* Accessor functions for state methods */
- ObjectStore::Transaction* get_cur_transaction() {
+ ObjectStore::Transaction& get_cur_transaction() {
ceph_assert(state->rctx);
- ceph_assert(state->rctx->transaction);
return state->rctx->transaction;
}
void send_query(pg_shard_t to, const pg_query_t &query);
- map<int, map<spg_t, pg_query_t> > *get_query_map() {
+ map<int, map<spg_t, pg_query_t> > &get_query_map() {
ceph_assert(state->rctx);
- ceph_assert(state->rctx->query_map);
return state->rctx->query_map;
}
- map<int, vector<pair<pg_notify_t, PastIntervals> > > *get_info_map() {
+ map<int, vector<pair<pg_notify_t, PastIntervals> > > &get_info_map() {
ceph_assert(state->rctx);
- ceph_assert(state->rctx->info_map);
return state->rctx->info_map;
}
- PeeringCtx *get_recovery_ctx() { return &*(state->rctx); }
+ PeeringCtxWrapper &get_recovery_ctx() {
+ assert(state->rctx);
+ return *(state->rctx);
+ }
void send_notify(pg_shard_t to,
const pg_notify_t &info, const PastIntervals &pi) {
* the message lists for messages_pending_flush while blocking messages
* or into orig_ctx otherwise
*/
- boost::optional<PeeringCtx> rctx;
+ std::optional<PeeringCtxWrapper> rctx;
/**
* OSDMap state
const OSDMapRef lastmap,
const vector<int>& newup, int up_primary,
const vector<int>& newacting, int acting_primary,
- ObjectStore::Transaction *t);
+ ObjectStore::Transaction &t);
void on_new_interval();
void clear_recovery_state();
void clear_primary_state();
bool search_for_missing(
const pg_info_t &oinfo, const pg_missing_t &omissing,
pg_shard_t fromosd,
- PeeringCtx*);
+ PeeringCtxWrapper &rctx);
void build_might_have_unfound();
void log_weirdness();
void activate(
epoch_t activation_epoch,
map<int, map<spg_t,pg_query_t> >& query_map,
map<int, vector<pair<pg_notify_t, PastIntervals> > > *activator_map,
- PeeringCtx *ctx);
+ PeeringCtxWrapper &ctx);
void rewind_divergent_log(ObjectStore::Transaction& t, eversion_t newhead);
void merge_log(
    pair<pg_shard_t, pg_info_t> &notify_info);
void fulfill_log(
pg_shard_t from, const pg_query_t &query, epoch_t query_epoch);
- void fulfill_query(const MQuery& q, PeeringCtx *rctx);
+ void fulfill_query(const MQuery& q, PeeringCtxWrapper &rctx);
void try_mark_clean();
/// Process evt
void handle_event(const boost::statechart::event_base &evt,
- PeeringCtx *rctx) {
+ PeeringCtx *rctx) {
start_handle(rctx);
machine.process_event(evt);
end_handle();
/// Process evt
void handle_event(PGPeeringEventRef evt,
- PeeringCtx *rctx) {
+ PeeringCtx *rctx) {
start_handle(rctx);
machine.process_event(evt->get_event());
end_handle();
const pg_history_t& history,
const PastIntervals& pi,
bool backfill,
- ObjectStore::Transaction *t);
+ ObjectStore::Transaction &t);
/// Init pg instance from disk state
template <typename F>
/// Update new child with stats
void finish_split_stats(
- const object_stat_sum_t& stats, ObjectStore::Transaction *t);
+ const object_stat_sum_t& stats, ObjectStore::Transaction &t);
/// Split state for child_pgid into *child
void split_into(
/// Merge state from sources
void merge_from(
map<spg_t,PeeringState *>& sources,
- PeeringCtx *rctx,
+ PeeringCtx &rctx,
unsigned split_bits,
const pg_merge_meta_t& last_pg_merge_meta);
int up_primary, ///< [in] new up primary
vector<int>& newacting, ///< [in] new acting
int acting_primary, ///< [in] new acting primary
- PeeringCtx *rctx ///< [out] recovery context
+ PeeringCtx &rctx ///< [out] recovery context
);
/// Activates most recently updated map
void activate_map(
- PeeringCtx *rctx ///< [out] recovery context
+ PeeringCtx &rctx ///< [out] recovery context
);
/// resets last_persisted_osdmap
* complete_flush is called once for each start_flush call as
* required by start_flush_on_transaction).
*/
- void start_flush(ObjectStore::Transaction *t) {
+ void start_flush(ObjectStore::Transaction &t) {
flushes_in_progress++;
pl->start_flush_on_transaction(t);
}
}
}
-void PrimaryLogPG::on_removal(ObjectStore::Transaction *t)
+void PrimaryLogPG::on_removal(ObjectStore::Transaction &t)
{
dout(10) << __func__ << dendl;
on_shutdown();
- t->register_on_commit(new C_DeleteMore(this, get_osdmap_epoch()));
+ t.register_on_commit(new C_DeleteMore(this, get_osdmap_epoch()));
}
void PrimaryLogPG::clear_async_reads()
agent_setup();
}
-void PrimaryLogPG::on_change(ObjectStore::Transaction *t)
+void PrimaryLogPG::on_change(ObjectStore::Transaction &t)
{
dout(10) << __func__ << dendl;
// registered watches.
context_registry_on_change();
- pgbackend->on_change_cleanup(t);
- scrubber.cleanup_store(t);
+ pgbackend->on_change_cleanup(&t);
+ scrubber.cleanup_store(&t);
pgbackend->on_change();
// clear snap_trimmer state
int split_bits,
int seed,
const pg_pool_t *pool,
- ObjectStore::Transaction *t) override {
+ ObjectStore::Transaction &t) override {
coll_t target = coll_t(child);
- PG::_create(*t, child, split_bits);
- t->split_collection(
+ PG::_create(t, child, split_bits);
+ t.split_collection(
coll,
split_bits,
seed,
target);
- PG::_init(*t, child, pool);
+ PG::_init(t, child, pool);
}
private:
void plpg_on_role_change() override;
void plpg_on_pool_change() override;
void clear_async_reads();
- void on_change(ObjectStore::Transaction *t) override;
+ void on_change(ObjectStore::Transaction &t) override;
void on_activate_complete() override;
void on_flushed() override;
- void on_removal(ObjectStore::Transaction *t) override;
+ void on_removal(ObjectStore::Transaction &t) override;
void on_shutdown() override;
bool check_failsafe_full() override;
bool maybe_preempt_replica_scrub(const hobject_t& oid) override {