From 6509f78a5e08c07143ab48fadf493edfad4c543e Mon Sep 17 00:00:00 2001 From: =?utf8?q?Rados=C5=82aw=20Zarzy=C5=84ski?= Date: Wed, 27 Sep 2023 14:17:06 +0200 Subject: [PATCH] osd: shuffle the ECCommon MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Signed-off-by: Radosław Zarzyński --- src/osd/CMakeLists.txt | 1 + src/osd/ECBackend.cc | 840 +------------------------------------- src/osd/ECCommon.cc | 907 +++++++++++++++++++++++++++++++++++++++++ src/osd/ECCommon.h | 13 +- 4 files changed, 922 insertions(+), 839 deletions(-) create mode 100644 src/osd/ECCommon.cc diff --git a/src/osd/CMakeLists.txt b/src/osd/CMakeLists.txt index 7043f27e2932c..5f1e96573c3bd 100644 --- a/src/osd/CMakeLists.txt +++ b/src/osd/CMakeLists.txt @@ -17,6 +17,7 @@ set(osd_srcs PGLog.cc PrimaryLogPG.cc ReplicatedBackend.cc + ECCommon.cc ECBackend.cc ECTransaction.cc PGBackend.cc diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index 467315a8b7f30..afa5d5bba44e6 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -66,18 +66,6 @@ struct ECRecoveryHandle : public PGBackend::RecoveryHandle { list ops; }; -ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::pipeline_state_t &rhs) { - switch (rhs.pipeline_state) { - case ECCommon::RMWPipeline::pipeline_state_t::CACHE_VALID: - return lhs << "CACHE_VALID"; - case ECCommon::RMWPipeline::pipeline_state_t::CACHE_INVALID: - return lhs << "CACHE_INVALID"; - default: - ceph_abort_msg("invalid pipeline state"); - } - return lhs; // unreachable -} - static ostream &operator<<(ostream &lhs, const map &rhs) { lhs << "["; @@ -112,80 +100,6 @@ static ostream &operator<<( << rhs.get<1>() << ", " << rhs.get<2>() << ")"; } -ostream &operator<<(ostream &lhs, const ECCommon::read_request_t &rhs) -{ - return lhs << "read_request_t(to_read=[" << rhs.to_read << "]" - << ", need=" << rhs.need - << ", want_attrs=" << rhs.want_attrs - << ")"; -} - -ostream &operator<<(ostream &lhs, const ECCommon::read_result_t &rhs) -{ - lhs << "read_result_t(r=" << rhs.r - << ", errors=" << rhs.errors; - if (rhs.attrs) { - lhs << ", attrs=" << *(rhs.attrs); - } else { - lhs << ", noattrs"; - } - return lhs << ", returned=" << rhs.returned << ")"; -} - -ostream &operator<<(ostream &lhs, const ECCommon::ReadOp &rhs) -{ - lhs << "ReadOp(tid=" << rhs.tid; - if (rhs.op && rhs.op->get_req()) { - lhs << ", op="; - rhs.op->get_req()->print(lhs); - } - return lhs << ", to_read=" << rhs.to_read - << ", complete=" << rhs.complete - << ", priority=" << rhs.priority - << ", obj_to_source=" << rhs.obj_to_source - << ", source_to_obj=" << rhs.source_to_obj - << ", in_progress=" << rhs.in_progress << ")"; -} - -void ECCommon::ReadOp::dump(Formatter *f) const -{ - f->dump_unsigned("tid", tid); - if (op && op->get_req()) { - f->dump_stream("op") << *(op->get_req()); - } - f->dump_stream("to_read") << to_read; - f->dump_stream("complete") << complete; - f->dump_int("priority", priority); - f->dump_stream("obj_to_source") << obj_to_source; - f->dump_stream("source_to_obj") << source_to_obj; - f->dump_stream("in_progress") << in_progress; -} - -ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::Op &rhs) -{ - lhs << "Op(" << rhs.hoid - << " v=" << rhs.version - << " tt=" << rhs.trim_to - << " tid=" << rhs.tid - << " reqid=" << rhs.reqid; - if (rhs.client_op && rhs.client_op->get_req()) { - lhs << " client_op="; - rhs.client_op->get_req()->print(lhs); - } - lhs << " roll_forward_to=" << rhs.roll_forward_to - << " temp_added=" << rhs.temp_added - << " 
temp_cleared=" << rhs.temp_cleared - << " pending_read=" << rhs.pending_read - << " remote_read=" << rhs.remote_read - << " remote_read_result=" << rhs.remote_read_result - << " pending_apply=" << rhs.pending_apply - << " pending_commit=" << rhs.pending_commit - << " plan.to_read=" << rhs.plan.to_read - << " plan.will_write=" << rhs.plan.will_write - << ")"; - return lhs; -} - ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs) { return lhs << "RecoveryOp(" @@ -1348,184 +1262,13 @@ void ECBackend::handle_sub_read_reply( } } -void ECCommon::ReadPipeline::complete_read_op(ReadOp &rop) -{ - map::iterator reqiter = - rop.to_read.begin(); - map::iterator resiter = - rop.complete.begin(); - ceph_assert(rop.to_read.size() == rop.complete.size()); - for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) { - rop.on_complete->finish_single_request( - reqiter->first, - resiter->second, - reqiter->second.to_read); - } - ceph_assert(rop.on_complete); - std::move(*rop.on_complete).finish(rop.priority); - rop.on_complete = nullptr; - // if the read op is over. clean all the data of this tid. - for (set::iterator iter = rop.in_progress.begin(); - iter != rop.in_progress.end(); - iter++) { - shard_to_read_map[*iter].erase(rop.tid); - } - rop.in_progress.clear(); - tid_to_read_map.erase(rop.tid); -} - -struct FinishReadOp : public GenContext { - ECCommon::ReadPipeline& read_pipeline; - ceph_tid_t tid; - FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid) - : read_pipeline(read_pipeline), tid(tid) {} - void finish(ThreadPool::TPHandle&) override { - auto ropiter = read_pipeline.tid_to_read_map.find(tid); - ceph_assert(ropiter != read_pipeline.tid_to_read_map.end()); - read_pipeline.complete_read_op(ropiter->second); - } -}; - -template -void ECCommon::ReadPipeline::filter_read_op( - const OSDMapRef& osdmap, - ReadOp &op, - F&& on_erase) -{ - set to_cancel; - for (map >::iterator i = op.source_to_obj.begin(); - i != op.source_to_obj.end(); - ++i) { - if (osdmap->is_down(i->first.osd)) { - to_cancel.insert(i->second.begin(), i->second.end()); - op.in_progress.erase(i->first); - continue; - } - } - - if (to_cancel.empty()) - return; - - for (map >::iterator i = op.source_to_obj.begin(); - i != op.source_to_obj.end(); - ) { - for (set::iterator j = i->second.begin(); - j != i->second.end(); - ) { - if (to_cancel.count(*j)) - i->second.erase(j++); - else - ++j; - } - if (i->second.empty()) { - op.source_to_obj.erase(i++); - } else { - ceph_assert(!osdmap->is_down(i->first.osd)); - ++i; - } - } - - for (set::iterator i = to_cancel.begin(); - i != to_cancel.end(); - ++i) { - get_parent()->cancel_pull(*i); - - ceph_assert(op.to_read.count(*i)); - read_request_t &req = op.to_read.find(*i)->second; - dout(10) << __func__ << ": canceling " << req - << " for obj " << *i << dendl; - op.to_read.erase(*i); - op.complete.erase(*i); - on_erase(*i); - } - - if (op.in_progress.empty()) { - /* This case is odd. filter_read_op gets called while processing - * an OSDMap. Normal, non-recovery reads only happen from acting - * set osds. For this op to have had a read source go down and - * there not be an interval change, it must be part of a pull during - * log-based recovery. - * - * This callback delays calling complete_read_op until later to avoid - * dealing with recovery while handling an OSDMap. We assign a - * cost here of 1 because: - * 1) This should be very rare, and the operation itself was already - * throttled. 
- * 2) It shouldn't result in IO, rather it should result in restarting - * the pull on the affected objects and pushes from in-memory buffers - * on any now complete unaffected objects. - */ -#ifndef WITH_SEASTAR - get_parent()->schedule_recovery_work( - get_parent()->bless_unlocked_gencontext( - new FinishReadOp(*this, op.tid)), - 1); -#else - // TODO - ceph_abort_msg("not yet implemented"); -#endif - } -} - -template -void ECCommon::ReadPipeline::check_recovery_sources( - const OSDMapRef& osdmap, - F&& on_erase) -{ - set tids_to_filter; - for (map >::iterator - i = shard_to_read_map.begin(); - i != shard_to_read_map.end(); - ) { - if (osdmap->is_down(i->first.osd)) { - tids_to_filter.insert(i->second.begin(), i->second.end()); - shard_to_read_map.erase(i++); - } else { - ++i; - } - } - for (set::iterator i = tids_to_filter.begin(); - i != tids_to_filter.end(); - ++i) { - map::iterator j = tid_to_read_map.find(*i); - ceph_assert(j != tid_to_read_map.end()); - filter_read_op(osdmap, j->second, on_erase); - } -} - void ECBackend::check_recovery_sources(const OSDMapRef& osdmap) { +#if 0 read_pipeline.check_recovery_sources(osdmap, [this] (const hobject_t& obj) { recovery_ops.erase(obj); }); -} - -void ECCommon::ReadPipeline::on_change() -{ - for (map::iterator i = tid_to_read_map.begin(); - i != tid_to_read_map.end(); - ++i) { - dout(10) << __func__ << ": cancelling " << i->second << dendl; - } - tid_to_read_map.clear(); - shard_to_read_map.clear(); - in_progress_client_reads.clear(); -} - -void ECCommon::RMWPipeline::on_change() -{ - dout(10) << __func__ << dendl; - - completed_to = eversion_t(); - committed_to = eversion_t(); - pipeline_state.clear(); - waiting_reads.clear(); - waiting_state.clear(); - waiting_commit.clear(); - for (auto &&op: tid_to_op_map) { - cache.release_write_pin(op.second->pin); - } - tid_to_op_map.clear(); +#endif } void ECBackend::on_change() @@ -1656,271 +1399,6 @@ void ECBackend::submit_transaction( rmw_pipeline.start_rmw(std::move(op)); } -void ECCommon::RMWPipeline::call_write_ordered(std::function &&cb) { - if (!waiting_state.empty()) { - waiting_state.back().on_write.emplace_back(std::move(cb)); - } else if (!waiting_reads.empty()) { - waiting_reads.back().on_write.emplace_back(std::move(cb)); - } else { - // Nothing earlier in the pipeline, just call it - cb(); - } -} - -void ECCommon::ReadPipeline::get_all_avail_shards( - const hobject_t &hoid, - const set &error_shards, - set &have, - map &shards, - bool for_recovery) -{ - for (set::const_iterator i = - get_parent()->get_acting_shards().begin(); - i != get_parent()->get_acting_shards().end(); - ++i) { - dout(10) << __func__ << ": checking acting " << *i << dendl; - const pg_missing_t &missing = get_parent()->get_shard_missing(*i); - if (error_shards.find(*i) != error_shards.end()) - continue; - if (!missing.is_missing(hoid)) { - ceph_assert(!have.count(i->shard)); - have.insert(i->shard); - ceph_assert(!shards.count(i->shard)); - shards.insert(make_pair(i->shard, *i)); - } - } - - if (for_recovery) { - for (set::const_iterator i = - get_parent()->get_backfill_shards().begin(); - i != get_parent()->get_backfill_shards().end(); - ++i) { - if (error_shards.find(*i) != error_shards.end()) - continue; - if (have.count(i->shard)) { - ceph_assert(shards.count(i->shard)); - continue; - } - dout(10) << __func__ << ": checking backfill " << *i << dendl; - ceph_assert(!shards.count(i->shard)); - const pg_info_t &info = get_parent()->get_shard_info(*i); - const pg_missing_t &missing = 
get_parent()->get_shard_missing(*i); - if (hoid < info.last_backfill && - !missing.is_missing(hoid)) { - have.insert(i->shard); - shards.insert(make_pair(i->shard, *i)); - } - } - - map>::const_iterator miter = - get_parent()->get_missing_loc_shards().find(hoid); - if (miter != get_parent()->get_missing_loc_shards().end()) { - for (set::iterator i = miter->second.begin(); - i != miter->second.end(); - ++i) { - dout(10) << __func__ << ": checking missing_loc " << *i << dendl; - auto m = get_parent()->maybe_get_shard_missing(*i); - if (m) { - ceph_assert(!(*m).is_missing(hoid)); - } - if (error_shards.find(*i) != error_shards.end()) - continue; - have.insert(i->shard); - shards.insert(make_pair(i->shard, *i)); - } - } - } -} - -int ECCommon::ReadPipeline::get_min_avail_to_read_shards( - const hobject_t &hoid, - const set &want, - bool for_recovery, - bool do_redundant_reads, - map>> *to_read) -{ - // Make sure we don't do redundant reads for recovery - ceph_assert(!for_recovery || !do_redundant_reads); - - set have; - map shards; - set error_shards; - - get_all_avail_shards(hoid, error_shards, have, shards, for_recovery); - - map>> need; - int r = ec_impl->minimum_to_decode(want, have, &need); - if (r < 0) - return r; - - if (do_redundant_reads) { - vector> subchunks_list; - subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count())); - for (auto &&i: have) { - need[i] = subchunks_list; - } - } - - if (!to_read) - return 0; - - for (auto &&i:need) { - ceph_assert(shards.count(shard_id_t(i.first))); - to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second)); - } - return 0; -} - -int ECCommon::ReadPipeline::get_remaining_shards( - const hobject_t &hoid, - const set &avail, - const set &want, - const read_result_t &result, - map>> *to_read, - bool for_recovery) -{ - ceph_assert(to_read); - - set have; - map shards; - set error_shards; - for (auto &p : result.errors) { - error_shards.insert(p.first); - } - - get_all_avail_shards(hoid, error_shards, have, shards, for_recovery); - - map>> need; - int r = ec_impl->minimum_to_decode(want, have, &need); - if (r < 0) { - dout(0) << __func__ << " not enough shards left to try for " << hoid - << " read result was " << result << dendl; - return -EIO; - } - - set shards_left; - for (auto p : need) { - if (avail.find(p.first) == avail.end()) { - shards_left.insert(p.first); - } - } - - vector> subchunks; - subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count())); - for (set::iterator i = shards_left.begin(); - i != shards_left.end(); - ++i) { - ceph_assert(shards.count(shard_id_t(*i))); - ceph_assert(avail.find(*i) == avail.end()); - to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks)); - } - return 0; -} - -void ECCommon::ReadPipeline::start_read_op( - int priority, - map> &want_to_read, - map &to_read, - OpRequestRef _op, - bool do_redundant_reads, - bool for_recovery, - std::unique_ptr on_complete) -{ - ceph_tid_t tid = get_parent()->get_tid(); - ceph_assert(!tid_to_read_map.count(tid)); - auto &op = tid_to_read_map.emplace( - tid, - ReadOp( - priority, - tid, - do_redundant_reads, - for_recovery, - std::move(on_complete), - _op, - std::move(want_to_read), - std::move(to_read))).first->second; - dout(10) << __func__ << ": starting " << op << dendl; - if (_op) { - op.trace = _op->pg_trace; - op.trace.event("start ec read"); - } - do_read_op(op); -} - -void ECCommon::ReadPipeline::do_read_op(ReadOp &op) -{ - int priority = op.priority; - ceph_tid_t tid = op.tid; - - dout(10) << __func__ << ": starting read " << op << 
dendl; - - map messages; - for (map::iterator i = op.to_read.begin(); - i != op.to_read.end(); - ++i) { - bool need_attrs = i->second.want_attrs; - - for (auto j = i->second.need.begin(); - j != i->second.need.end(); - ++j) { - if (need_attrs) { - messages[j->first].attrs_to_read.insert(i->first); - need_attrs = false; - } - messages[j->first].subchunks[i->first] = j->second; - op.obj_to_source[i->first].insert(j->first); - op.source_to_obj[j->first].insert(i->first); - } - for (list >::const_iterator j = - i->second.to_read.begin(); - j != i->second.to_read.end(); - ++j) { - pair chunk_off_len = - sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>())); - for (auto k = i->second.need.begin(); - k != i->second.need.end(); - ++k) { - messages[k->first].to_read[i->first].push_back( - boost::make_tuple( - chunk_off_len.first, - chunk_off_len.second, - j->get<2>())); - } - ceph_assert(!need_attrs); - } - } - - std::vector> m; - m.reserve(messages.size()); - for (map::iterator i = messages.begin(); - i != messages.end(); - ++i) { - op.in_progress.insert(i->first); - shard_to_read_map[i->first].insert(op.tid); - i->second.tid = tid; - MOSDECSubOpRead *msg = new MOSDECSubOpRead; - msg->set_priority(priority); - msg->pgid = spg_t( - get_info().pgid.pgid, - i->first.shard); - msg->map_epoch = get_osdmap_epoch(); - msg->min_epoch = get_parent()->get_interval_start_epoch(); - msg->op = i->second; - msg->op.from = get_parent()->whoami_shard(); - msg->op.tid = tid; - if (op.trace) { - // initialize a child span for this shard - msg->trace.init("ec sub read", nullptr, &op.trace); - msg->trace.keyval("shard", i->first.shard.id); - } - m.push_back(std::make_pair(i->first.osd, msg)); - } - if (!m.empty()) { - get_parent()->send_message_osd_cluster(m, get_osdmap_epoch()); - } - - dout(10) << __func__ << ": started " << op << dendl; -} ECUtil::HashInfoRef ECBackend::get_hash_info( const hobject_t &hoid, bool create, const map> *attrs) @@ -1982,320 +1460,6 @@ ECUtil::HashInfoRef ECBackend::get_hash_info( return ref; } -void ECCommon::RMWPipeline::start_rmw(OpRef op) -{ - ceph_assert(op); - dout(10) << __func__ << ": " << *op << dendl; - - ceph_assert(!tid_to_op_map.count(op->tid)); - waiting_state.push_back(*op); - tid_to_op_map[op->tid] = std::move(op); - check_ops(); -} - -bool ECCommon::RMWPipeline::try_state_to_reads() -{ - if (waiting_state.empty()) - return false; - - Op *op = &(waiting_state.front()); - if (op->requires_rmw() && pipeline_state.cache_invalid()) { - ceph_assert(get_parent()->get_pool().allows_ecoverwrites()); - dout(20) << __func__ << ": blocking " << *op - << " because it requires an rmw and the cache is invalid " - << pipeline_state - << dendl; - return false; - } - - if (!pipeline_state.caching_enabled()) { - op->using_cache = false; - } else if (op->invalidates_cache()) { - dout(20) << __func__ << ": invalidating cache after this op" - << dendl; - pipeline_state.invalidate(); - } - - waiting_state.pop_front(); - waiting_reads.push_back(*op); - - if (op->using_cache) { - cache.open_write_pin(op->pin); - - extent_set empty; - for (auto &&hpair: op->plan.will_write) { - auto to_read_plan_iter = op->plan.to_read.find(hpair.first); - const extent_set &to_read_plan = - to_read_plan_iter == op->plan.to_read.end() ? 
- empty : - to_read_plan_iter->second; - - extent_set remote_read = cache.reserve_extents_for_rmw( - hpair.first, - op->pin, - hpair.second, - to_read_plan); - - extent_set pending_read = to_read_plan; - pending_read.subtract(remote_read); - - if (!remote_read.empty()) { - op->remote_read[hpair.first] = std::move(remote_read); - } - if (!pending_read.empty()) { - op->pending_read[hpair.first] = std::move(pending_read); - } - } - } else { - op->remote_read = op->plan.to_read; - } - - dout(10) << __func__ << ": " << *op << dendl; - - if (!op->remote_read.empty()) { - ceph_assert(get_parent()->get_pool().allows_ecoverwrites()); - objects_read_async_no_cache( - op->remote_read, - [op, this](map > &&results) { - for (auto &&i: results) { - op->remote_read_result.emplace(i.first, i.second.second); - } - check_ops(); - }); - } - - return true; -} - -bool ECCommon::RMWPipeline::try_reads_to_commit() -{ - if (waiting_reads.empty()) - return false; - Op *op = &(waiting_reads.front()); - if (op->read_in_progress()) - return false; - waiting_reads.pop_front(); - waiting_commit.push_back(*op); - - dout(10) << __func__ << ": starting commit on " << *op << dendl; - dout(20) << __func__ << ": " << cache << dendl; - - get_parent()->apply_stats( - op->hoid, - op->delta_stats); - - if (op->using_cache) { - for (auto &&hpair: op->pending_read) { - op->remote_read_result[hpair.first].insert( - cache.get_remaining_extents_for_rmw( - hpair.first, - op->pin, - hpair.second)); - } - op->pending_read.clear(); - } else { - ceph_assert(op->pending_read.empty()); - } - - map trans; - for (set::const_iterator i = - get_parent()->get_acting_recovery_backfill_shards().begin(); - i != get_parent()->get_acting_recovery_backfill_shards().end(); - ++i) { - trans[i->shard]; - } - - op->trace.event("start ec write"); - - map written; - op->generate_transactions( - ec_impl, - get_parent()->get_info().pgid.pgid, - sinfo, - &written, - &trans, - get_parent()->get_dpp(), - get_osdmap()->require_osd_release); - - dout(20) << __func__ << ": " << cache << dendl; - dout(20) << __func__ << ": written: " << written << dendl; - dout(20) << __func__ << ": op: " << *op << dendl; - - if (!get_parent()->get_pool().allows_ecoverwrites()) { - for (auto &&i: op->log_entries) { - if (i.requires_kraken()) { - derr << __func__ << ": log entry " << i << " requires kraken" - << " but overwrites are not enabled!" 
<< dendl; - ceph_abort(); - } - } - } - - map written_set; - for (auto &&i: written) { - written_set[i.first] = i.second.get_interval_set(); - } - dout(20) << __func__ << ": written_set: " << written_set << dendl; - ceph_assert(written_set == op->plan.will_write); - - if (op->using_cache) { - for (auto &&hpair: written) { - dout(20) << __func__ << ": " << hpair << dendl; - cache.present_rmw_update(hpair.first, op->pin, hpair.second); - } - } - op->remote_read.clear(); - op->remote_read_result.clear(); - - ObjectStore::Transaction empty; - bool should_write_local = false; - ECSubWrite local_write_op; - std::vector> messages; - messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size()); - set backfill_shards = get_parent()->get_backfill_shards(); - for (set::const_iterator i = - get_parent()->get_acting_recovery_backfill_shards().begin(); - i != get_parent()->get_acting_recovery_backfill_shards().end(); - ++i) { - op->pending_apply.insert(*i); - op->pending_commit.insert(*i); - map::iterator iter = - trans.find(i->shard); - ceph_assert(iter != trans.end()); - bool should_send = get_parent()->should_send_op(*i, op->hoid); - const pg_stat_t &stats = - (should_send || !backfill_shards.count(*i)) ? - get_info().stats : - get_parent()->get_shard_info().find(*i)->second.stats; - - ECSubWrite sop( - get_parent()->whoami_shard(), - op->tid, - op->reqid, - op->hoid, - stats, - should_send ? iter->second : empty, - op->version, - op->trim_to, - op->roll_forward_to, - op->log_entries, - op->updated_hit_set_history, - op->temp_added, - op->temp_cleared, - !should_send); - - ZTracer::Trace trace; - if (op->trace) { - // initialize a child span for this shard - trace.init("ec sub write", nullptr, &op->trace); - trace.keyval("shard", i->shard.id); - } - - if (*i == get_parent()->whoami_shard()) { - should_write_local = true; - local_write_op.claim(sop); - } else { - MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop); - r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard); - r->map_epoch = get_osdmap_epoch(); - r->min_epoch = get_parent()->get_interval_start_epoch(); - r->trace = trace; - messages.push_back(std::make_pair(i->osd, r)); - } - } - - if (!messages.empty()) { - get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch()); - } - - if (should_write_local) { - handle_sub_write( - get_parent()->whoami_shard(), - op->client_op, - local_write_op, - op->trace); - } - - for (auto i = op->on_write.begin(); - i != op->on_write.end(); - op->on_write.erase(i++)) { - (*i)(); - } - - return true; -} - -struct ECDummyOp : ECCommon::RMWPipeline::Op { - void generate_transactions( - ceph::ErasureCodeInterfaceRef &ecimpl, - pg_t pgid, - const ECUtil::stripe_info_t &sinfo, - std::map *written, - std::map *transactions, - DoutPrefixProvider *dpp, - const ceph_release_t require_osd_release) final - { - // NOP, as -- in constrast to ECClassicalOp -- there is no - // transaction involved - } -}; - -bool ECCommon::RMWPipeline::try_finish_rmw() -{ - if (waiting_commit.empty()) - return false; - Op *op = &(waiting_commit.front()); - if (op->write_in_progress()) - return false; - waiting_commit.pop_front(); - - dout(10) << __func__ << ": " << *op << dendl; - dout(20) << __func__ << ": " << cache << dendl; - - if (op->roll_forward_to > completed_to) - completed_to = op->roll_forward_to; - if (op->version > committed_to) - committed_to = op->version; - - if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) { - if (op->version > get_parent()->get_log().get_can_rollback_to() && - 
waiting_reads.empty() && - waiting_commit.empty()) { - // submit a dummy, transaction-empty op to kick the rollforward - auto tid = get_parent()->get_tid(); - auto nop = std::make_unique(); - nop->hoid = op->hoid; - nop->trim_to = op->trim_to; - nop->roll_forward_to = op->version; - nop->tid = tid; - nop->reqid = op->reqid; - waiting_reads.push_back(*nop); - tid_to_op_map[tid] = std::move(nop); - } - } - - if (op->using_cache) { - cache.release_write_pin(op->pin); - } - tid_to_op_map.erase(op->tid); - - if (waiting_reads.empty() && - waiting_commit.empty()) { - pipeline_state.clear(); - dout(20) << __func__ << ": clearing pipeline_state " - << pipeline_state - << dendl; - } - return true; -} - -void ECCommon::RMWPipeline::check_ops() -{ - while (try_state_to_reads() || - try_reads_to_commit() || - try_finish_rmw()); -} - int ECBackend::objects_read_sync( const hobject_t &hoid, uint64_t off, diff --git a/src/osd/ECCommon.cc b/src/osd/ECCommon.cc new file mode 100644 index 0000000000000..373cc4079d3ee --- /dev/null +++ b/src/osd/ECCommon.cc @@ -0,0 +1,907 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2013 Inktank Storage, Inc. + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#include +#include + +#include "ECCommon.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" +#include "messages/MOSDECSubOpWrite.h" +#include "messages/MOSDECSubOpWriteReply.h" +#include "messages/MOSDECSubOpRead.h" +#include "messages/MOSDECSubOpReadReply.h" +#include "ECMsgTypes.h" +#include "PGLog.h" + +#include "osd_tracer.h" + +#define dout_context cct +#define dout_subsys ceph_subsys_osd +#define DOUT_PREFIX_ARGS this +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) + +using std::dec; +using std::hex; +using std::less; +using std::list; +using std::make_pair; +using std::map; +using std::pair; +using std::ostream; +using std::set; +using std::string; +using std::unique_ptr; +using std::vector; + +using ceph::bufferhash; +using ceph::bufferlist; +using ceph::bufferptr; +using ceph::ErasureCodeInterfaceRef; +using ceph::Formatter; + +static ostream& _prefix(std::ostream *_dout, ECCommon::RMWPipeline *rmw_pipeline) { + return rmw_pipeline->get_parent()->gen_dbg_prefix(*_dout); +} +static ostream& _prefix(std::ostream *_dout, ECCommon::ReadPipeline *read_pipeline) { + return read_pipeline->get_parent()->gen_dbg_prefix(*_dout); +} + +ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::pipeline_state_t &rhs) { + switch (rhs.pipeline_state) { + case ECCommon::RMWPipeline::pipeline_state_t::CACHE_VALID: + return lhs << "CACHE_VALID"; + case ECCommon::RMWPipeline::pipeline_state_t::CACHE_INVALID: + return lhs << "CACHE_INVALID"; + default: + ceph_abort_msg("invalid pipeline state"); + } + return lhs; // unreachable +} + +ostream &operator<<(ostream &lhs, const ECCommon::read_request_t &rhs) +{ + return lhs << "read_request_t(to_read=[" << rhs.to_read << "]" + << ", need=" << rhs.need + << ", want_attrs=" << rhs.want_attrs + << ")"; +} + +ostream &operator<<(ostream &lhs, const ECCommon::read_result_t &rhs) +{ + lhs << "read_result_t(r=" << rhs.r + << ", errors=" << rhs.errors; + if (rhs.attrs) { + lhs << ", attrs=" << *(rhs.attrs); + } else { + lhs << ", noattrs"; + } + 
return lhs << ", returned=" << rhs.returned << ")"; +} + +ostream &operator<<(ostream &lhs, const ECCommon::ReadOp &rhs) +{ + lhs << "ReadOp(tid=" << rhs.tid; +#ifndef WITH_SEASTAR + if (rhs.op && rhs.op->get_req()) { + lhs << ", op="; + rhs.op->get_req()->print(lhs); + } +#endif + return lhs << ", to_read=" << rhs.to_read + << ", complete=" << rhs.complete + << ", priority=" << rhs.priority + << ", obj_to_source=" << rhs.obj_to_source + << ", source_to_obj=" << rhs.source_to_obj + << ", in_progress=" << rhs.in_progress << ")"; +} + +void ECCommon::ReadOp::dump(Formatter *f) const +{ + f->dump_unsigned("tid", tid); +#ifndef WITH_SEASTAR + if (op && op->get_req()) { + f->dump_stream("op") << *(op->get_req()); + } +#endif + f->dump_stream("to_read") << to_read; + f->dump_stream("complete") << complete; + f->dump_int("priority", priority); + f->dump_stream("obj_to_source") << obj_to_source; + f->dump_stream("source_to_obj") << source_to_obj; + f->dump_stream("in_progress") << in_progress; +} + +ostream &operator<<(ostream &lhs, const ECCommon::RMWPipeline::Op &rhs) +{ + lhs << "Op(" << rhs.hoid + << " v=" << rhs.version + << " tt=" << rhs.trim_to + << " tid=" << rhs.tid + << " reqid=" << rhs.reqid; +#ifndef WITH_SEASTAR + if (rhs.client_op && rhs.client_op->get_req()) { + lhs << " client_op="; + rhs.client_op->get_req()->print(lhs); + } +#endif + lhs << " roll_forward_to=" << rhs.roll_forward_to + << " temp_added=" << rhs.temp_added + << " temp_cleared=" << rhs.temp_cleared + << " pending_read=" << rhs.pending_read + << " remote_read=" << rhs.remote_read + << " remote_read_result=" << rhs.remote_read_result + << " pending_apply=" << rhs.pending_apply + << " pending_commit=" << rhs.pending_commit + << " plan.to_read=" << rhs.plan.to_read + << " plan.will_write=" << rhs.plan.will_write + << ")"; + return lhs; +} + +void ECCommon::ReadPipeline::complete_read_op(ReadOp &rop) +{ + map::iterator reqiter = + rop.to_read.begin(); + map::iterator resiter = + rop.complete.begin(); + ceph_assert(rop.to_read.size() == rop.complete.size()); + for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) { + rop.on_complete->finish_single_request( + reqiter->first, + resiter->second, + reqiter->second.to_read); + } + ceph_assert(rop.on_complete); + std::move(*rop.on_complete).finish(rop.priority); + rop.on_complete = nullptr; + // if the read op is over. clean all the data of this tid. 
+ for (set::iterator iter = rop.in_progress.begin(); + iter != rop.in_progress.end(); + iter++) { + shard_to_read_map[*iter].erase(rop.tid); + } + rop.in_progress.clear(); + tid_to_read_map.erase(rop.tid); +} + +struct FinishReadOp : public GenContext { + ECCommon::ReadPipeline& read_pipeline; + ceph_tid_t tid; + FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid) + : read_pipeline(read_pipeline), tid(tid) {} + void finish(ThreadPool::TPHandle&) override { + auto ropiter = read_pipeline.tid_to_read_map.find(tid); + ceph_assert(ropiter != read_pipeline.tid_to_read_map.end()); + read_pipeline.complete_read_op(ropiter->second); + } +}; + +template +void ECCommon::ReadPipeline::filter_read_op( + const OSDMapRef& osdmap, + ReadOp &op, + F&& on_erase) +{ + set to_cancel; + for (map >::iterator i = op.source_to_obj.begin(); + i != op.source_to_obj.end(); + ++i) { + if (osdmap->is_down(i->first.osd)) { + to_cancel.insert(i->second.begin(), i->second.end()); + op.in_progress.erase(i->first); + continue; + } + } + + if (to_cancel.empty()) + return; + + for (map >::iterator i = op.source_to_obj.begin(); + i != op.source_to_obj.end(); + ) { + for (set::iterator j = i->second.begin(); + j != i->second.end(); + ) { + if (to_cancel.count(*j)) + i->second.erase(j++); + else + ++j; + } + if (i->second.empty()) { + op.source_to_obj.erase(i++); + } else { + ceph_assert(!osdmap->is_down(i->first.osd)); + ++i; + } + } + + for (set::iterator i = to_cancel.begin(); + i != to_cancel.end(); + ++i) { + get_parent()->cancel_pull(*i); + + ceph_assert(op.to_read.count(*i)); + read_request_t &req = op.to_read.find(*i)->second; + dout(10) << __func__ << ": canceling " << req + << " for obj " << *i << dendl; + op.to_read.erase(*i); + op.complete.erase(*i); + on_erase(*i); + } + + if (op.in_progress.empty()) { + /* This case is odd. filter_read_op gets called while processing + * an OSDMap. Normal, non-recovery reads only happen from acting + * set osds. For this op to have had a read source go down and + * there not be an interval change, it must be part of a pull during + * log-based recovery. + * + * This callback delays calling complete_read_op until later to avoid + * dealing with recovery while handling an OSDMap. We assign a + * cost here of 1 because: + * 1) This should be very rare, and the operation itself was already + * throttled. + * 2) It shouldn't result in IO, rather it should result in restarting + * the pull on the affected objects and pushes from in-memory buffers + * on any now complete unaffected objects. 
+ */ +#ifndef WITH_SEASTAR + get_parent()->schedule_recovery_work( + get_parent()->bless_unlocked_gencontext( + new FinishReadOp(*this, op.tid)), + 1); +#else + // TODO + ceph_abort_msg("not yet implemented"); +#endif + } +} + +template +void ECCommon::ReadPipeline::check_recovery_sources( + const OSDMapRef& osdmap, + F&& on_erase) +{ + set tids_to_filter; + for (map >::iterator + i = shard_to_read_map.begin(); + i != shard_to_read_map.end(); + ) { + if (osdmap->is_down(i->first.osd)) { + tids_to_filter.insert(i->second.begin(), i->second.end()); + shard_to_read_map.erase(i++); + } else { + ++i; + } + } + for (set::iterator i = tids_to_filter.begin(); + i != tids_to_filter.end(); + ++i) { + map::iterator j = tid_to_read_map.find(*i); + ceph_assert(j != tid_to_read_map.end()); + filter_read_op(osdmap, j->second, on_erase); + } +} + +void ECCommon::ReadPipeline::on_change() +{ + for (map::iterator i = tid_to_read_map.begin(); + i != tid_to_read_map.end(); + ++i) { + dout(10) << __func__ << ": cancelling " << i->second << dendl; + } + tid_to_read_map.clear(); + shard_to_read_map.clear(); + in_progress_client_reads.clear(); +} + +void ECCommon::RMWPipeline::on_change() +{ + dout(10) << __func__ << dendl; + + completed_to = eversion_t(); + committed_to = eversion_t(); + pipeline_state.clear(); + waiting_reads.clear(); + waiting_state.clear(); + waiting_commit.clear(); + for (auto &&op: tid_to_op_map) { + cache.release_write_pin(op.second->pin); + } + tid_to_op_map.clear(); +} + +void ECCommon::RMWPipeline::call_write_ordered(std::function &&cb) { + if (!waiting_state.empty()) { + waiting_state.back().on_write.emplace_back(std::move(cb)); + } else if (!waiting_reads.empty()) { + waiting_reads.back().on_write.emplace_back(std::move(cb)); + } else { + // Nothing earlier in the pipeline, just call it + cb(); + } +} + +void ECCommon::ReadPipeline::get_all_avail_shards( + const hobject_t &hoid, + const set &error_shards, + set &have, + map &shards, + bool for_recovery) +{ + for (set::const_iterator i = + get_parent()->get_acting_shards().begin(); + i != get_parent()->get_acting_shards().end(); + ++i) { + dout(10) << __func__ << ": checking acting " << *i << dendl; + const pg_missing_t &missing = get_parent()->get_shard_missing(*i); + if (error_shards.find(*i) != error_shards.end()) + continue; + if (!missing.is_missing(hoid)) { + ceph_assert(!have.count(i->shard)); + have.insert(i->shard); + ceph_assert(!shards.count(i->shard)); + shards.insert(make_pair(i->shard, *i)); + } + } + + if (for_recovery) { + for (set::const_iterator i = + get_parent()->get_backfill_shards().begin(); + i != get_parent()->get_backfill_shards().end(); + ++i) { + if (error_shards.find(*i) != error_shards.end()) + continue; + if (have.count(i->shard)) { + ceph_assert(shards.count(i->shard)); + continue; + } + dout(10) << __func__ << ": checking backfill " << *i << dendl; + ceph_assert(!shards.count(i->shard)); + const pg_info_t &info = get_parent()->get_shard_info(*i); + const pg_missing_t &missing = get_parent()->get_shard_missing(*i); + if (hoid < info.last_backfill && + !missing.is_missing(hoid)) { + have.insert(i->shard); + shards.insert(make_pair(i->shard, *i)); + } + } + + map>::const_iterator miter = + get_parent()->get_missing_loc_shards().find(hoid); + if (miter != get_parent()->get_missing_loc_shards().end()) { + for (set::iterator i = miter->second.begin(); + i != miter->second.end(); + ++i) { + dout(10) << __func__ << ": checking missing_loc " << *i << dendl; + auto m = get_parent()->maybe_get_shard_missing(*i); + 
if (m) { + ceph_assert(!(*m).is_missing(hoid)); + } + if (error_shards.find(*i) != error_shards.end()) + continue; + have.insert(i->shard); + shards.insert(make_pair(i->shard, *i)); + } + } + } +} + +int ECCommon::ReadPipeline::get_min_avail_to_read_shards( + const hobject_t &hoid, + const set &want, + bool for_recovery, + bool do_redundant_reads, + map>> *to_read) +{ + // Make sure we don't do redundant reads for recovery + ceph_assert(!for_recovery || !do_redundant_reads); + + set have; + map shards; + set error_shards; + + get_all_avail_shards(hoid, error_shards, have, shards, for_recovery); + + map>> need; + int r = ec_impl->minimum_to_decode(want, have, &need); + if (r < 0) + return r; + + if (do_redundant_reads) { + vector> subchunks_list; + subchunks_list.push_back(make_pair(0, ec_impl->get_sub_chunk_count())); + for (auto &&i: have) { + need[i] = subchunks_list; + } + } + + if (!to_read) + return 0; + + for (auto &&i:need) { + ceph_assert(shards.count(shard_id_t(i.first))); + to_read->insert(make_pair(shards[shard_id_t(i.first)], i.second)); + } + return 0; +} + +int ECCommon::ReadPipeline::get_remaining_shards( + const hobject_t &hoid, + const set &avail, + const set &want, + const read_result_t &result, + map>> *to_read, + bool for_recovery) +{ + ceph_assert(to_read); + + set have; + map shards; + set error_shards; + for (auto &p : result.errors) { + error_shards.insert(p.first); + } + + get_all_avail_shards(hoid, error_shards, have, shards, for_recovery); + + map>> need; + int r = ec_impl->minimum_to_decode(want, have, &need); + if (r < 0) { + dout(0) << __func__ << " not enough shards left to try for " << hoid + << " read result was " << result << dendl; + return -EIO; + } + + set shards_left; + for (auto p : need) { + if (avail.find(p.first) == avail.end()) { + shards_left.insert(p.first); + } + } + + vector> subchunks; + subchunks.push_back(make_pair(0, ec_impl->get_sub_chunk_count())); + for (set::iterator i = shards_left.begin(); + i != shards_left.end(); + ++i) { + ceph_assert(shards.count(shard_id_t(*i))); + ceph_assert(avail.find(*i) == avail.end()); + to_read->insert(make_pair(shards[shard_id_t(*i)], subchunks)); + } + return 0; +} + +void ECCommon::ReadPipeline::start_read_op( + int priority, + map> &want_to_read, + map &to_read, + OpRequestRef _op, + bool do_redundant_reads, + bool for_recovery, + std::unique_ptr on_complete) +{ + ceph_tid_t tid = get_parent()->get_tid(); + ceph_assert(!tid_to_read_map.count(tid)); + auto &op = tid_to_read_map.emplace( + tid, + ReadOp( + priority, + tid, + do_redundant_reads, + for_recovery, + std::move(on_complete), + _op, + std::move(want_to_read), + std::move(to_read))).first->second; + dout(10) << __func__ << ": starting " << op << dendl; + if (_op) { +#ifndef WITH_SEASTAR + op.trace = _op->pg_trace; +#endif + op.trace.event("start ec read"); + } + do_read_op(op); +} + +void ECCommon::ReadPipeline::do_read_op(ReadOp &op) +{ + int priority = op.priority; + ceph_tid_t tid = op.tid; + + dout(10) << __func__ << ": starting read " << op << dendl; + + map messages; + for (map::iterator i = op.to_read.begin(); + i != op.to_read.end(); + ++i) { + bool need_attrs = i->second.want_attrs; + + for (auto j = i->second.need.begin(); + j != i->second.need.end(); + ++j) { + if (need_attrs) { + messages[j->first].attrs_to_read.insert(i->first); + need_attrs = false; + } + messages[j->first].subchunks[i->first] = j->second; + op.obj_to_source[i->first].insert(j->first); + op.source_to_obj[j->first].insert(i->first); + } + for (list 
>::const_iterator j = + i->second.to_read.begin(); + j != i->second.to_read.end(); + ++j) { + pair chunk_off_len = + sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>())); + for (auto k = i->second.need.begin(); + k != i->second.need.end(); + ++k) { + messages[k->first].to_read[i->first].push_back( + boost::make_tuple( + chunk_off_len.first, + chunk_off_len.second, + j->get<2>())); + } + ceph_assert(!need_attrs); + } + } + + std::vector> m; + m.reserve(messages.size()); + for (map::iterator i = messages.begin(); + i != messages.end(); + ++i) { + op.in_progress.insert(i->first); + shard_to_read_map[i->first].insert(op.tid); + i->second.tid = tid; + MOSDECSubOpRead *msg = new MOSDECSubOpRead; + msg->set_priority(priority); + msg->pgid = spg_t( + get_info().pgid.pgid, + i->first.shard); + msg->map_epoch = get_osdmap_epoch(); + msg->min_epoch = get_parent()->get_interval_start_epoch(); + msg->op = i->second; + msg->op.from = get_parent()->whoami_shard(); + msg->op.tid = tid; + if (op.trace) { + // initialize a child span for this shard + msg->trace.init("ec sub read", nullptr, &op.trace); + msg->trace.keyval("shard", i->first.shard.id); + } + m.push_back(std::make_pair(i->first.osd, msg)); + } + if (!m.empty()) { + get_parent()->send_message_osd_cluster(m, get_osdmap_epoch()); + } + + dout(10) << __func__ << ": started " << op << dendl; +} + +void ECCommon::RMWPipeline::start_rmw(OpRef op) +{ + ceph_assert(op); + dout(10) << __func__ << ": " << *op << dendl; + + ceph_assert(!tid_to_op_map.count(op->tid)); + waiting_state.push_back(*op); + tid_to_op_map[op->tid] = std::move(op); + check_ops(); +} + +bool ECCommon::RMWPipeline::try_state_to_reads() +{ + if (waiting_state.empty()) + return false; + + Op *op = &(waiting_state.front()); + if (op->requires_rmw() && pipeline_state.cache_invalid()) { + ceph_assert(get_parent()->get_pool().allows_ecoverwrites()); + dout(20) << __func__ << ": blocking " << *op + << " because it requires an rmw and the cache is invalid " + << pipeline_state + << dendl; + return false; + } + + if (!pipeline_state.caching_enabled()) { + op->using_cache = false; + } else if (op->invalidates_cache()) { + dout(20) << __func__ << ": invalidating cache after this op" + << dendl; + pipeline_state.invalidate(); + } + + waiting_state.pop_front(); + waiting_reads.push_back(*op); + + if (op->using_cache) { + cache.open_write_pin(op->pin); + + extent_set empty; + for (auto &&hpair: op->plan.will_write) { + auto to_read_plan_iter = op->plan.to_read.find(hpair.first); + const extent_set &to_read_plan = + to_read_plan_iter == op->plan.to_read.end() ? 
+ empty : + to_read_plan_iter->second; + + extent_set remote_read = cache.reserve_extents_for_rmw( + hpair.first, + op->pin, + hpair.second, + to_read_plan); + + extent_set pending_read = to_read_plan; + pending_read.subtract(remote_read); + + if (!remote_read.empty()) { + op->remote_read[hpair.first] = std::move(remote_read); + } + if (!pending_read.empty()) { + op->pending_read[hpair.first] = std::move(pending_read); + } + } + } else { + op->remote_read = op->plan.to_read; + } + + dout(10) << __func__ << ": " << *op << dendl; + + if (!op->remote_read.empty()) { + ceph_assert(get_parent()->get_pool().allows_ecoverwrites()); + objects_read_async_no_cache( + op->remote_read, + [op, this](map > &&results) { + for (auto &&i: results) { + op->remote_read_result.emplace(i.first, i.second.second); + } + check_ops(); + }); + } + + return true; +} + +bool ECCommon::RMWPipeline::try_reads_to_commit() +{ + if (waiting_reads.empty()) + return false; + Op *op = &(waiting_reads.front()); + if (op->read_in_progress()) + return false; + waiting_reads.pop_front(); + waiting_commit.push_back(*op); + + dout(10) << __func__ << ": starting commit on " << *op << dendl; + dout(20) << __func__ << ": " << cache << dendl; + + get_parent()->apply_stats( + op->hoid, + op->delta_stats); + + if (op->using_cache) { + for (auto &&hpair: op->pending_read) { + op->remote_read_result[hpair.first].insert( + cache.get_remaining_extents_for_rmw( + hpair.first, + op->pin, + hpair.second)); + } + op->pending_read.clear(); + } else { + ceph_assert(op->pending_read.empty()); + } + + map trans; + for (set::const_iterator i = + get_parent()->get_acting_recovery_backfill_shards().begin(); + i != get_parent()->get_acting_recovery_backfill_shards().end(); + ++i) { + trans[i->shard]; + } + + op->trace.event("start ec write"); + + map written; + op->generate_transactions( + ec_impl, + get_parent()->get_info().pgid.pgid, + sinfo, + &written, + &trans, + get_parent()->get_dpp(), + get_osdmap()->require_osd_release); + + dout(20) << __func__ << ": " << cache << dendl; + dout(20) << __func__ << ": written: " << written << dendl; + dout(20) << __func__ << ": op: " << *op << dendl; + + if (!get_parent()->get_pool().allows_ecoverwrites()) { + for (auto &&i: op->log_entries) { + if (i.requires_kraken()) { + derr << __func__ << ": log entry " << i << " requires kraken" + << " but overwrites are not enabled!" 
<< dendl; + ceph_abort(); + } + } + } + + map written_set; + for (auto &&i: written) { + written_set[i.first] = i.second.get_interval_set(); + } + dout(20) << __func__ << ": written_set: " << written_set << dendl; + ceph_assert(written_set == op->plan.will_write); + + if (op->using_cache) { + for (auto &&hpair: written) { + dout(20) << __func__ << ": " << hpair << dendl; + cache.present_rmw_update(hpair.first, op->pin, hpair.second); + } + } + op->remote_read.clear(); + op->remote_read_result.clear(); + + ObjectStore::Transaction empty; + bool should_write_local = false; + ECSubWrite local_write_op; + std::vector> messages; + messages.reserve(get_parent()->get_acting_recovery_backfill_shards().size()); + set backfill_shards = get_parent()->get_backfill_shards(); + for (set::const_iterator i = + get_parent()->get_acting_recovery_backfill_shards().begin(); + i != get_parent()->get_acting_recovery_backfill_shards().end(); + ++i) { + op->pending_apply.insert(*i); + op->pending_commit.insert(*i); + map::iterator iter = + trans.find(i->shard); + ceph_assert(iter != trans.end()); + bool should_send = get_parent()->should_send_op(*i, op->hoid); + const pg_stat_t &stats = + (should_send || !backfill_shards.count(*i)) ? + get_info().stats : + get_parent()->get_shard_info().find(*i)->second.stats; + + ECSubWrite sop( + get_parent()->whoami_shard(), + op->tid, + op->reqid, + op->hoid, + stats, + should_send ? iter->second : empty, + op->version, + op->trim_to, + op->roll_forward_to, + op->log_entries, + op->updated_hit_set_history, + op->temp_added, + op->temp_cleared, + !should_send); + + ZTracer::Trace trace; + if (op->trace) { + // initialize a child span for this shard + trace.init("ec sub write", nullptr, &op->trace); + trace.keyval("shard", i->shard.id); + } + + if (*i == get_parent()->whoami_shard()) { + should_write_local = true; + local_write_op.claim(sop); + } else { + MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop); + r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard); + r->map_epoch = get_osdmap_epoch(); + r->min_epoch = get_parent()->get_interval_start_epoch(); + r->trace = trace; + messages.push_back(std::make_pair(i->osd, r)); + } + } + + if (!messages.empty()) { + get_parent()->send_message_osd_cluster(messages, get_osdmap_epoch()); + } + + if (should_write_local) { + handle_sub_write( + get_parent()->whoami_shard(), + op->client_op, + local_write_op, + op->trace); + } + + for (auto i = op->on_write.begin(); + i != op->on_write.end(); + op->on_write.erase(i++)) { + (*i)(); + } + + return true; +} + +struct ECDummyOp : ECCommon::RMWPipeline::Op { + void generate_transactions( + ceph::ErasureCodeInterfaceRef &ecimpl, + pg_t pgid, + const ECUtil::stripe_info_t &sinfo, + std::map *written, + std::map *transactions, + DoutPrefixProvider *dpp, + const ceph_release_t require_osd_release) final + { + // NOP, as -- in constrast to ECClassicalOp -- there is no + // transaction involved + } +}; + +bool ECCommon::RMWPipeline::try_finish_rmw() +{ + if (waiting_commit.empty()) + return false; + Op *op = &(waiting_commit.front()); + if (op->write_in_progress()) + return false; + waiting_commit.pop_front(); + + dout(10) << __func__ << ": " << *op << dendl; + dout(20) << __func__ << ": " << cache << dendl; + + if (op->roll_forward_to > completed_to) + completed_to = op->roll_forward_to; + if (op->version > committed_to) + committed_to = op->version; + + if (get_osdmap()->require_osd_release >= ceph_release_t::kraken) { + if (op->version > get_parent()->get_log().get_can_rollback_to() && + 
waiting_reads.empty() && + waiting_commit.empty()) { + // submit a dummy, transaction-empty op to kick the rollforward + auto tid = get_parent()->get_tid(); + auto nop = std::make_unique(); + nop->hoid = op->hoid; + nop->trim_to = op->trim_to; + nop->roll_forward_to = op->version; + nop->tid = tid; + nop->reqid = op->reqid; + waiting_reads.push_back(*nop); + tid_to_op_map[tid] = std::move(nop); + } + } + + if (op->using_cache) { + cache.release_write_pin(op->pin); + } + tid_to_op_map.erase(op->tid); + + if (waiting_reads.empty() && + waiting_commit.empty()) { + pipeline_state.clear(); + dout(20) << __func__ << ": clearing pipeline_state " + << pipeline_state + << dendl; + } + return true; +} + +void ECCommon::RMWPipeline::check_ops() +{ + while (try_state_to_reads() || + try_reads_to_commit() || + try_finish_rmw()); +} diff --git a/src/osd/ECCommon.h b/src/osd/ECCommon.h index a7585ea13dddb..46f8bb14772b5 100644 --- a/src/osd/ECCommon.h +++ b/src/osd/ECCommon.h @@ -130,7 +130,7 @@ struct ECListener { const eversion_t &roll_forward_to, const eversion_t &min_last_complete_ondisk, bool transaction_applied, - ObjectStore::Transaction &t, + ceph::os::Transaction &t, bool async = false) = 0; virtual void op_applied( const eversion_t &applied_version) = 0; @@ -583,3 +583,14 @@ struct ECCommon { }; }; +std::ostream &operator<<(std::ostream &lhs, + const ECCommon::RMWPipeline::pipeline_state_t &rhs); +std::ostream &operator<<(std::ostream &lhs, + const ECCommon::read_request_t &rhs); +std::ostream &operator<<(std::ostream &lhs, + const ECCommon::read_result_t &rhs); +std::ostream &operator<<(std::ostream &lhs, + const ECCommon::ReadOp &rhs); +std::ostream &operator<<(std::ostream &lhs, + const ECCommon::RMWPipeline::Op &rhs); + -- 2.39.5
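
Usage note: with the stream operators now declared at the bottom of ECCommon.h and defined in the new ECCommon.cc, the EC pipeline pretty-printers become visible to any translation unit that includes the header; the #ifndef WITH_SEASTAR guards added in ECCommon.cc also suggest the file is meant to build under crimson as well as the classic OSD. A minimal sketch of such a caller follows (the function below is hypothetical and only illustrates the exported declarations, it is not part of this patch):

    // Hypothetical caller, not part of this patch: any TU that includes
    // ECCommon.h can now format the EC pipeline types for logging, since
    // the operator<< declarations live in the header and the definitions
    // in ECCommon.cc.
    #include <sstream>
    #include <string>
    #include "osd/ECCommon.h"

    std::string describe_read_op(const ECCommon::ReadOp &rop)
    {
      std::ostringstream oss;
      oss << rop;  // resolves to operator<<(std::ostream &, const ECCommon::ReadOp &)
      return oss.str();
    }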