From: Samuel Just Date: Tue, 15 Nov 2016 23:48:21 +0000 (-0800) Subject: ECBackend: integrate cache and rmw pipeline X-Git-Tag: v11.1.0~245^2~13 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=1e95f2ce642485e25b74f7d873fda7e4b75a425f;p=ceph.git ECBackend: integrate cache and rmw pipeline Implements the rmw pipeline and integrates the cache. HashInfo now maintains a projected size for use during the planning phase of the pipeline. (Doesn't build without subsequent patches, not worth stubbing out the interfaces) Signed-off-by: Samuel Just --- diff --git a/ceph-erasure-code-corpus b/ceph-erasure-code-corpus index 0b00610443a..b5c863495c1 160000 --- a/ceph-erasure-code-corpus +++ b/ceph-erasure-code-corpus @@ -1 +1 @@ -Subproject commit 0b00610443a916fabc6668c03337f64d1f773ec9 +Subproject commit b5c863495c16975478aa5fc2ca33293c2e0c1a5f diff --git a/src/common/buffer.cc b/src/common/buffer.cc index 892c17f8fe7..bf72ed7d5ad 100644 --- a/src/common/buffer.cc +++ b/src/common/buffer.cc @@ -1870,6 +1870,14 @@ static simple_spinlock_t buffer_debug_lock = SIMPLE_SPINLOCK_INITIALIZER; append("\n", 1); } } + + void buffer::list::prepend_zero(unsigned len) + { + ptr bp(len); + bp.zero(false); + _len += len; + _buffers.emplace_front(std::move(bp)); + } void buffer::list::append_zero(unsigned len) { diff --git a/src/include/buffer.h b/src/include/buffer.h index 4c05e628dbe..1e30e3e2b6d 100644 --- a/src/include/buffer.h +++ b/src/include/buffer.h @@ -832,6 +832,7 @@ namespace buffer CEPH_BUFFER_API { void append(const list& bl); void append(std::istream& in); void append_zero(unsigned len); + void prepend_zero(unsigned len); /* * get a char diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index 389200dfc79..fe2260ae086 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -38,6 +38,18 @@ struct ECRecoveryHandle : public PGBackend::RecoveryHandle { list ops; }; +ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) { + switch (rhs.pipeline_state) { + case ECBackend::pipeline_state_t::CACHE_VALID: + return lhs << "CACHE_VALID"; + case ECBackend::pipeline_state_t::CACHE_INVALID: + return lhs << "CACHE_INVALID"; + default: + assert(0 == "invalid pipeline state"); + } + return lhs; // unreachable +} + static ostream &operator<<(ostream &lhs, const map &rhs) { lhs << "["; @@ -132,8 +144,17 @@ ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs) lhs << " client_op="; rhs.client_op->get_req()->print(lhs); } - lhs << " pending_commit=" << rhs.pending_commit + lhs << " roll_forward_to=" << rhs.roll_forward_to + << " temp_added=" << rhs.temp_added + << " pending_commit=" << rhs.pending_commit + << " temp_cleared=" << rhs.temp_cleared + << " pending_read=" << rhs.pending_read + << " remote_read=" << rhs.remote_read + << " remote_read_result=" << rhs.remote_read_result << " pending_apply=" << rhs.pending_apply + << " pending_commit=" << rhs.pending_commit + << " plan.to_read=" << rhs.plan.to_read + << " plan.will_write=" << rhs.plan.will_write << ")"; return lhs; } @@ -509,8 +530,18 @@ void ECBackend::continue_recovery_op( op.state = RecoveryOp::READING; assert(!op.recovery_progress.data_complete); set want(op.missing_on_shards.begin(), op.missing_on_shards.end()); + uint64_t from = op.recovery_progress.data_recovered_to; + uint64_t amount = get_recovery_chunk_size(); + + if (op.recovery_progress.first && op.obc) { + /* We've got the attrs and the hinfo, might as well use them */ + op.hinfo = get_hash_info(op.hoid); + assert(op.hinfo); + 
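// Seed the push with the cached attrs, refreshing the stored hinfo
+      // key with the copy just loaded so the pushed xattrs stay
+      // consistent with the object's chunks.
+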
op.xattrs = op.obc->attr_cache; + ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]); + } + set to_read; - uint64_t recovery_max_chunk = get_recovery_chunk_size(); int r = get_min_avail_to_read_shards( op.hoid, want, true, false, &to_read); if (r != 0) { @@ -526,11 +557,12 @@ void ECBackend::continue_recovery_op( this, op.hoid, op.recovery_progress.data_recovered_to, - recovery_max_chunk, + amount, to_read, - op.recovery_progress.first); - op.extent_requested = make_pair(op.recovery_progress.data_recovered_to, - recovery_max_chunk); + op.recovery_progress.first && !op.obc); + op.extent_requested = make_pair( + from, + amount); dout(10) << __func__ << ": IDLE return " << op << dendl; return; } @@ -898,8 +930,8 @@ void ECBackend::handle_sub_write( new SubWriteApplied(this, msg, op.tid, op.at_version))); vector tls; tls.reserve(2); - tls.push_back(std::move(localt)); tls.push_back(std::move(op.t)); + tls.push_back(std::move(localt)); get_parent()->queue_transactions(tls, msg); } @@ -914,12 +946,15 @@ void ECBackend::handle_sub_read( i != op.to_read.end(); ++i) { int r = 0; - ECUtil::HashInfoRef hinfo = get_hash_info(i->first); - if (!hinfo) { - r = -EIO; - get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first << "\n"; - dout(5) << __func__ << ": No hinfo for " << i->first << dendl; - goto error; + ECUtil::HashInfoRef hinfo; + if (!get_parent()->get_pool().is_hacky_ecoverwrites()) { + hinfo = get_hash_info(i->first); + if (!hinfo) { + r = -EIO; + get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first << "\n"; + dout(5) << __func__ << ": No hinfo for " << i->first << dendl; + goto error; + } } for (list >::iterator j = i->second.begin(); j != i->second.end(); ++j) { @@ -948,22 +983,25 @@ void ECBackend::handle_sub_read( ); } - // This shows that we still need deep scrub because large enough files - // are read in sections, so the digest check here won't be done here. - // Do NOT check osd_read_eio_on_bad_digest here. We need to report - // the state of our chunk in case other chunks could substitute. - if ((bl.length() == hinfo->get_total_chunk_size()) && - (j->get<0>() == 0)) { - dout(20) << __func__ << ": Checking hash of " << i->first << dendl; - bufferhash h(-1); - h << bl; - if (h.digest() != hinfo->get_chunk_hash(shard)) { - get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x" - << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << "\n"; - dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x" - << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl; - r = -EIO; - goto error; + if (!get_parent()->get_pool().is_hacky_ecoverwrites()) { + // This shows that we still need deep scrub because large enough files + // are read in sections, so the digest check here won't be done here. + // Do NOT check osd_read_eio_on_bad_digest here. We need to report + // the state of our chunk in case other chunks could substitute. 
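+	// Only append-only pools maintain whole-chunk digests; with ec
+	// overwrites enabled, hinfo is not even loaded above, so the
+	// digest check is skipped entirely.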
+ assert(hinfo->has_chunk_hash()); + if ((bl.length() == hinfo->get_total_chunk_size()) && + (j->get<0>() == 0)) { + dout(20) << __func__ << ": Checking hash of " << i->first << dendl; + bufferhash h(-1); + h << bl; + if (h.digest() != hinfo->get_chunk_hash(shard)) { + get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x" + << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << "\n"; + dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x" + << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl; + r = -EIO; + goto error; + } } } } @@ -1012,7 +1050,18 @@ void ECBackend::handle_sub_write_reply( assert(i->second.pending_apply.count(from)); i->second.pending_apply.erase(from); } - check_op(&(i->second)); + + if (i->second.pending_apply.empty() && i->second.on_all_applied) { + dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl; + i->second.on_all_applied->complete(0); + i->second.on_all_applied = 0; + } + if (i->second.pending_commit.empty() && i->second.on_all_commit) { + dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl; + i->second.on_all_commit->complete(0); + i->second.on_all_commit = 0; + } + check_ops(); } void ECBackend::handle_sub_read_reply( @@ -1275,7 +1324,18 @@ void ECBackend::check_recovery_sources(const OSDMapRef& osdmap) void ECBackend::on_change() { dout(10) << __func__ << dendl; - writing.clear(); + + completed_to = eversion_t(); + committed_to = eversion_t(); + pipeline_state.clear(); + waiting_reads.clear(); + waiting_state.clear(); + waiting_commit.clear(); + for (auto &&op: tid_to_op_map) { + cache.release_write_pin(op.second.pin); + } + tid_to_op_map.clear(); + tid_to_op_map.clear(); for (map::iterator i = tid_to_read_map.begin(); i != tid_to_read_map.end(); @@ -1290,12 +1350,6 @@ void ECBackend::on_change() } } tid_to_read_map.clear(); - for (list::iterator i = in_progress_client_reads.begin(); - i != in_progress_client_reads.end(); - ++i) { - delete i->on_complete; - i->on_complete = NULL; - } in_progress_client_reads.clear(); shard_to_read_map.clear(); clear_recovery_state(); @@ -1334,8 +1388,9 @@ void ECBackend::dump_recovery_info(Formatter *f) const void ECBackend::submit_transaction( const hobject_t &hoid, + const object_stat_sum_t &delta_stats, const eversion_t &at_version, - PGTransactionUPtr &&_t, + PGTransactionUPtr &&t, const eversion_t &trim_to, const eversion_t &roll_forward_to, const vector &log_entries, @@ -1351,9 +1406,10 @@ void ECBackend::submit_transaction( assert(!tid_to_op_map.count(tid)); Op *op = &(tid_to_op_map[tid]); op->hoid = hoid; + op->delta_stats = delta_stats; op->version = at_version; op->trim_to = trim_to; - op->roll_forward_to = roll_forward_to; + op->roll_forward_to = MAX(roll_forward_to, committed_to); op->log_entries = log_entries; std::swap(op->updated_hit_set_history, hset_history); op->on_local_applied_sync = on_local_applied_sync; @@ -1363,33 +1419,22 @@ void ECBackend::submit_transaction( op->reqid = reqid; op->client_op = client_op; - op->t = std::move(_t); - - set need_hinfos; - ECTransaction::get_append_objects(*(op->t), &need_hinfos); - for (set::iterator i = need_hinfos.begin(); - i != need_hinfos.end(); - ++i) { - ECUtil::HashInfoRef ref = get_hash_info(*i, false); - if (!ref) { - derr << __func__ << ": get_hash_info(" << *i << ")" - << " returned a null pointer and there is no " - << " way to recover from such an error in this " - << " context" << dendl; - assert(0); - 
} - op->unstable_hash_infos.insert( - make_pair( - *i, - ref)); - } - dout(10) << __func__ << ": op " << *op << " starting" << dendl; - start_write(op); - writing.push_back(op); + start_rmw(op, std::move(t)); dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl; } +void ECBackend::call_write_ordered(std::function &&cb) { + if (!waiting_state.empty()) { + waiting_state.back().on_write.emplace_back(std::move(cb)); + } else if (!waiting_reads.empty()) { + waiting_reads.back().on_write.emplace_back(std::move(cb)); + } else { + // Nothing earlier in the pipeline, just call it + cb(); + } +} + int ECBackend::get_min_avail_to_read_shards( const hobject_t &hoid, const set &want, @@ -1532,19 +1577,15 @@ void ECBackend::start_read_op( op.for_recovery = for_recovery; dout(10) << __func__ << ": starting " << op << dendl; do_read_op( - op, - to_read); + op); } -void ECBackend::do_read_op( - ReadOp &op, - map &to_read) +void ECBackend::do_read_op(ReadOp &op) { int priority = op.priority; ceph_tid_t tid = op.tid; - op.to_read.swap(to_read); - dout(10) << __func__ << ": starting additional " << op << dendl; + dout(10) << __func__ << ": starting read " << op << dendl; map messages; for (mapsecond.want_attrs; + list > > &reslist = + op.complete[i->first].returned; for (set::const_iterator j = i->second.need.begin(); j != i->second.need.end(); ++j) { @@ -1566,14 +1610,21 @@ void ECBackend::do_read_op( i->second.to_read.begin(); j != i->second.to_read.end(); ++j) { + reslist.push_back( + boost::make_tuple( + j->get<0>(), + j->get<1>(), + map())); pair chunk_off_len = sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>())); for (set::const_iterator k = i->second.need.begin(); k != i->second.need.end(); ++k) { - messages[*k].to_read[i->first].push_back(boost::make_tuple(chunk_off_len.first, - chunk_off_len.second, - j->get<2>())); + messages[*k].to_read[i->first].push_back( + boost::make_tuple( + chunk_off_len.first, + chunk_off_len.second, + j->get<2>())); } assert(!need_attrs); } @@ -1599,7 +1650,7 @@ void ECBackend::do_read_op( msg, get_parent()->get_epoch()); } - dout(10) << __func__ << ": started additional " << op << dendl; + dout(10) << __func__ << ": started " << op << dendl; } ECUtil::HashInfoRef ECBackend::get_hash_info( @@ -1654,33 +1705,130 @@ ECUtil::HashInfoRef ECBackend::get_hash_info( return ref; } -void ECBackend::check_op(Op *op) +void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t) { - if (op->pending_apply.empty() && op->on_all_applied) { - dout(10) << __func__ << " Calling on_all_applied on " << *op << dendl; - op->on_all_applied->complete(0); - op->on_all_applied = 0; - } - if (op->pending_commit.empty() && op->on_all_commit) { - dout(10) << __func__ << " Calling on_all_commit on " << *op << dendl; - op->on_all_commit->complete(0); - op->on_all_commit = 0; - } - if (op->pending_apply.empty() && op->pending_commit.empty()) { - // done! 
- assert(writing.front() == op); - dout(10) << __func__ << " Completing " << *op << dendl; - writing.pop_front(); - tid_to_op_map.erase(op->tid); - } - for (map::iterator i = tid_to_op_map.begin(); - i != tid_to_op_map.end(); - ++i) { - dout(20) << __func__ << " tid " << i->first <<": " << i->second << dendl; + assert(op); + + op->plan = ECTransaction::get_write_plan( + sinfo, + std::move(t), + [&](const hobject_t &i) { + ECUtil::HashInfoRef ref = get_hash_info(i, false); + if (!ref) { + derr << __func__ << ": get_hash_info(" << i << ")" + << " returned a null pointer and there is no " + << " way to recover from such an error in this " + << " context" << dendl; + assert(0); + } + return ref; + }, + get_parent()->get_dpp()); + + dout(10) << __func__ << ": " << *op << dendl; + + waiting_state.push_back(*op); + check_ops(); +} + +bool ECBackend::try_state_to_reads() +{ + if (waiting_state.empty()) + return false; + + Op *op = &(waiting_state.front()); + if (op->requires_rmw() && pipeline_state.cache_invalid()) { + dout(20) << __func__ << ": blocking " << *op + << " because it requires an rmw and the cache is invalid" + << pipeline_state + << dendl; + return false; + } + + if (op->invalidates_cache()) { + dout(20) << __func__ << ": invalidating cache after this op" + << dendl; + pipeline_state.invalidate(); } + + waiting_state.pop_front(); + waiting_reads.push_back(*op); + + if (op->requires_rmw() || pipeline_state.caching_enabled()) { + cache.open_write_pin(op->pin); + + extent_set empty; + for (auto &&hpair: op->plan.will_write) { + auto to_read_plan_iter = op->plan.to_read.find(hpair.first); + const extent_set &to_read_plan = + to_read_plan_iter == op->plan.to_read.end() ? + empty : + to_read_plan_iter->second; + + extent_set remote_read = cache.reserve_extents_for_rmw( + hpair.first, + op->pin, + hpair.second, + to_read_plan); + + extent_set pending_read = to_read_plan; + pending_read.subtract(remote_read); + + if (!remote_read.empty()) { + op->remote_read[hpair.first] = std::move(remote_read); + } + if (!pending_read.empty()) { + op->pending_read[hpair.first] = std::move(pending_read); + } + } + } else { + op->remote_read = op->plan.to_read; + } + + dout(10) << __func__ << ": " << *op << dendl; + + if (!op->remote_read.empty()) { + objects_read_async_no_cache( + op->remote_read, + [this, op](hobject_t::bitwisemap &&results) { + op->remote_read_result = std::move(results); + check_ops(); + }); + } + + return true; } -void ECBackend::start_write(Op *op) { +bool ECBackend::try_reads_to_commit() +{ + if (waiting_reads.empty()) + return false; + Op *op = &(waiting_reads.front()); + if (op->read_in_progress()) + return false; + waiting_reads.pop_front(); + waiting_commit.push_back(*op); + + dout(10) << __func__ << ": starting commit on " << *op << dendl; + dout(20) << __func__ << ": " << cache << dendl; + + get_parent()->apply_stats( + op->hoid, + op->delta_stats); + + if (pipeline_state.caching_enabled() || op->requires_rmw()) { + for (auto &&hpair: op->pending_read) { + op->remote_read_result[hpair.first].insert( + cache.get_remaining_extents_for_rmw( + hpair.first, + op->pin, + hpair.second)); + } + op->pending_read.clear(); + } else { + assert(op->pending_read.empty()); + } + map trans; for (set::const_iterator i = get_parent()->get_actingbackfill_shards().begin(); @@ -1688,20 +1836,56 @@ void ECBackend::start_write(Op *op) { ++i) { trans[i->shard]; } - ObjectStore::Transaction empty; - ECTransaction::generate_transactions( - *(op->t), - op->unstable_hash_infos, - ec_impl, - 
get_parent()->get_info().pgid.pgid, - sinfo, - op->log_entries, - &trans, - &(op->temp_added), - &(op->temp_cleared)); + hobject_t::bitwisemap written; + if (op->plan.t) { + ECTransaction::generate_transactions( + op->plan, + ec_impl, + get_parent()->get_info().pgid.pgid, + !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN), + sinfo, + op->remote_read_result, + op->log_entries, + &written, + &trans, + &(op->temp_added), + &(op->temp_cleared), + get_parent()->get_dpp()); + } + + dout(20) << __func__ << ": " << cache << dendl; + dout(20) << __func__ << ": written: " << written << dendl; + dout(20) << __func__ << ": op: " << *op << dendl; + + if (!get_parent()->get_pool().is_hacky_ecoverwrites()) { + for (auto &&i: op->log_entries) { + if (i.requires_kraken()) { + derr << __func__ << ": log entry " << i << " requires kraken" + << " but overwrites are not enabled!" << dendl; + assert(0); + } + } + } + + hobject_t::bitwisemap written_set; + for (auto &&i: written) { + written_set[i.first] = i.second.get_interval_set(); + } + dout(20) << __func__ << ": written_set: " << written_set << dendl; + assert(written_set == op->plan.will_write); + + if (pipeline_state.caching_enabled() || op->requires_rmw()) { + for (auto &&hpair: written) { + dout(20) << __func__ << ": " << hpair << dendl; + cache.present_rmw_update(hpair.first, op->pin, hpair.second); + } + } + op->remote_read.clear(); + op->remote_read_result.clear(); dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl; + ObjectStore::Transaction empty; for (set::const_iterator i = get_parent()->get_actingbackfill_shards().begin(); @@ -1747,6 +1931,69 @@ void ECBackend::start_write(Op *op) { i->osd, r, get_parent()->get_epoch()); } } + + for (auto i = op->on_write.begin(); + i != op->on_write.end(); + op->on_write.erase(i++)) { + (*i)(); + } + + return true; +} + +bool ECBackend::try_finish_rmw() +{ + if (waiting_commit.empty()) + return false; + Op *op = &(waiting_commit.front()); + if (op->write_in_progress()) + return false; + waiting_commit.pop_front(); + + dout(10) << __func__ << ": " << *op << dendl; + dout(20) << __func__ << ": " << cache << dendl; + + if (op->roll_forward_to > completed_to) + completed_to = op->roll_forward_to; + if (op->version > committed_to) + committed_to = op->version; + + if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) { + if (op->version > get_parent()->get_log().get_can_rollback_to() && + waiting_reads.empty() && + waiting_commit.empty()) { + // submit a dummy transaction to kick the rollforward + auto tid = get_parent()->get_tid(); + Op *nop = &(tid_to_op_map[tid]); + nop->hoid = op->hoid; + nop->trim_to = op->trim_to; + nop->roll_forward_to = op->version; + nop->tid = tid; + nop->reqid = op->reqid; + waiting_reads.push_back(*nop); + } + } + + if (pipeline_state.caching_enabled() || op->requires_rmw()) { + cache.release_write_pin(op->pin); + } + tid_to_op_map.erase(op->tid); + + if (waiting_reads.empty() && + waiting_commit.empty()) { + pipeline_state.clear(); + dout(20) << __func__ << ": clearing pipeline_state " + << pipeline_state + << dendl; + } + return true; +} + +void ECBackend::check_ops() +{ + while (try_state_to_reads() || + try_reads_to_commit() || + try_finish_rmw()); } int ECBackend::objects_read_sync( @@ -1759,31 +2006,137 @@ int ECBackend::objects_read_sync( return -EOPNOTSUPP; } +void ECBackend::objects_read_async( + const hobject_t &hoid, + const list, + pair > > &to_read, + Context *on_complete, + bool fast_read) +{ + hobject_t::bitwisemap > > + reads; + + uint32_t flags 
= 0; + extent_set es; + for (list, + pair > >::const_iterator i = + to_read.begin(); + i != to_read.end(); + ++i) { + pair tmp = + sinfo.offset_len_to_stripe_bounds( + make_pair(i->first.get<0>(), i->first.get<1>())); + + extent_set esnew; + esnew.insert(tmp.first, tmp.second); + es.union_of(esnew); + flags |= i->first.get<2>(); + } + + if (!es.empty()) { + auto &offsets = reads[hoid]; + for (auto j = es.begin(); + j != es.end(); + ++j) { + offsets.push_back( + boost::make_tuple( + j.get_start(), + j.get_len(), + flags)); + } + } + + struct cb { + ECBackend *ec; + hobject_t hoid; + list, + pair > > to_read; + unique_ptr on_complete; + cb(const cb&) = delete; + cb(cb &&) = default; + cb(ECBackend *ec, + const hobject_t &hoid, + const list, + pair > > &to_read, + Context *on_complete) + : ec(ec), + hoid(hoid), + to_read(to_read), + on_complete(on_complete) {} + void operator()(hobject_t::bitwisemap &&results) { + auto dpp = ec->get_parent()->get_dpp(); + ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results + << dendl; + ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache + << dendl; + + auto &got = results[hoid]; + + for (auto &&read: to_read) { + assert(read.second.first); + uint64_t offset = read.first.get<0>(); + uint64_t length = read.first.get<1>(); + auto range = got.get_containing_range(offset, length); + assert(range.first != range.second); + assert(range.first.get_off() <= offset); + assert( + (offset + length) <= + (range.first.get_off() + range.first.get_len())); + read.second.first->substr_of( + range.first.get_val(), + offset - range.first.get_off(), + length); + if (read.second.second) { + read.second.second->complete(length); + read.second.second = nullptr; + } + } + to_read.clear(); + if (on_complete) { + on_complete.release()->complete(0); + } + } + ~cb() { + for (auto &&i: to_read) { + delete i.second.second; + } + to_read.clear(); + } + }; + objects_read_and_reconstruct( + reads, + fast_read, + make_gen_lambda_context &&, cb>( + cb(this, + hoid, + to_read, + on_complete))); +} + struct CallClientContexts : public GenContext &> { + hobject_t hoid; ECBackend *ec; ECBackend::ClientAsyncReadStatus *status; - list, - pair > > to_read; + list > to_read; CallClientContexts( + hobject_t hoid, ECBackend *ec, ECBackend::ClientAsyncReadStatus *status, - const list, - pair > > &to_read) - : ec(ec), status(status), to_read(to_read) {} + const list > &to_read) + : hoid(hoid), ec(ec), status(status), to_read(to_read) {} void finish(pair &in) { ECBackend::read_result_t &res = in.second; + extent_map result; if (res.r != 0) goto out; assert(res.returned.size() == to_read.size()); assert(res.r == 0); assert(res.errors.empty()); - for (list, - pair > >::iterator i = to_read.begin(); - i != to_read.end(); - to_read.erase(i++)) { + for (auto &&read: to_read) { pair adjusted = - ec->sinfo.offset_len_to_stripe_bounds(make_pair(i->first.get<0>(), i->first.get<1>())); + ec->sinfo.offset_len_to_stripe_bounds( + make_pair(read.get<0>(), read.get<1>())); assert(res.returned.front().get<0>() == adjusted.first && res.returned.front().get<1>() == adjusted.second); map to_decode; @@ -1803,82 +2156,64 @@ struct CallClientContexts : res.r = r; goto out; } - assert(i->second.second); - assert(i->second.first); - i->second.first->substr_of( + bufferlist trimmed; + trimmed.substr_of( bl, - i->first.get<0>() - adjusted.first, - MIN(i->first.get<1>(), bl.length() - (i->first.get<0>() - adjusted.first))); - if (i->second.second) { - 
i->second.second->complete(i->second.first->length()); - } + read.get<0>() - adjusted.first, + MIN(read.get<1>(), + bl.length() - (read.get<0>() - adjusted.first))); + result.insert( + read.get<0>(), trimmed.length(), std::move(trimmed)); res.returned.pop_front(); } out: - status->complete = true; - list &ip = - ec->in_progress_client_reads; - while (ip.size() && ip.front().complete) { - if (ip.front().on_complete) { - ip.front().on_complete->complete(res.r); - ip.front().on_complete = NULL; - } - ip.pop_front(); - } - } - ~CallClientContexts() { - for (list, - pair > >::iterator i = to_read.begin(); - i != to_read.end(); - to_read.erase(i++)) { - delete i->second.second; - } + status->complete_object(hoid, std::move(result)); + ec->kick_reads(); } }; -void ECBackend::objects_read_async( - const hobject_t &hoid, - const list, - pair > > &to_read, - Context *on_complete, - bool fast_read) +void ECBackend::objects_read_and_reconstruct( + const hobject_t::bitwisemap< + std::list > + > &reads, + bool fast_read, + GenContextURef &&> &&func) { - in_progress_client_reads.push_back(ClientAsyncReadStatus(on_complete)); - CallClientContexts *c = new CallClientContexts( - this, &(in_progress_client_reads.back()), to_read); - - list > offsets; - pair tmp; - for (list, - pair > >::const_iterator i = - to_read.begin(); - i != to_read.end(); - ++i) { - tmp = sinfo.offset_len_to_stripe_bounds(make_pair(i->first.get<0>(), i->first.get<1>())); - offsets.push_back(boost::make_tuple(tmp.first, tmp.second, i->first.get<2>())); + in_progress_client_reads.emplace_back( + reads.size(), std::move(func)); + if (!reads.size()) { + kick_reads(); + return; } set want_to_read; get_want_to_read_shards(&want_to_read); - set shards; - int r = get_min_avail_to_read_shards( - hoid, - want_to_read, - false, - fast_read, - &shards); - assert(r == 0); - map for_read_op; - for_read_op.insert( - make_pair( - hoid, - read_request_t( - offsets, - shards, - false, - c))); + for (auto &&to_read: reads) { + set shards; + int r = get_min_avail_to_read_shards( + to_read.first, + want_to_read, + false, + fast_read, + &shards); + assert(r == 0); + + CallClientContexts *c = new CallClientContexts( + to_read.first, + this, + &(in_progress_client_reads.back()), + to_read.second); + for_read_op.insert( + make_pair( + to_read.first, + read_request_t( + to_read.second, + shards, + false, + c))); + } start_read_op( CEPH_MSG_PRIO_DEFAULT, @@ -1907,8 +2242,11 @@ int ECBackend::send_all_remaining_reads( dout(10) << __func__ << " Read remaining shards " << shards << dendl; - list > offsets = rop.to_read.find(hoid)->second.to_read; - GenContext &> *c = rop.to_read.find(hoid)->second.cb; + // TODOSAM: this doesn't seem right + list > offsets = + rop.to_read.find(hoid)->second.to_read; + GenContext &> *c = + rop.to_read.find(hoid)->second.cb; map for_read_op; for_read_op.insert( @@ -1920,7 +2258,8 @@ int ECBackend::send_all_remaining_reads( false, c))); - do_read_op(rop, for_read_op); + rop.to_read.swap(for_read_op); + do_read_op(rop); return 0; } @@ -2009,26 +2348,35 @@ void ECBackend::be_deep_scrub( o.digest_present = false; return; } else { - if (hinfo->get_total_chunk_size() != pos) { - dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl; - o.ec_size_mismatch = true; - return; - } + if (!get_parent()->get_pool().is_hacky_ecoverwrites()) { + assert(hinfo->has_chunk_hash()); + if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) { + dout(0) << "_scan_list " << poid << " got incorrect hash on read" << 
dendl;
+	o.ec_hash_mismatch = true;
+	return;
+      }

-    if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
-      dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
-      o.ec_hash_mismatch = true;
-      return;
-    }
+      if (hinfo->get_total_chunk_size() != pos) {
+	dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
+	o.ec_size_mismatch = true;
+	return;
+      }

-    /* We checked above that we match our own stored hash.  We cannot
-     * send a hash of the actual object, so instead we simply send
-     * our locally stored hash of shard 0 on the assumption that if
-     * we match our chunk hash and our recollection of the hash for
-     * chunk 0 matches that of our peers, there is likely no corruption.
-     */
-    o.digest = hinfo->get_chunk_hash(0);
-    o.digest_present = true;
+      /* We checked above that we match our own stored hash.  We cannot
+       * send a hash of the actual object, so instead we simply send
+       * our locally stored hash of shard 0 on the assumption that if
+       * we match our chunk hash and our recollection of the hash for
+       * chunk 0 matches that of our peers, there is likely no corruption.
+       */
+      o.digest = hinfo->get_chunk_hash(0);
+      o.digest_present = true;
+    } else {
+      /* Hack! We must be using partial overwrites, and partial overwrites
+       * don't support deep-scrub yet
+       */
+      o.digest = 0;
+      o.digest_present = true;
+    }
  }

  o.omap_digest = seed;
diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h
index fc008fe4b3a..3689d118789 100644
--- a/src/osd/ECBackend.h
+++ b/src/osd/ECBackend.h
@@ -15,11 +15,15 @@
 #ifndef ECBACKEND_H
 #define ECBACKEND_H
 
+#include 
+#include 
+
 #include "OSD.h"
 #include "PGBackend.h"
 #include "erasure-code/ErasureCodeInterface.h"
 #include "ECUtil.h"
 #include "ECTransaction.h"
+#include "ExtentCache.h"
 
 //forward declaration
 struct ECSubWrite;
@@ -88,8 +92,11 @@ public:
 
   void dump_recovery_info(Formatter *f) const;
 
+  void call_write_ordered(std::function<void(void)> &&cb) override;
+
   void submit_transaction(
     const hobject_t &hoid,
+    const object_stat_sum_t &delta_stats,
     const eversion_t &at_version,
     PGTransactionUPtr &&t,
     const eversion_t &trim_to,
@@ -129,12 +136,36 @@ public:
   * ensures that we won't ever have to restart a client initiated read in
   * check_recovery_sources.
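   * Reads are submitted in batches via objects_read_and_reconstruct();
   * each batch is tracked by a ClientAsyncReadStatus, and kick_reads()
   * delivers completions strictly in submission order.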
*/ + void objects_read_and_reconstruct( + const hobject_t::bitwisemap< + std::list > + > &reads, + bool fast_read, + GenContextURef &&> &&func); + friend struct CallClientContexts; struct ClientAsyncReadStatus { - bool complete; - Context *on_complete; - explicit ClientAsyncReadStatus(Context *on_complete) - : complete(false), on_complete(on_complete) {} + unsigned objects_to_read; + GenContextURef &&> func; + hobject_t::bitwisemap results; + explicit ClientAsyncReadStatus( + unsigned objects_to_read, + GenContextURef &&> &&func) + : objects_to_read(objects_to_read), func(std::move(func)) {} + void complete_object( + const hobject_t &hoid, + extent_map &&buffers) { + assert(objects_to_read); + --objects_to_read; + assert(!results.count(hoid)); + results.emplace(hoid, std::move(buffers)); + } + bool is_complete() const { + return objects_to_read == 0; + } + void run() { + func.release()->complete(std::move(results)); + } }; list in_progress_client_reads; void objects_read_async( @@ -144,6 +175,32 @@ public: Context *on_complete, bool fast_read = false); + template + void objects_read_async_no_cache( + const hobject_t::bitwisemap &to_read, + Func &&on_complete) { + hobject_t::bitwisemap< + std::list > > _to_read; + for (auto &&hpair: to_read) { + auto &l = _to_read[hpair.first]; + for (auto extent: hpair.second) { + l.emplace_back(extent.first, extent.second, 0); + } + } + objects_read_and_reconstruct( + _to_read, + false, + make_gen_lambda_context &&, Func>( + std::forward(on_complete))); + } + void kick_reads() { + while (in_progress_client_reads.size() && + in_progress_client_reads.front().is_complete()) { + in_progress_client_reads.front().run(); + in_progress_client_reads.pop_front(); + } + } + private: friend struct ECRecoveryHandle; uint64_t get_recovery_chunk_size() const { @@ -221,7 +278,7 @@ private: } // must be filled if state == WRITING - map returned_data; + map returned_data; map xattrs; ECUtil::HashInfoRef hinfo; ObjectContextRef obc; @@ -237,6 +294,24 @@ private: friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs); map recovery_ops; + void continue_recovery_op( + RecoveryOp &op, + RecoveryMessages *m); + void dispatch_recovery_messages(RecoveryMessages &m, int priority); + friend struct OnRecoveryReadComplete; + void handle_recovery_read_complete( + const hobject_t &hoid, + boost::tuple > &to_read, + boost::optional > attrs, + RecoveryMessages *m); + void handle_recovery_push( + PushOp &op, + RecoveryMessages *m); + void handle_recovery_push_reply( + PushReplyOp &op, + pg_shard_t from, + RecoveryMessages *m); + public: /** * Low level async read mechanism @@ -319,8 +394,7 @@ public: OpRequestRef op, bool do_redundant_reads, bool for_recovery); - void do_read_op(ReadOp &rop, - map &to_read); + void do_read_op(ReadOp &rop); int send_all_remaining_reads( const hobject_t &hoid, ReadOp &rop); @@ -339,59 +413,122 @@ public: * completions. Thus, callbacks and completion are called in order * on the writing list. 
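   * Concretely, an Op moves through three intrusive waitlists --
   * waiting_state (write planning), waiting_reads (partial-stripe
   * reads outstanding), and waiting_commit (shard commit/apply
   * outstanding) -- and check_ops() advances the front of each.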
*/
-  struct Op {
+  struct Op : boost::intrusive::list_base_hook<> {
+    /// From submit_transaction caller, describes operation
     hobject_t hoid;
+    object_stat_sum_t delta_stats;
     eversion_t version;
     eversion_t trim_to;
-    eversion_t roll_forward_to;
-    vector<pg_log_entry_t> log_entries;
-    map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> obc_map;
     boost::optional<pg_hit_set_history_t> updated_hit_set_history;
-    Context *on_local_applied_sync;
-    Context *on_all_applied;
-    Context *on_all_commit;
+    vector<pg_log_entry_t> log_entries;
     ceph_tid_t tid;
     osd_reqid_t reqid;
-    OpRequestRef client_op;
-    std::unique_ptr<PGTransaction> t;
+    eversion_t roll_forward_to; /// Soon to be generated internally
+
+    /// Ancillary also provided from submit_transaction caller
+    map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> obc_map;
+
+    /// see call_write_ordered
+    std::list<std::function<void(void)> > on_write;
+
+    /// Generated internally
     set<hobject_t, hobject_t::BitwiseComparator> temp_added;
     set<hobject_t, hobject_t::BitwiseComparator> temp_cleared;
+    ECTransaction::WritePlan plan;
+    bool requires_rmw() const { return !plan.to_read.empty(); }
+    bool invalidates_cache() const { return plan.invalidates_cache; }
+
+    /// In progress read state
+    hobject_t::bitwisemap<extent_set> pending_read; // subset already being read
+    hobject_t::bitwisemap<extent_set> remote_read;  // subset we must read
+    hobject_t::bitwisemap<extent_map> remote_read_result;
+    bool read_in_progress() const {
+      return !remote_read.empty() && remote_read_result.empty();
+    }
+
+    /// In progress write state
     set<pg_shard_t> pending_commit;
     set<pg_shard_t> pending_apply;
+    bool write_in_progress() const {
+      return !pending_commit.empty() || !pending_apply.empty();
+    }
+
+    /// optional, may be null, for tracking purposes
+    OpRequestRef client_op;
+
+    /// pin for cache
+    ExtentCache::write_pin pin;

-    map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> unstable_hash_infos;
+    /// Callbacks
+    Context *on_local_applied_sync = nullptr;
+    Context *on_all_applied = nullptr;
+    Context *on_all_commit = nullptr;
     ~Op() {
       delete on_local_applied_sync;
       delete on_all_applied;
       delete on_all_commit;
     }
   };
+  using op_list = boost::intrusive::list<Op>;
   friend ostream &operator<<(ostream &lhs, const Op &rhs);

-  void continue_recovery_op(
-    RecoveryOp &op,
-    RecoveryMessages *m);
+  ExtentCache cache;
+  map<ceph_tid_t, Op> tid_to_op_map; /// Owns Op structure
+
+  /**
+   * We model the possible rmw states as a set of waitlists.
+   * All writes at this time complete in order, so a write blocked
+   * at waiting_state blocks all writes behind it as well (same for
+   * other states).
+   *
+   * Future work: We can break this up into a per-object pipeline
+   * (almost).  First, provide an ordering token to submit_transaction
+   * and require that all operations within a single transaction take
+   * place on a subset of hobject_t space partitioned by that token
+   * (the hashid seems about right to me -- even works for temp objects
+   * if you recall that a temp object created for object head foo will
+   * only ever be referenced by other transactions on foo and aren't
+   * reused).  Next, factor this part into a class and maintain one per
+   * ordering token.  Next, fixup ReplicatedPG's repop queue to be
+   * partitioned by ordering token.  Finally, refactor the op pipeline
+   * so that the log entries passed into submit_transaction aren't
+   * versioned.  We can't assign versions to them until we actually
+   * submit the operation.  That's probably going to be the hard part.
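+   *
+   * Until then, check_ops() simply drains the three stages in order
+   * (try_state_to_reads, try_reads_to_commit, try_finish_rmw),
+   * looping until no stage can make progress.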
+ */ + class pipeline_state_t { + enum { + CACHE_VALID = 0, + CACHE_INVALID = 1 + } pipeline_state = CACHE_VALID; + public: + bool caching_enabled() const { + return pipeline_state == CACHE_VALID; + } + bool cache_invalid() const { + return !caching_enabled(); + } + void invalidate() { + pipeline_state = CACHE_INVALID; + } + void clear() { + pipeline_state = CACHE_VALID; + } + friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs); + } pipeline_state; - void dispatch_recovery_messages(RecoveryMessages &m, int priority); - friend struct OnRecoveryReadComplete; - void handle_recovery_read_complete( - const hobject_t &hoid, - boost::tuple > &to_read, - boost::optional > attrs, - RecoveryMessages *m); - void handle_recovery_push( - PushOp &op, - RecoveryMessages *m); - void handle_recovery_push_reply( - PushReplyOp &op, - pg_shard_t from, - RecoveryMessages *m); - map tid_to_op_map; /// lists below point into here - list writing; + op_list waiting_state; /// writes waiting on pipe_state + op_list waiting_reads; /// writes waiting on partial stripe reads + op_list waiting_commit; /// writes waiting on initial commit + eversion_t completed_to; + eversion_t committed_to; + void start_rmw(Op *op, PGTransactionUPtr &&t); + bool try_state_to_reads(); + bool try_reads_to_commit(); + bool try_finish_rmw(); + void check_ops(); CephContext *cct; ErasureCodeInterfaceRef ec_impl; @@ -454,8 +591,6 @@ public: const map *attr = NULL); friend struct ReadCB; - void check_op(Op *op); - void start_write(Op *op); public: ECBackend( PGBackend::Listener *pg, @@ -503,5 +638,6 @@ public: void _failed_push(const hobject_t &hoid, pair &in); }; +ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs); #endif diff --git a/src/osd/ECTransaction.cc b/src/osd/ECTransaction.cc index 27657504b58..f808162caaf 100644 --- a/src/osd/ECTransaction.cc +++ b/src/osd/ECTransaction.cc @@ -22,61 +22,55 @@ #include "os/ObjectStore.h" #include "common/inline_variant.h" -void ECTransaction::get_append_objects( - const PGTransaction &t, - set *out) -{ - for (auto &&i: t.op_map) { - out->insert(i.first); - hobject_t source; - if (i.second.has_source(&source)) - out->insert(source); - } -} -void append( +void encode_and_write( pg_t pgid, const hobject_t &oid, const ECUtil::stripe_info_t &sinfo, ErasureCodeInterfaceRef &ecimpl, const set &want, uint64_t offset, - bufferlist &bl, + bufferlist bl, uint32_t flags, ECUtil::HashInfoRef hinfo, - map *transactions) { - + extent_map &written, + map *transactions, + DoutPrefixProvider *dpp) { + const uint64_t before_size = hinfo->get_total_logical_size(sinfo); + assert(sinfo.logical_offset_is_stripe_aligned(offset)); + assert(sinfo.logical_offset_is_stripe_aligned(bl.length())); assert(bl.length()); - assert(offset % sinfo.get_stripe_width() == 0); - assert( - sinfo.aligned_logical_offset_to_chunk_offset(offset) == - hinfo->get_total_chunk_size()); - map buffers; - // align - if (bl.length() % sinfo.get_stripe_width()) - bl.append_zero( - sinfo.get_stripe_width() - - ((offset + bl.length()) % sinfo.get_stripe_width())); + map buffers; int r = ECUtil::encode( sinfo, ecimpl, bl, want, &buffers); + assert(r == 0); - hinfo->append( - sinfo.aligned_logical_offset_to_chunk_offset(offset), - buffers); - bufferlist hbuf; - ::encode(*hinfo, hbuf); + written.insert(offset, bl.length(), bl); + + ldpp_dout(dpp, 20) << __func__ << ": " << oid + << " new_size " + << offset + bl.length() + << dendl; + + if (offset >= before_size) { + assert(offset == before_size); + hinfo->append( + 
sinfo.aligned_logical_offset_to_chunk_offset(offset), + buffers); + } - assert(r == 0); for (auto &&i : *transactions) { assert(buffers.count(i.first)); bufferlist &enc_bl = buffers[i.first]; - i.second.set_alloc_hint( - coll_t(spg_t(pgid, i.first)), - ghobject_t(oid, ghobject_t::NO_GEN, i.first), - 0, 0, - CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE | - CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY); + if (offset >= before_size) { + i.second.set_alloc_hint( + coll_t(spg_t(pgid, i.first)), + ghobject_t(oid, ghobject_t::NO_GEN, i.first), + 0, 0, + CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE | + CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY); + } i.second.write( coll_t(spg_t(pgid, i.first)), ghobject_t(oid, ghobject_t::NO_GEN, i.first), @@ -85,28 +79,45 @@ void append( enc_bl.length(), enc_bl, flags); - i.second.setattr( - coll_t(spg_t(pgid, i.first)), - ghobject_t(oid, ghobject_t::NO_GEN, i.first), - ECUtil::get_hinfo_key(), - hbuf); } } +bool ECTransaction::requires_overwrite( + uint64_t prev_size, + const PGTransaction::ObjectOperation &op) { + // special handling for truncates to 0 + if (op.truncate && op.truncate->first == 0) + return false; + return op.is_none() && + ((!op.buffer_updates.empty() && + (op.buffer_updates.begin().get_off() < prev_size)) || + (op.truncate && + (op.truncate->first < prev_size))); +} + void ECTransaction::generate_transactions( - PGTransaction &t, - map< - hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator - > &hash_infos, + WritePlan &plan, ErasureCodeInterfaceRef &ecimpl, pg_t pgid, + bool legacy_log_entries, const ECUtil::stripe_info_t &sinfo, + const hobject_t::bitwisemap &partial_extents, vector &entries, + hobject_t::bitwisemap *written_map, map *transactions, set *temp_added, set *temp_removed, - stringstream *out) + DoutPrefixProvider *dpp) { + assert(written_map); + assert(transactions); + assert(temp_added); + assert(temp_removed); + assert(plan.t); + auto &t = *(plan.t); + + auto &hash_infos = plan.hash_infos; + assert(transactions); assert(temp_added); assert(temp_removed); @@ -118,279 +129,528 @@ void ECTransaction::generate_transactions( t.safe_create_traverse( [&](pair &opair) { - const hobject_t &oid = opair.first; + const hobject_t &oid = opair.first; + auto &op = opair.second; + auto &obc_map = t.obc_map; + auto &written = (*written_map)[oid]; - auto iter = obj_to_log.find(oid); - pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr; + auto iter = obj_to_log.find(oid); + pg_log_entry_t *entry = iter != obj_to_log.end() ? 
iter->second : nullptr; - if (entry && opair.second.updated_snaps) { - entry->mod_desc.update_snaps(opair.second.updated_snaps->first); - vector snaps( - opair.second.updated_snaps->second.begin(), - opair.second.updated_snaps->second.end()); - ::encode(snaps, entry->snaps); - } + ObjectContextRef obc; + auto obiter = t.obc_map.find(oid); + if (obiter != t.obc_map.end()) { + obc = obiter->second; + } + if (entry) { + assert(obc); + } else { + assert(oid.is_temp()); + } - ObjectContextRef obc; - auto obiter = t.obc_map.find(oid); - if (obiter != t.obc_map.end()) { - obc = obiter->second; - } - if (entry) { - assert(obc); - } else { - assert(oid.is_temp()); - } + ECUtil::HashInfoRef hinfo; + { + auto iter = hash_infos.find(oid); + assert(iter != hash_infos.end()); + hinfo = iter->second; + } + + if (oid.is_temp()) { + if (op.is_fresh_object()) { + temp_added->insert(oid); + } else if (op.is_delete()) { + temp_removed->insert(oid); + } + } + + if (entry && + entry->is_modify() && + op.updated_snaps) { + vector snaps( + op.updated_snaps->second.begin(), + op.updated_snaps->second.end()); + ::encode(snaps, entry->snaps); + } + + ldpp_dout(dpp, 20) << "generate_transactions: " + << opair.first + << ", current size is " + << hinfo->get_total_logical_size(sinfo) + << " buffers are " + << op.buffer_updates + << dendl; + if (op.truncate) { + ldpp_dout(dpp, 20) << "generate_transactions: " + << " truncate is " + << *(op.truncate) + << dendl; + } + + if (entry && op.updated_snaps) { + entry->mod_desc.update_snaps(op.updated_snaps->first); + } - map > xattr_rollback; - ECUtil::HashInfoRef hinfo; - { - auto iter = hash_infos.find(oid); - assert(iter != hash_infos.end()); - hinfo = iter->second; + map > xattr_rollback; + assert(hinfo); bufferlist old_hinfo; ::encode(*hinfo, old_hinfo); xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo; - } + + if (op.is_none() && op.truncate && op.truncate->first == 0) { + assert(op.truncate->first == 0); + assert(op.truncate->first == + op.truncate->second); + assert(entry); + assert(obc); + + if (op.truncate->first != op.truncate->second) { + op.truncate->first = op.truncate->second; + } else { + op.truncate = boost::none; + } - if (opair.second.is_none() && opair.second.truncate) { - assert(opair.second.truncate->first == 0); - assert(opair.second.truncate->first == - opair.second.truncate->second); - assert(entry); - assert(obc); - - opair.second.truncate = boost::none; - opair.second.delete_first = true; - opair.second.init_type = PGTransaction::ObjectOperation::Init::Create(); - - if (obc) { - /* We need to reapply all of the cached xattrs. - * std::map insert fortunately only writes keys - * which don't already exist, so this should do - * the right thing. */ - opair.second.attr_updates.insert( - obc->attr_cache.begin(), - obc->attr_cache.end()); - } - } + op.delete_first = true; + op.init_type = PGTransaction::ObjectOperation::Init::Create(); - if (oid.is_temp()) { - if (opair.second.is_fresh_object()) { - temp_added->insert(oid); - } else if (opair.second.is_delete()) { - temp_removed->insert(oid); + if (obc) { + /* We need to reapply all of the cached xattrs. + * std::map insert fortunately only writes keys + * which don't already exist, so this should do + * the right thing. 
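+	   * (The truncate to 0 was rewritten above as delete_first plus
+	   * a fresh Create, which would otherwise drop every xattr.)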
*/ + op.attr_updates.insert( + obc->attr_cache.begin(), + obc->attr_cache.end()); + } } - } - if (opair.second.delete_first) { - /* We also want to remove the boost::none entries since - * the keys already won't exist */ - for (auto j = opair.second.attr_updates.begin(); - j != opair.second.attr_updates.end(); - ) { - if (j->second) { - ++j; + if (op.delete_first) { + /* We also want to remove the boost::none entries since + * the keys already won't exist */ + for (auto j = op.attr_updates.begin(); + j != op.attr_updates.end(); + ) { + if (j->second) { + ++j; + } else { + op.attr_updates.erase(j++); + } + } + /* Fill in all current entries for xattr rollback */ + if (obc) { + xattr_rollback.insert( + obc->attr_cache.begin(), + obc->attr_cache.end()); + obc->attr_cache.clear(); + } + if (entry) { + entry->mod_desc.rmobject(entry->version.version); + for (auto &&st: *transactions) { + st.second.collection_move_rename( + coll_t(spg_t(pgid, st.first)), + ghobject_t(oid, ghobject_t::NO_GEN, st.first), + coll_t(spg_t(pgid, st.first)), + ghobject_t(oid, entry->version.version, st.first)); + } } else { - opair.second.attr_updates.erase(j++); + for (auto &&st: *transactions) { + st.second.remove( + coll_t(spg_t(pgid, st.first)), + ghobject_t(oid, ghobject_t::NO_GEN, st.first)); + } } + hinfo->clear(); } - /* Fill in all current entries for xattr rollback */ - if (obc) { - xattr_rollback.insert( - obc->attr_cache.begin(), - obc->attr_cache.end()); - obc->attr_cache.clear(); + + if (op.is_fresh_object() && entry) { + entry->mod_desc.create(); } - if (entry) { - entry->mod_desc.rmobject(entry->version.version); - for (auto &&st: *transactions) { - st.second.collection_move_rename( + + match( + op.init_type, + [&](const PGTransaction::ObjectOperation::Init::None &) {}, + [&](const PGTransaction::ObjectOperation::Init::Create &op) { + for (auto &&st: *transactions) { + st.second.touch( + coll_t(spg_t(pgid, st.first)), + ghobject_t(oid, ghobject_t::NO_GEN, st.first)); + } + }, + [&](const PGTransaction::ObjectOperation::Init::Clone &op) { + for (auto &&st: *transactions) { + st.second.clone( + coll_t(spg_t(pgid, st.first)), + ghobject_t(op.source, ghobject_t::NO_GEN, st.first), + ghobject_t(oid, ghobject_t::NO_GEN, st.first)); + } + + auto siter = hash_infos.find(op.source); + assert(siter != hash_infos.end()); + hinfo->update_to(*(siter->second)); + + if (obc) { + auto cobciter = obc_map.find(op.source); + assert(cobciter != obc_map.end()); + obc->attr_cache = cobciter->second->attr_cache; + } + }, + [&](const PGTransaction::ObjectOperation::Init::Rename &op) { + assert(op.source.is_temp()); + for (auto &&st: *transactions) { + st.second.collection_move_rename( + coll_t(spg_t(pgid, st.first)), + ghobject_t(op.source, ghobject_t::NO_GEN, st.first), + coll_t(spg_t(pgid, st.first)), + ghobject_t(oid, ghobject_t::NO_GEN, st.first)); + } + auto siter = hash_infos.find(op.source); + assert(siter != hash_infos.end()); + hinfo->update_to(*(siter->second)); + if (obc) { + auto cobciter = obc_map.find(op.source); + assert(cobciter == obc_map.end()); + obc->attr_cache.clear(); + } + }); + + // omap not supported (except 0, handled above) + assert(!(op.clear_omap)); + assert(!(op.omap_header)); + assert(op.omap_updates.empty()); + + if (!op.attr_updates.empty()) { + map to_set; + for (auto &&j: op.attr_updates) { + if (j.second) { + to_set[j.first] = *(j.second); + } else { + for (auto &&st : *transactions) { + st.second.rmattr( + coll_t(spg_t(pgid, st.first)), + ghobject_t(oid, ghobject_t::NO_GEN, st.first), + 
j.first); + } + } + if (obc) { + auto citer = obc->attr_cache.find(j.first); + if (entry) { + if (citer != obc->attr_cache.end()) { + // won't overwrite anything we put in earlier + xattr_rollback.insert( + make_pair( + j.first, + boost::optional(citer->second))); + } else { + // won't overwrite anything we put in earlier + xattr_rollback.insert( + make_pair( + j.first, + boost::none)); + } + } + if (j.second) { + obc->attr_cache[j.first] = *(j.second); + } else if (citer != obc->attr_cache.end()) { + obc->attr_cache.erase(citer); + } + } else { + assert(!entry); + } + } + for (auto &&st : *transactions) { + st.second.setattrs( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), - coll_t(spg_t(pgid, st.first)), - ghobject_t(oid, entry->version.version, st.first)); + to_set); } - } else { - for (auto &&st: *transactions) { - st.second.remove( + assert(!xattr_rollback.empty()); + } + if (entry && !xattr_rollback.empty()) { + entry->mod_desc.setattrs(xattr_rollback); + } + + if (op.alloc_hint) { + /* logical_to_next_chunk_offset() scales down both aligned and + * unaligned offsets + + * we don't bother to roll this back at this time for two reasons: + * 1) it's advisory + * 2) we don't track the old value */ + uint64_t object_size = sinfo.logical_to_next_chunk_offset( + op.alloc_hint->expected_object_size); + uint64_t write_size = sinfo.logical_to_next_chunk_offset( + op.alloc_hint->expected_write_size); + + for (auto &&st : *transactions) { + st.second.set_alloc_hint( coll_t(spg_t(pgid, st.first)), - ghobject_t(oid, ghobject_t::NO_GEN, st.first)); + ghobject_t(oid, ghobject_t::NO_GEN, st.first), + object_size, + write_size, + op.alloc_hint->flags); } } - hinfo->clear(); - } - if (opair.second.is_fresh_object() && entry) { - entry->mod_desc.create(); - } + extent_map to_write; + auto pextiter = partial_extents.find(oid); + if (pextiter != partial_extents.end()) { + to_write = pextiter->second; + } - match( - opair.second.init_type, - [&](const PGTransaction::ObjectOperation::Init::None &) {}, - [&](const PGTransaction::ObjectOperation::Init::Create &op) { - for (auto &&st: *transactions) { - st.second.touch( - coll_t(spg_t(pgid, st.first)), - ghobject_t(oid, ghobject_t::NO_GEN, st.first)); - } - }, - [&](const PGTransaction::ObjectOperation::Init::Clone &op) { - for (auto &&st: *transactions) { - st.second.clone( - coll_t(spg_t(pgid, st.first)), - ghobject_t(op.source, ghobject_t::NO_GEN, st.first), - ghobject_t(oid, ghobject_t::NO_GEN, st.first)); + vector > rollback_extents; + const uint64_t orig_size = hinfo->get_total_logical_size(sinfo); + + uint64_t new_size = orig_size; + uint64_t append_after = new_size; + ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl; + if (op.truncate && op.truncate->first < new_size) { + assert(!op.is_fresh_object()); + new_size = sinfo.logical_to_next_stripe_offset( + op.truncate->first); + ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down " + << new_size << dendl; + if (new_size != op.truncate->first) { // 0 the unaligned part + bufferlist bl; + bl.append_zero(new_size - op.truncate->first); + to_write.insert( + op.truncate->first, + bl.length(), + bl); + append_after = sinfo.logical_to_prev_stripe_offset( + op.truncate->first); + } else { + append_after = new_size; } + to_write.erase( + new_size, + std::numeric_limits::max() - new_size); - auto siter = hash_infos.find(op.source); - assert(siter != hash_infos.end()); - *hinfo = *(siter->second); + if (entry && !op.is_fresh_object()) { + uint64_t 
restore_from = sinfo.logical_to_prev_chunk_offset( + op.truncate->first); + uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset( + orig_size - + sinfo.logical_to_prev_stripe_offset(op.truncate->first)); + assert(rollback_extents.empty()); - if (obc) { - auto cobciter = t.obc_map.find(op.source); - assert(cobciter != t.obc_map.end()); - obc->attr_cache = cobciter->second->attr_cache; - } - }, - [&](const PGTransaction::ObjectOperation::Init::Rename &op) { - assert(op.source.is_temp()); - for (auto &&st: *transactions) { - st.second.collection_move_rename( - coll_t(spg_t(pgid, st.first)), - ghobject_t(op.source, ghobject_t::NO_GEN, st.first), - coll_t(spg_t(pgid, st.first)), - ghobject_t(oid, ghobject_t::NO_GEN, st.first)); - } - auto siter = hash_infos.find(op.source); - assert(siter != hash_infos.end()); - *hinfo = *(siter->second); - assert(obc->attr_cache.empty()); - }); - - // omap, truncate not supported (except 0, handled above) - assert(!(opair.second.clear_omap)); - assert(!(opair.second.truncate)); - assert(!(opair.second.omap_header)); - assert(opair.second.omap_updates.empty()); - - if (!opair.second.attr_updates.empty()) { - map to_set; - for (auto &&j: opair.second.attr_updates) { - if (j.second) { - to_set[j.first] = *(j.second); - } else { + ldpp_dout(dpp, 20) << __func__ << ": saving extent " + << make_pair(restore_from, restore_len) + << dendl; + ldpp_dout(dpp, 20) << __func__ << ": truncating to " + << new_size + << dendl; + rollback_extents.emplace_back( + make_pair(restore_from, restore_len)); for (auto &&st : *transactions) { - st.second.rmattr( + st.second.touch( + coll_t(spg_t(pgid, st.first)), + ghobject_t(oid, entry->version.version, st.first)); + st.second.clone_range( coll_t(spg_t(pgid, st.first)), ghobject_t(oid, ghobject_t::NO_GEN, st.first), - j.first); - } - } - if (obc) { - auto citer = obc->attr_cache.find(j.first); - if (entry) { - if (citer != obc->attr_cache.end()) { - // won't overwrite anything we put in earlier - xattr_rollback.insert( - make_pair( - j.first, - boost::optional(citer->second))); - } else { - // won't overwrite anything we put in earlier - xattr_rollback.insert( - make_pair( - j.first, - boost::none)); - } - } - if (j.second) { - obc->attr_cache[j.first] = *(j.second); - } else if (citer != obc->attr_cache.end()) { - obc->attr_cache.erase(citer); + ghobject_t(oid, entry->version.version, st.first), + restore_from, + restore_len, + restore_from); + } } else { - assert(!entry); + ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object" + << dendl; + } + for (auto &&st : *transactions) { + st.second.truncate( + coll_t(spg_t(pgid, st.first)), + ghobject_t(oid, ghobject_t::NO_GEN, st.first), + sinfo.aligned_logical_offset_to_chunk_offset(new_size)); } } - for (auto &&st : *transactions) { - st.second.setattrs( - coll_t(spg_t(pgid, st.first)), - ghobject_t(oid, ghobject_t::NO_GEN, st.first), - to_set); - } - assert(!xattr_rollback.empty()); - } - if (entry && !xattr_rollback.empty()) { - entry->mod_desc.setattrs(xattr_rollback); - } - - if (opair.second.alloc_hint) { - /* logical_to_next_chunk_offset() scales down both aligned and - * unaligned offsets - - * we don't bother to roll this back at this time for two reasons: - * 1) it's advisory - * 2) we don't track the old value */ - uint64_t object_size = sinfo.logical_to_next_chunk_offset( - opair.second.alloc_hint->expected_object_size); - uint64_t write_size = sinfo.logical_to_next_chunk_offset( - opair.second.alloc_hint->expected_write_size); - - for (auto &&st 
: *transactions) { - st.second.set_alloc_hint( - coll_t(spg_t(pgid, st.first)), - ghobject_t(oid, ghobject_t::NO_GEN, st.first), - object_size, - write_size, - opair.second.alloc_hint->flags); - } - } - - if (!opair.second.buffer_updates.empty()) { - set want; - for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) { - want.insert(i); - } - if (entry) { - entry->mod_desc.append( - sinfo.aligned_chunk_offset_to_logical_offset( - hinfo->get_total_chunk_size() - )); - } - for (auto &&extent: opair.second.buffer_updates) { + uint32_t fadvise_flags = 0; + for (auto &&extent: op.buffer_updates) { using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate; + bufferlist bl; match( extent.get_val(), [&](const BufferUpdate::Write &op) { - if (extent.get_len()) { - assert(op.buffer.length() == extent.get_len()); - bufferlist bl = op.buffer; - append( - pgid, - oid, - sinfo, - ecimpl, - want, - extent.get_off(), - bl, - op.fadvise_flags, - hinfo, - transactions); - } + bl = op.buffer; + fadvise_flags |= op.fadvise_flags; }, [&](const BufferUpdate::Zero &) { - assert( - 0 == - "Zero is not allowed, do_op should have returned ENOTSUPP"); + bl.append_zero(extent.get_len()); }, [&](const BufferUpdate::CloneRange &) { assert( 0 == "CloneRange is not allowed, do_op should have returned ENOTSUPP"); }); + + uint64_t off = extent.get_off(); + uint64_t len = extent.get_len(); + uint64_t end = off + len; + ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update " + << make_pair(off, len) + << dendl; + assert(len > 0); + if (off > new_size) { + assert(off > append_after); + bl.prepend_zero(off - new_size); + len += off - new_size; + ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align " + << off << "->" << new_size + << dendl; + off = new_size; + } + if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) { + uint64_t aligned_end = sinfo.logical_to_next_stripe_offset( + end); + uint64_t tail = aligned_end - end; + bl.append_zero(tail); + ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end " + << end << "->" << end+tail + << ", len: " << len << "->" << len+tail + << dendl; + end += tail; + len += tail; + } + + to_write.insert(off, len, bl); + if (end > new_size) + new_size = end; } - } - }); + + if (op.truncate && + op.truncate->second > new_size) { + assert(op.truncate->second > append_after); + uint64_t truncate_to = + sinfo.logical_to_next_stripe_offset( + op.truncate->second); + uint64_t zeroes = truncate_to - new_size; + bufferlist bl; + bl.append_zero(zeroes); + to_write.insert( + new_size, + zeroes, + bl); + new_size = truncate_to; + ldpp_dout(dpp, 20) << __func__ << ": truncating out to " + << truncate_to + << dendl; + } + + set want; + for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) { + want.insert(i); + } + auto to_overwrite = to_write.intersect(0, append_after); + ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: " + << to_overwrite + << dendl; + for (auto &&extent: to_overwrite) { + assert(extent.get_off() + extent.get_len() <= append_after); + assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off())); + assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len())); + if (entry) { + uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset( + extent.get_off()); + uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset( + extent.get_len()); + ldpp_dout(dpp, 20) << __func__ << ": overwriting " + << restore_from << "~" << restore_len + << dendl; + if (rollback_extents.empty()) { + for (auto &&st : 
+
+      set<int> want;
+      for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
+        want.insert(i);
+      }
+      auto to_overwrite = to_write.intersect(0, append_after);
+      ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: "
+                         << to_overwrite
+                         << dendl;
+      for (auto &&extent: to_overwrite) {
+        assert(extent.get_off() + extent.get_len() <= append_after);
+        assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
+        assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
+        if (entry) {
+          uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset(
+            extent.get_off());
+          uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
+            extent.get_len());
+          ldpp_dout(dpp, 20) << __func__ << ": overwriting "
+                             << restore_from << "~" << restore_len
+                             << dendl;
+          if (rollback_extents.empty()) {
+            for (auto &&st : *transactions) {
+              st.second.touch(
+                coll_t(spg_t(pgid, st.first)),
+                ghobject_t(oid, entry->version.version, st.first));
+            }
+          }
+          rollback_extents.emplace_back(make_pair(restore_from, restore_len));
+          for (auto &&st : *transactions) {
+            st.second.clone_range(
+              coll_t(spg_t(pgid, st.first)),
+              ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+              ghobject_t(oid, entry->version.version, st.first),
+              restore_from,
+              restore_len,
+              restore_from);
+          }
+        }
+        encode_and_write(
+          pgid,
+          oid,
+          sinfo,
+          ecimpl,
+          want,
+          extent.get_off(),
+          extent.get_val(),
+          fadvise_flags,
+          hinfo,
+          written,
+          transactions,
+          dpp);
+      }
+
+      auto to_append = to_write.intersect(
+        append_after,
+        std::numeric_limits<uint64_t>::max() - append_after);
+      ldpp_dout(dpp, 20) << __func__ << ": to_append: "
+                         << to_append
+                         << dendl;
+      for (auto &&extent: to_append) {
+        assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
+        assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
+        ldpp_dout(dpp, 20) << __func__ << ": appending "
+                           << extent.get_off() << "~" << extent.get_len()
+                           << dendl;
+        encode_and_write(
+          pgid,
+          oid,
+          sinfo,
+          ecimpl,
+          want,
+          extent.get_off(),
+          extent.get_val(),
+          fadvise_flags,
+          hinfo,
+          written,
+          transactions,
+          dpp);
+      }
+
+      ldpp_dout(dpp, 20) << __func__ << ": " << oid
+                         << " resetting hinfo to logical size "
+                         << new_size
+                         << dendl;
+      if (!rollback_extents.empty() && entry) {
+        if (entry) {
+          ldpp_dout(dpp, 20) << __func__ << ": " << oid
+                             << " marking rollback extents "
+                             << rollback_extents
+                             << dendl;
+          entry->mod_desc.rollback_extents(
+            entry->version.version, rollback_extents);
+        }
+        hinfo->set_total_chunk_size_clear_hash(
+          sinfo.aligned_logical_offset_to_chunk_offset(new_size));
+      } else {
+        assert(hinfo->get_total_logical_size(sinfo) == new_size);
+      }
+
+      if (entry && !to_append.empty()) {
+        ldpp_dout(dpp, 20) << __func__ << ": marking append "
+                           << append_after
+                           << dendl;
+        entry->mod_desc.append(append_after);
+      }
+
+      bufferlist hbuf;
+      ::encode(*hinfo, hbuf);
+      for (auto &&i : *transactions) {
+        i.second.setattr(
+          coll_t(spg_t(pgid, i.first)),
+          ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+          ECUtil::get_hinfo_key(),
+          hbuf);
+      }
+    });
 }
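After padding, to_write holds only stripe-aligned extents, and the code above splits it at append_after: extents below it overwrite committed stripes, so the old chunks are cloned aside for rollback before encode_and_write(), while extents at or past it are plain appends that a single mod_desc.append(append_after) can undo. A toy version of that split; extent_map is reduced to a map<offset, length> and this intersect() assumes no extent straddles the boundary, which the alignment pass guarantees:

  #include <cstdint>
  #include <iostream>
  #include <map>

  using extent_map = std::map<uint64_t, uint64_t>;  // offset -> length

  // Assumes no extent straddles [lo, hi); the alignment step above
  // guarantees that when splitting at append_after.
  static extent_map intersect(const extent_map &m, uint64_t lo, uint64_t hi) {
    extent_map out;
    for (auto &e : m)
      if (e.first >= lo && e.first + e.second <= hi)
        out.insert(e);
    return out;
  }

  int main() {
    const uint64_t append_after = 64;  // committed, stripe-aligned object end
    extent_map to_write = {{0, 32}, {64, 32}, {96, 32}};

    for (auto &e : intersect(to_write, 0, append_after))
      std::cout << "overwrite " << e.first << "~" << e.second
                << ": clone old chunks for rollback, then encode_and_write\n";
    for (auto &e : intersect(to_write, append_after, UINT64_MAX))
      std::cout << "append " << e.first << "~" << e.second
                << ": covered by mod_desc.append(append_after)\n";
    return 0;
  }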
          projected_size = shinfo->get_projected_total_logical_size(sinfo);
+          plan.hash_infos[source] = shinfo;
+        }
+
+        auto &will_write = plan.will_write[i.first];
+        if (i.second.truncate &&
+            i.second.truncate->first < projected_size) {
+          if (!(sinfo.logical_offset_is_stripe_aligned(
+                i.second.truncate->first))) {
+            plan.to_read[i.first].insert(
+              sinfo.logical_to_prev_stripe_offset(i.second.truncate->first),
+              sinfo.get_stripe_width());
+
+            ldpp_dout(dpp, 20) << __func__ << ": unaligned truncate" << dendl;
+
+            will_write.insert(
+              sinfo.logical_to_prev_stripe_offset(i.second.truncate->first),
+              sinfo.get_stripe_width());
+          }
+          projected_size = sinfo.logical_to_next_stripe_offset(
+            i.second.truncate->first);
+        }
+
+        for (auto &&extent: i.second.buffer_updates) {
+          using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+          if (boost::get<BufferUpdate::CloneRange>(&(extent.get_val()))) {
+            assert(
+              0 ==
+              "CloneRange is not allowed, do_op should have returned ENOTSUPP");
+          }
+
+          uint64_t head_start =
+            sinfo.logical_to_prev_stripe_offset(extent.get_off());
+          uint64_t head_finish =
+            sinfo.logical_to_next_stripe_offset(extent.get_off());
+          if (head_start > projected_size) {
+            head_start = projected_size;
+          }
+          if (head_start != head_finish &&
+              head_start < projected_size) {
+            assert(head_finish <= projected_size);
+            assert(head_finish - head_start == sinfo.get_stripe_width());
+            plan.to_read[i.first].insert(
+              head_start, sinfo.get_stripe_width());
+          }
+
+          uint64_t tail_start =
+            sinfo.logical_to_prev_stripe_offset(
+              extent.get_off() + extent.get_len());
+          uint64_t tail_finish =
+            sinfo.logical_to_next_stripe_offset(
+              extent.get_off() + extent.get_len());
+          if (tail_start != tail_finish &&
+              tail_start != head_start &&
+              tail_start < projected_size) {
+            assert(tail_finish <= projected_size);
+            assert(tail_finish - tail_start == sinfo.get_stripe_width());
+            plan.to_read[i.first].insert(
+              tail_start, sinfo.get_stripe_width());
+          }
+
+          if (head_start != tail_finish) {
+            assert(
+              sinfo.logical_offset_is_stripe_aligned(
+                tail_finish - head_start)
+              );
+            will_write.insert(
+              head_start, tail_finish - head_start);
+            if (tail_finish > projected_size)
+              projected_size = tail_finish;
+          } else {
+            assert(tail_finish <= projected_size);
+          }
+        }
+
+        if (i.second.truncate &&
+            i.second.truncate->second > projected_size) {
+          uint64_t truncating_to =
+            sinfo.logical_to_next_stripe_offset(i.second.truncate->second);
+          ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
+                             << truncating_to
+                             << dendl;
+          will_write.insert(projected_size, truncating_to - projected_size);
+          projected_size = truncating_to;
+        }
+
+        ldpp_dout(dpp, 20) << __func__ << ": " << i.first
+                           << " projected size "
+                           << projected_size
+                           << dendl;
+        hinfo->set_projected_total_logical_size(
+          sinfo,
+          projected_size);
+        assert(plan.to_read[i.first].empty() || !i.second.has_source());
+      });
+    plan.t = std::move(t);
+    return plan;
+  }
+
   void generate_transactions(
-    PGTransaction &t,
-    map<
-      hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
-      > &hash_infos,
+    WritePlan &plan,
     ErasureCodeInterfaceRef &ecimpl,
     pg_t pgid,
+    bool legacy_log_entries,
     const ECUtil::stripe_info_t &sinfo,
+    const hobject_t::bitwisemap<extent_map> &partial_extents,
     vector<pg_log_entry_t> &entries,
+    hobject_t::bitwisemap<extent_map> *written,
     map<shard_id_t, ObjectStore::Transaction> *transactions,
     set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
     set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
-    stringstream *out = 0);
+    DoutPrefixProvider *dpp);
 };
 #endif
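The planning loop above decides which whole stripes must be read before the write can be encoded: only the partially covered head and tail stripes of each update, and only when they already exist, i.e. start below projected_size. A condensed, self-contained rendering of that test with an invented stripe width; prev/next mimic logical_to_prev/next_stripe_offset and are not the Ceph helpers:

  #include <algorithm>
  #include <cstdint>
  #include <iostream>

  int main() {
    const uint64_t sw = 12288;  // stripe width for an invented 3+2 profile
    auto prev = [&](uint64_t o) { return o - o % sw; };
    auto next = [&](uint64_t o) { return o % sw ? prev(o) + sw : o; };

    uint64_t projected_size = 5 * sw;        // object end as planned so far
    uint64_t off = sw + 100, len = 2 * sw;   // unaligned overwrite

    // Head stripe: partially covered and already exists -> must be read.
    uint64_t head_start = std::min(prev(off), projected_size);
    uint64_t head_finish = next(off);
    if (head_start != head_finish && head_start < projected_size)
      std::cout << "to_read " << head_start << "~" << sw << " (head)\n";

    // Tail stripe: same test, skipping the case where it is the head stripe.
    uint64_t tail_start = prev(off + len), tail_finish = next(off + len);
    if (tail_start != tail_finish && tail_start != head_start &&
        tail_start < projected_size)
      std::cout << "to_read " << tail_start << "~" << sw << " (tail)\n";

    // The write itself covers the whole aligned span.
    std::cout << "will_write " << head_start << "~"
              << tail_finish - head_start << "\n";
    return 0;
  }

For this sample update it prints reads for the head and tail stripes and a will_write span of three stripes, matching what get_write_plan records in plan.to_read and plan.will_write.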
diff --git a/src/osd/ECUtil.cc b/src/osd/ECUtil.cc
index 53fe1a203bf..6e046526eda 100644
--- a/src/osd/ECUtil.cc
+++ b/src/osd/ECUtil.cc
@@ -139,16 +139,18 @@ int ECUtil::encode(
 void ECUtil::HashInfo::append(uint64_t old_size,
                               map<int, bufferlist> &to_append) {
-  assert(to_append.size() == cumulative_shard_hashes.size());
   assert(old_size == total_chunk_size);
   uint64_t size_to_append = to_append.begin()->second.length();
-  for (map<int, bufferlist>::iterator i = to_append.begin();
-       i != to_append.end();
-       ++i) {
-    assert(size_to_append == i->second.length());
-    assert((unsigned)i->first < cumulative_shard_hashes.size());
-    uint32_t new_hash = i->second.crc32c(cumulative_shard_hashes[i->first]);
-    cumulative_shard_hashes[i->first] = new_hash;
+  if (has_chunk_hash()) {
+    assert(to_append.size() == cumulative_shard_hashes.size());
+    for (map<int, bufferlist>::iterator i = to_append.begin();
+         i != to_append.end();
+         ++i) {
+      assert(size_to_append == i->second.length());
+      assert((unsigned)i->first < cumulative_shard_hashes.size());
+      uint32_t new_hash = i->second.crc32c(cumulative_shard_hashes[i->first]);
+      cumulative_shard_hashes[i->first] = new_hash;
+    }
   }
   total_chunk_size += size_to_append;
 }
@@ -166,6 +168,7 @@ void ECUtil::HashInfo::decode(bufferlist::iterator &bl)
   DECODE_START(1, bl);
   ::decode(total_chunk_size, bl);
   ::decode(cumulative_shard_hashes, bl);
+  projected_total_chunk_size = total_chunk_size;
   DECODE_FINISH(bl);
 }
diff --git a/src/osd/ECUtil.h b/src/osd/ECUtil.h
index 2706bba67fc..4ca6f550bee 100644
--- a/src/osd/ECUtil.h
+++ b/src/osd/ECUtil.h
@@ -41,6 +41,9 @@ public:
     chunk_size(stripe_width / stripe_size) {
     assert(stripe_width % stripe_size == 0);
   }
+  bool logical_offset_is_stripe_aligned(uint64_t logical) const {
+    return (logical % stripe_width) == 0;
+  }
   uint64_t get_stripe_width() const {
     return stripe_width;
   }
@@ -104,12 +107,14 @@ int encode(
   map<int, bufferlist> *out);
 
 class HashInfo {
-  uint64_t total_chunk_size;
+  uint64_t total_chunk_size = 0;
   vector<uint32_t> cumulative_shard_hashes;
+
+  // purely ephemeral, represents the size once all in-flight ops commit
+  uint64_t projected_total_chunk_size = 0;
 public:
-  HashInfo() : total_chunk_size(0) {}
-  explicit HashInfo(unsigned num_chunks)
-    : total_chunk_size(0),
+  HashInfo() {}
+  explicit HashInfo(unsigned num_chunks) :
     cumulative_shard_hashes(num_chunks, -1) {}
   void append(uint64_t old_size, map<int, bufferlist> &to_append);
   void clear() {
@@ -129,7 +134,38 @@ public:
   uint64_t get_total_chunk_size() const {
     return total_chunk_size;
   }
+  uint64_t get_projected_total_chunk_size() const {
+    return projected_total_chunk_size;
+  }
+  uint64_t get_total_logical_size(const stripe_info_t &sinfo) const {
+    return get_total_chunk_size() *
+      (sinfo.get_stripe_width()/sinfo.get_chunk_size());
+  }
+  uint64_t get_projected_total_logical_size(const stripe_info_t &sinfo) const {
+    return get_projected_total_chunk_size() *
+      (sinfo.get_stripe_width()/sinfo.get_chunk_size());
+  }
+  void set_projected_total_logical_size(
+    const stripe_info_t &sinfo,
+    uint64_t logical_size) {
+    assert(sinfo.logical_offset_is_stripe_aligned(logical_size));
+    projected_total_chunk_size = sinfo.aligned_logical_offset_to_chunk_offset(
+      logical_size);
+  }
+  void set_total_chunk_size_clear_hash(uint64_t new_chunk_size) {
+    cumulative_shard_hashes.clear();
+    total_chunk_size = new_chunk_size;
+  }
+  bool has_chunk_hash() const {
+    return !cumulative_shard_hashes.empty();
+  }
+  void update_to(const HashInfo &rhs) {
+    auto ptcs = projected_total_chunk_size;
+    *this = rhs;
+    projected_total_chunk_size = ptcs;
+  }
 };
+
 typedef ceph::shared_ptr<HashInfo> HashInfoRef;
 
 bool is_hinfo_key_string(const string &key);
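The new projected_total_chunk_size is what lets the planning phase run while earlier writes are still in flight: get_write_plan sizes each op against the projected value, total_chunk_size only moves when a transaction commits, and decode() snaps the projection back to the durable size on load. A toy two-counter model of that scheme (all names invented, chunk scaling omitted):

  #include <cassert>
  #include <cstdint>
  #include <iostream>

  struct ToyHashInfo {
    uint64_t total = 0;      // durable size; the only field that is encoded
    uint64_t projected = 0;  // ephemeral; decode() resets it to 'total'

    void plan(uint64_t s) { projected = s; }  // rmw planning phase
    void commit(uint64_t s) { total = s; assert(total <= projected); }
  };

  int main() {
    ToyHashInfo hi;
    hi.plan(4096);   // op A planned against the projected size
    hi.plan(8192);   // op B planned while A is still in flight
    hi.commit(4096); // A commits
    hi.commit(8192); // B commits
    std::cout << hi.total << " committed, " << hi.projected << " projected\n";
    return 0;
  }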
diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc
index 5e1a4a9e10e..ca28a1d50ce 100644
--- a/src/osd/ReplicatedPG.cc
+++ b/src/osd/ReplicatedPG.cc
@@ -12935,6 +12935,13 @@ void ReplicatedPG::_scrub_finish()
   bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
   const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
 
+  if (pool.info.is_hacky_ecoverwrites()) {
+    dout(10) << __func__
+             << ": skipping stat comparisons since hacky_overwrites are enabled"
+             << dendl;
+    return;
+  }
+
   if (info.stats.stats_invalid) {
     info.stats.stats = scrub_cstat;
     info.stats.stats_invalid = false;
diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h
index 3c3ab2fff6c..a831df932e1 100644
--- a/src/osd/osd_types.h
+++ b/src/osd/osd_types.h
@@ -1406,6 +1406,10 @@ public:
   }
   uint64_t required_alignment() const { return stripe_width; }
 
+  bool is_hacky_ecoverwrites() const {
+    return has_flag(FLAG_EC_OVERWRITES);
+  }
+
   bool can_shift_osds() const {
     switch (get_type()) {
     case TYPE_REPLICATED: