Implements the rmw pipeline and integrates the cache.
HashInfo now maintains a projected size for use during the planning
phase of the pipeline.
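Writes now flow through three ordered waitlists (waiting_state ->
waiting_reads -> waiting_commit), with check_ops() driving the stages:
    while (try_state_to_reads() ||
           try_reads_to_commit() ||
           try_finish_rmw());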
(Doesn't build without subsequent patches; not worth stubbing out
the interfaces.)
Signed-off-by: Samuel Just <sjust@redhat.com>
-Subproject commit 0b00610443a916fabc6668c03337f64d1f773ec9
+Subproject commit b5c863495c16975478aa5fc2ca33293c2e0c1a5f
append("\n", 1);
}
}
+
+ void buffer::list::prepend_zero(unsigned len)
+ {
+ ptr bp(len);
+ bp.zero(false);
+ _len += len;
+ _buffers.emplace_front(std::move(bp));
+ }
void buffer::list::append_zero(unsigned len)
{
void append(const list& bl);
void append(std::istream& in);
void append_zero(unsigned len);
+ void prepend_zero(unsigned len);
/*
* get a char
list<ECBackend::RecoveryOp> ops;
};
+ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs) {
+ switch (rhs.pipeline_state) {
+ case ECBackend::pipeline_state_t::CACHE_VALID:
+ return lhs << "CACHE_VALID";
+ case ECBackend::pipeline_state_t::CACHE_INVALID:
+ return lhs << "CACHE_INVALID";
+ default:
+ assert(0 == "invalid pipeline state");
+ }
+ return lhs; // unreachable
+}
+
static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
{
lhs << "[";
lhs << " client_op=";
rhs.client_op->get_req()->print(lhs);
}
- lhs << " pending_commit=" << rhs.pending_commit
+ lhs << " roll_forward_to=" << rhs.roll_forward_to
+ << " temp_added=" << rhs.temp_added
+ << " pending_commit=" << rhs.pending_commit
+ << " temp_cleared=" << rhs.temp_cleared
+ << " pending_read=" << rhs.pending_read
+ << " remote_read=" << rhs.remote_read
+ << " remote_read_result=" << rhs.remote_read_result
<< " pending_apply=" << rhs.pending_apply
+ << " pending_commit=" << rhs.pending_commit
+ << " plan.to_read=" << rhs.plan.to_read
+ << " plan.will_write=" << rhs.plan.will_write
<< ")";
return lhs;
}
op.state = RecoveryOp::READING;
assert(!op.recovery_progress.data_complete);
set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
+ uint64_t from = op.recovery_progress.data_recovered_to;
+ uint64_t amount = get_recovery_chunk_size();
+
+ if (op.recovery_progress.first && op.obc) {
+ /* We've got the attrs and the hinfo, might as well use them */
+ op.hinfo = get_hash_info(op.hoid);
+ assert(op.hinfo);
+ op.xattrs = op.obc->attr_cache;
+ ::encode(*(op.hinfo), op.xattrs[ECUtil::get_hinfo_key()]);
+ }
+
set<pg_shard_t> to_read;
- uint64_t recovery_max_chunk = get_recovery_chunk_size();
int r = get_min_avail_to_read_shards(
op.hoid, want, true, false, &to_read);
if (r != 0) {
this,
op.hoid,
op.recovery_progress.data_recovered_to,
- recovery_max_chunk,
+ amount,
to_read,
- op.recovery_progress.first);
- op.extent_requested = make_pair(op.recovery_progress.data_recovered_to,
- recovery_max_chunk);
+ op.recovery_progress.first && !op.obc);
+ op.extent_requested = make_pair(
+ from,
+ amount);
dout(10) << __func__ << ": IDLE return " << op << dendl;
return;
}
new SubWriteApplied(this, msg, op.tid, op.at_version)));
vector<ObjectStore::Transaction> tls;
tls.reserve(2);
- tls.push_back(std::move(localt));
tls.push_back(std::move(op.t));
+ tls.push_back(std::move(localt));
get_parent()->queue_transactions(tls, msg);
}
i != op.to_read.end();
++i) {
int r = 0;
- ECUtil::HashInfoRef hinfo = get_hash_info(i->first);
- if (!hinfo) {
- r = -EIO;
- get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first << "\n";
- dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
- goto error;
+ ECUtil::HashInfoRef hinfo;
+ if (!get_parent()->get_pool().is_hacky_ecoverwrites()) {
+ hinfo = get_hash_info(i->first);
+ if (!hinfo) {
+ r = -EIO;
+ get_parent()->clog_error() << __func__ << ": No hinfo for " << i->first << "\n";
+ dout(5) << __func__ << ": No hinfo for " << i->first << dendl;
+ goto error;
+ }
}
for (list<boost::tuple<uint64_t, uint64_t, uint32_t> >::iterator j =
i->second.begin(); j != i->second.end(); ++j) {
);
}
- // This shows that we still need deep scrub because large enough files
- // are read in sections, so the digest check here won't be done here.
- // Do NOT check osd_read_eio_on_bad_digest here. We need to report
- // the state of our chunk in case other chunks could substitute.
- if ((bl.length() == hinfo->get_total_chunk_size()) &&
- (j->get<0>() == 0)) {
- dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
- bufferhash h(-1);
- h << bl;
- if (h.digest() != hinfo->get_chunk_hash(shard)) {
- get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x"
- << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << "\n";
- dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
- << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
- r = -EIO;
- goto error;
+ if (!get_parent()->get_pool().is_hacky_ecoverwrites()) {
+ // This shows that we still need deep scrub because large enough files
+ // are read in sections, so the digest check here won't always be done.
+ // Do NOT check osd_read_eio_on_bad_digest here. We need to report
+ // the state of our chunk in case other chunks could substitute.
+ assert(hinfo->has_chunk_hash());
+ if ((bl.length() == hinfo->get_total_chunk_size()) &&
+ (j->get<0>() == 0)) {
+ dout(20) << __func__ << ": Checking hash of " << i->first << dendl;
+ bufferhash h(-1);
+ h << bl;
+ if (h.digest() != hinfo->get_chunk_hash(shard)) {
+ get_parent()->clog_error() << __func__ << ": Bad hash for " << i->first << " digest 0x"
+ << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << "\n";
+ dout(5) << __func__ << ": Bad hash for " << i->first << " digest 0x"
+ << hex << h.digest() << " expected 0x" << hinfo->get_chunk_hash(shard) << dec << dendl;
+ r = -EIO;
+ goto error;
+ }
}
}
}
assert(i->second.pending_apply.count(from));
i->second.pending_apply.erase(from);
}
- check_op(&(i->second));
+
+ if (i->second.pending_apply.empty() && i->second.on_all_applied) {
+ dout(10) << __func__ << " Calling on_all_applied on " << i->second << dendl;
+ i->second.on_all_applied->complete(0);
+ i->second.on_all_applied = 0;
+ }
+ if (i->second.pending_commit.empty() && i->second.on_all_commit) {
+ dout(10) << __func__ << " Calling on_all_commit on " << i->second << dendl;
+ i->second.on_all_commit->complete(0);
+ i->second.on_all_commit = 0;
+ }
+ check_ops();
}
void ECBackend::handle_sub_read_reply(
void ECBackend::on_change()
{
dout(10) << __func__ << dendl;
- writing.clear();
+
+ completed_to = eversion_t();
+ committed_to = eversion_t();
+ pipeline_state.clear();
+ waiting_reads.clear();
+ waiting_state.clear();
+ waiting_commit.clear();
+ for (auto &&op: tid_to_op_map) {
+ cache.release_write_pin(op.second.pin);
+ }
+
tid_to_op_map.clear();
for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
i != tid_to_read_map.end();
}
}
tid_to_read_map.clear();
- for (list<ClientAsyncReadStatus>::iterator i = in_progress_client_reads.begin();
- i != in_progress_client_reads.end();
- ++i) {
- delete i->on_complete;
- i->on_complete = NULL;
- }
in_progress_client_reads.clear();
shard_to_read_map.clear();
clear_recovery_state();
void ECBackend::submit_transaction(
const hobject_t &hoid,
+ const object_stat_sum_t &delta_stats,
const eversion_t &at_version,
- PGTransactionUPtr &&_t,
+ PGTransactionUPtr &&t,
const eversion_t &trim_to,
const eversion_t &roll_forward_to,
const vector<pg_log_entry_t> &log_entries,
assert(!tid_to_op_map.count(tid));
Op *op = &(tid_to_op_map[tid]);
op->hoid = hoid;
+ op->delta_stats = delta_stats;
op->version = at_version;
op->trim_to = trim_to;
- op->roll_forward_to = roll_forward_to;
+ op->roll_forward_to = MAX(roll_forward_to, committed_to);
op->log_entries = log_entries;
std::swap(op->updated_hit_set_history, hset_history);
op->on_local_applied_sync = on_local_applied_sync;
op->reqid = reqid;
op->client_op = client_op;
- op->t = std::move(_t);
-
- set<hobject_t, hobject_t::BitwiseComparator> need_hinfos;
- ECTransaction::get_append_objects(*(op->t), &need_hinfos);
- for (set<hobject_t, hobject_t::BitwiseComparator>::iterator i = need_hinfos.begin();
- i != need_hinfos.end();
- ++i) {
- ECUtil::HashInfoRef ref = get_hash_info(*i, false);
- if (!ref) {
- derr << __func__ << ": get_hash_info(" << *i << ")"
- << " returned a null pointer and there is no "
- << " way to recover from such an error in this "
- << " context" << dendl;
- assert(0);
- }
- op->unstable_hash_infos.insert(
- make_pair(
- *i,
- ref));
- }
-
dout(10) << __func__ << ": op " << *op << " starting" << dendl;
- start_write(op);
- writing.push_back(op);
+ start_rmw(op, std::move(t));
dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
}
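+// Queue cb behind the most recent write still in the prep stages of the
+// pipeline, or invoke it immediately if there are none.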
+void ECBackend::call_write_ordered(std::function<void(void)> &&cb) {
+ if (!waiting_state.empty()) {
+ waiting_state.back().on_write.emplace_back(std::move(cb));
+ } else if (!waiting_reads.empty()) {
+ waiting_reads.back().on_write.emplace_back(std::move(cb));
+ } else {
+ // Nothing earlier in the pipeline, just call it
+ cb();
+ }
+}
+
int ECBackend::get_min_avail_to_read_shards(
const hobject_t &hoid,
const set<int> &want,
op.for_recovery = for_recovery;
dout(10) << __func__ << ": starting " << op << dendl;
do_read_op(
- op,
- to_read);
+ op);
}
-void ECBackend::do_read_op(
- ReadOp &op,
- map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read)
+void ECBackend::do_read_op(ReadOp &op)
{
int priority = op.priority;
ceph_tid_t tid = op.tid;
- op.to_read.swap(to_read);
- dout(10) << __func__ << ": starting additional " << op << dendl;
+ dout(10) << __func__ << ": starting read " << op << dendl;
map<pg_shard_t, ECSubRead> messages;
for (map<hobject_t, read_request_t,
i != op.to_read.end();
++i) {
bool need_attrs = i->second.want_attrs;
+ list<boost::tuple<
+ uint64_t, uint64_t, map<pg_shard_t, bufferlist> > > &reslist =
+ op.complete[i->first].returned;
for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
j != i->second.need.end();
++j) {
i->second.to_read.begin();
j != i->second.to_read.end();
++j) {
+ reslist.push_back(
+ boost::make_tuple(
+ j->get<0>(),
+ j->get<1>(),
+ map<pg_shard_t, bufferlist>()));
pair<uint64_t, uint64_t> chunk_off_len =
sinfo.aligned_offset_len_to_chunk(make_pair(j->get<0>(), j->get<1>()));
for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
k != i->second.need.end();
++k) {
- messages[*k].to_read[i->first].push_back(boost::make_tuple(chunk_off_len.first,
- chunk_off_len.second,
- j->get<2>()));
+ messages[*k].to_read[i->first].push_back(
+ boost::make_tuple(
+ chunk_off_len.first,
+ chunk_off_len.second,
+ j->get<2>()));
}
assert(!need_attrs);
}
msg,
get_parent()->get_epoch());
}
- dout(10) << __func__ << ": started additional " << op << dendl;
+ dout(10) << __func__ << ": started " << op << dendl;
}
ECUtil::HashInfoRef ECBackend::get_hash_info(
return ref;
}
-void ECBackend::check_op(Op *op)
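+// Write pipeline entry point: compute the op's read/write plan from the
+// transaction and queue it on waiting_state.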
+void ECBackend::start_rmw(Op *op, PGTransactionUPtr &&t)
{
- if (op->pending_apply.empty() && op->on_all_applied) {
- dout(10) << __func__ << " Calling on_all_applied on " << *op << dendl;
- op->on_all_applied->complete(0);
- op->on_all_applied = 0;
- }
- if (op->pending_commit.empty() && op->on_all_commit) {
- dout(10) << __func__ << " Calling on_all_commit on " << *op << dendl;
- op->on_all_commit->complete(0);
- op->on_all_commit = 0;
- }
- if (op->pending_apply.empty() && op->pending_commit.empty()) {
- // done!
- assert(writing.front() == op);
- dout(10) << __func__ << " Completing " << *op << dendl;
- writing.pop_front();
- tid_to_op_map.erase(op->tid);
- }
- for (map<ceph_tid_t, Op>::iterator i = tid_to_op_map.begin();
- i != tid_to_op_map.end();
- ++i) {
- dout(20) << __func__ << " tid " << i->first <<": " << i->second << dendl;
+ assert(op);
+
+ op->plan = ECTransaction::get_write_plan(
+ sinfo,
+ std::move(t),
+ [&](const hobject_t &i) {
+ ECUtil::HashInfoRef ref = get_hash_info(i, false);
+ if (!ref) {
+ derr << __func__ << ": get_hash_info(" << i << ")"
+ << " returned a null pointer and there is no "
+ << " way to recover from such an error in this "
+ << " context" << dendl;
+ assert(0);
+ }
+ return ref;
+ },
+ get_parent()->get_dpp());
+
+ dout(10) << __func__ << ": " << *op << dendl;
+
+ waiting_state.push_back(*op);
+ check_ops();
+}
+
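+// Pipeline stage 1: move the front op from waiting_state to waiting_reads.
+// For an rmw (or when caching is enabled) the op's extents are pinned in the
+// cache; whatever the cache cannot supply is read from the shards
+// (remote_read), the rest is deferred to the cache (pending_read).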
+bool ECBackend::try_state_to_reads()
+{
+ if (waiting_state.empty())
+ return false;
+
+ Op *op = &(waiting_state.front());
+ if (op->requires_rmw() && pipeline_state.cache_invalid()) {
+ dout(20) << __func__ << ": blocking " << *op
+ << " because it requires an rmw and the cache is invalid"
+ << pipeline_state
+ << dendl;
+ return false;
+ }
+
+ if (op->invalidates_cache()) {
+ dout(20) << __func__ << ": invalidating cache after this op"
+ << dendl;
+ pipeline_state.invalidate();
}
+
+ waiting_state.pop_front();
+ waiting_reads.push_back(*op);
+
+ if (op->requires_rmw() || pipeline_state.caching_enabled()) {
+ cache.open_write_pin(op->pin);
+
+ extent_set empty;
+ for (auto &&hpair: op->plan.will_write) {
+ auto to_read_plan_iter = op->plan.to_read.find(hpair.first);
+ const extent_set &to_read_plan =
+ to_read_plan_iter == op->plan.to_read.end() ?
+ empty :
+ to_read_plan_iter->second;
+
+ extent_set remote_read = cache.reserve_extents_for_rmw(
+ hpair.first,
+ op->pin,
+ hpair.second,
+ to_read_plan);
+
+ extent_set pending_read = to_read_plan;
+ pending_read.subtract(remote_read);
+
+ if (!remote_read.empty()) {
+ op->remote_read[hpair.first] = std::move(remote_read);
+ }
+ if (!pending_read.empty()) {
+ op->pending_read[hpair.first] = std::move(pending_read);
+ }
+ }
+ } else {
+ op->remote_read = op->plan.to_read;
+ }
+
+ dout(10) << __func__ << ": " << *op << dendl;
+
+ if (!op->remote_read.empty()) {
+ objects_read_async_no_cache(
+ op->remote_read,
+ [this, op](hobject_t::bitwisemap<extent_map> &&results) {
+ op->remote_read_result = std::move(results);
+ check_ops();
+ });
+ }
+
+ return true;
}
-void ECBackend::start_write(Op *op) {
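+// Pipeline stage 2: once the front op on waiting_reads has its read data,
+// collect any cache-supplied extents, generate the per-shard transactions,
+// update the cache with the new writes, and send the sub-writes; the op then
+// sits on waiting_commit until the replies arrive.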
+bool ECBackend::try_reads_to_commit()
+{
+ if (waiting_reads.empty())
+ return false;
+ Op *op = &(waiting_reads.front());
+ if (op->read_in_progress())
+ return false;
+ waiting_reads.pop_front();
+ waiting_commit.push_back(*op);
+
+ dout(10) << __func__ << ": starting commit on " << *op << dendl;
+ dout(20) << __func__ << ": " << cache << dendl;
+
+ get_parent()->apply_stats(
+ op->hoid,
+ op->delta_stats);
+
+ if (pipeline_state.caching_enabled() || op->requires_rmw()) {
+ for (auto &&hpair: op->pending_read) {
+ op->remote_read_result[hpair.first].insert(
+ cache.get_remaining_extents_for_rmw(
+ hpair.first,
+ op->pin,
+ hpair.second));
+ }
+ op->pending_read.clear();
+ } else {
+ assert(op->pending_read.empty());
+ }
+
map<shard_id_t, ObjectStore::Transaction> trans;
for (set<pg_shard_t>::const_iterator i =
get_parent()->get_actingbackfill_shards().begin();
++i) {
trans[i->shard];
}
- ObjectStore::Transaction empty;
- ECTransaction::generate_transactions(
- *(op->t),
- op->unstable_hash_infos,
- ec_impl,
- get_parent()->get_info().pgid.pgid,
- sinfo,
- op->log_entries,
- &trans,
- &(op->temp_added),
- &(op->temp_cleared));
+ hobject_t::bitwisemap<extent_map> written;
+ if (op->plan.t) {
+ ECTransaction::generate_transactions(
+ op->plan,
+ ec_impl,
+ get_parent()->get_info().pgid.pgid,
+ !get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN),
+ sinfo,
+ op->remote_read_result,
+ op->log_entries,
+ &written,
+ &trans,
+ &(op->temp_added),
+ &(op->temp_cleared),
+ get_parent()->get_dpp());
+ }
+
+ dout(20) << __func__ << ": " << cache << dendl;
+ dout(20) << __func__ << ": written: " << written << dendl;
+ dout(20) << __func__ << ": op: " << *op << dendl;
+
+ if (!get_parent()->get_pool().is_hacky_ecoverwrites()) {
+ for (auto &&i: op->log_entries) {
+ if (i.requires_kraken()) {
+ derr << __func__ << ": log entry " << i << " requires kraken"
+ << " but overwrites are not enabled!" << dendl;
+ assert(0);
+ }
+ }
+ }
+
+ hobject_t::bitwisemap<extent_set> written_set;
+ for (auto &&i: written) {
+ written_set[i.first] = i.second.get_interval_set();
+ }
+ dout(20) << __func__ << ": written_set: " << written_set << dendl;
+ assert(written_set == op->plan.will_write);
+
+ if (pipeline_state.caching_enabled() || op->requires_rmw()) {
+ for (auto &&hpair: written) {
+ dout(20) << __func__ << ": " << hpair << dendl;
+ cache.present_rmw_update(hpair.first, op->pin, hpair.second);
+ }
+ }
+ op->remote_read.clear();
+ op->remote_read_result.clear();
dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
+ ObjectStore::Transaction empty;
for (set<pg_shard_t>::const_iterator i =
get_parent()->get_actingbackfill_shards().begin();
i->osd, r, get_parent()->get_epoch());
}
}
+
+ for (auto i = op->on_write.begin();
+ i != op->on_write.end();
+ op->on_write.erase(i++)) {
+ (*i)();
+ }
+
+ return true;
+}
+
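+// Pipeline stage 3: once all shards have applied and committed the front op
+// on waiting_commit, advance completed_to/committed_to, release its cache
+// pin, and drop it from tid_to_op_map.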
+bool ECBackend::try_finish_rmw()
+{
+ if (waiting_commit.empty())
+ return false;
+ Op *op = &(waiting_commit.front());
+ if (op->write_in_progress())
+ return false;
+ waiting_commit.pop_front();
+
+ dout(10) << __func__ << ": " << *op << dendl;
+ dout(20) << __func__ << ": " << cache << dendl;
+
+ if (op->roll_forward_to > completed_to)
+ completed_to = op->roll_forward_to;
+ if (op->version > committed_to)
+ committed_to = op->version;
+
+ if (get_osdmap()->test_flag(CEPH_OSDMAP_REQUIRE_KRAKEN)) {
+ if (op->version > get_parent()->get_log().get_can_rollback_to() &&
+ waiting_reads.empty() &&
+ waiting_commit.empty()) {
+ // submit a dummy transaction to kick the rollforward
+ auto tid = get_parent()->get_tid();
+ Op *nop = &(tid_to_op_map[tid]);
+ nop->hoid = op->hoid;
+ nop->trim_to = op->trim_to;
+ nop->roll_forward_to = op->version;
+ nop->tid = tid;
+ nop->reqid = op->reqid;
+ waiting_reads.push_back(*nop);
+ }
+ }
+
+ if (pipeline_state.caching_enabled() || op->requires_rmw()) {
+ cache.release_write_pin(op->pin);
+ }
+ tid_to_op_map.erase(op->tid);
+
+ if (waiting_reads.empty() &&
+ waiting_commit.empty()) {
+ pipeline_state.clear();
+ dout(20) << __func__ << ": clearing pipeline_state "
+ << pipeline_state
+ << dendl;
+ }
+ return true;
+}
+
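+// Drive the pipeline until no stage can make further progress.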
+void ECBackend::check_ops()
+{
+ while (try_state_to_reads() ||
+ try_reads_to_commit() ||
+ try_finish_rmw());
}
int ECBackend::objects_read_sync(
return -EOPNOTSUPP;
}
+void ECBackend::objects_read_async(
+ const hobject_t &hoid,
+ const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > > &to_read,
+ Context *on_complete,
+ bool fast_read)
+{
+ hobject_t::bitwisemap<std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > >
+ reads;
+
+ uint32_t flags = 0;
+ extent_set es;
+ for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > >::const_iterator i =
+ to_read.begin();
+ i != to_read.end();
+ ++i) {
+ pair<uint64_t, uint64_t> tmp =
+ sinfo.offset_len_to_stripe_bounds(
+ make_pair(i->first.get<0>(), i->first.get<1>()));
+
+ extent_set esnew;
+ esnew.insert(tmp.first, tmp.second);
+ es.union_of(esnew);
+ flags |= i->first.get<2>();
+ }
+
+ if (!es.empty()) {
+ auto &offsets = reads[hoid];
+ for (auto j = es.begin();
+ j != es.end();
+ ++j) {
+ offsets.push_back(
+ boost::make_tuple(
+ j.get_start(),
+ j.get_len(),
+ flags));
+ }
+ }
+
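+ // Completion adapter: carve each requested extent out of the reconstructed
+ // extent_map, fill the caller's buffers, and fire the per-read and final
+ // completion contexts.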
+ struct cb {
+ ECBackend *ec;
+ hobject_t hoid;
+ list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > > to_read;
+ unique_ptr<Context> on_complete;
+ cb(const cb&) = delete;
+ cb(cb &&) = default;
+ cb(ECBackend *ec,
+ const hobject_t &hoid,
+ const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
+ pair<bufferlist*, Context*> > > &to_read,
+ Context *on_complete)
+ : ec(ec),
+ hoid(hoid),
+ to_read(to_read),
+ on_complete(on_complete) {}
+ void operator()(hobject_t::bitwisemap<extent_map> &&results) {
+ auto dpp = ec->get_parent()->get_dpp();
+ ldpp_dout(dpp, 20) << "objects_read_async_cb: got: " << results
+ << dendl;
+ ldpp_dout(dpp, 20) << "objects_read_async_cb: cache: " << ec->cache
+ << dendl;
+
+ auto &got = results[hoid];
+
+ for (auto &&read: to_read) {
+ assert(read.second.first);
+ uint64_t offset = read.first.get<0>();
+ uint64_t length = read.first.get<1>();
+ auto range = got.get_containing_range(offset, length);
+ assert(range.first != range.second);
+ assert(range.first.get_off() <= offset);
+ assert(
+ (offset + length) <=
+ (range.first.get_off() + range.first.get_len()));
+ read.second.first->substr_of(
+ range.first.get_val(),
+ offset - range.first.get_off(),
+ length);
+ if (read.second.second) {
+ read.second.second->complete(length);
+ read.second.second = nullptr;
+ }
+ }
+ to_read.clear();
+ if (on_complete) {
+ on_complete.release()->complete(0);
+ }
+ }
+ ~cb() {
+ for (auto &&i: to_read) {
+ delete i.second.second;
+ }
+ to_read.clear();
+ }
+ };
+ objects_read_and_reconstruct(
+ reads,
+ fast_read,
+ make_gen_lambda_context<hobject_t::bitwisemap<extent_map> &&, cb>(
+ cb(this,
+ hoid,
+ to_read,
+ on_complete)));
+}
+
struct CallClientContexts :
public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+ hobject_t hoid;
ECBackend *ec;
ECBackend::ClientAsyncReadStatus *status;
- list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
- pair<bufferlist*, Context*> > > to_read;
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
CallClientContexts(
+ hobject_t hoid,
ECBackend *ec,
ECBackend::ClientAsyncReadStatus *status,
- const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
- pair<bufferlist*, Context*> > > &to_read)
- : ec(ec), status(status), to_read(to_read) {}
+ const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
+ : hoid(hoid), ec(ec), status(status), to_read(to_read) {}
void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) {
ECBackend::read_result_t &res = in.second;
+ extent_map result;
if (res.r != 0)
goto out;
assert(res.returned.size() == to_read.size());
assert(res.r == 0);
assert(res.errors.empty());
- for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
- pair<bufferlist*, Context*> > >::iterator i = to_read.begin();
- i != to_read.end();
- to_read.erase(i++)) {
+ for (auto &&read: to_read) {
pair<uint64_t, uint64_t> adjusted =
- ec->sinfo.offset_len_to_stripe_bounds(make_pair(i->first.get<0>(), i->first.get<1>()));
+ ec->sinfo.offset_len_to_stripe_bounds(
+ make_pair(read.get<0>(), read.get<1>()));
assert(res.returned.front().get<0>() == adjusted.first &&
res.returned.front().get<1>() == adjusted.second);
map<int, bufferlist> to_decode;
res.r = r;
goto out;
}
- assert(i->second.second);
- assert(i->second.first);
- i->second.first->substr_of(
+ bufferlist trimmed;
+ trimmed.substr_of(
bl,
- i->first.get<0>() - adjusted.first,
- MIN(i->first.get<1>(), bl.length() - (i->first.get<0>() - adjusted.first)));
- if (i->second.second) {
- i->second.second->complete(i->second.first->length());
- }
+ read.get<0>() - adjusted.first,
+ MIN(read.get<1>(),
+ bl.length() - (read.get<0>() - adjusted.first)));
+ result.insert(
+ read.get<0>(), trimmed.length(), std::move(trimmed));
res.returned.pop_front();
}
out:
- status->complete = true;
- list<ECBackend::ClientAsyncReadStatus> &ip =
- ec->in_progress_client_reads;
- while (ip.size() && ip.front().complete) {
- if (ip.front().on_complete) {
- ip.front().on_complete->complete(res.r);
- ip.front().on_complete = NULL;
- }
- ip.pop_front();
- }
- }
- ~CallClientContexts() {
- for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
- pair<bufferlist*, Context*> > >::iterator i = to_read.begin();
- i != to_read.end();
- to_read.erase(i++)) {
- delete i->second.second;
- }
+ status->complete_object(hoid, std::move(result));
+ ec->kick_reads();
}
};
-void ECBackend::objects_read_async(
- const hobject_t &hoid,
- const list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
- pair<bufferlist*, Context*> > > &to_read,
- Context *on_complete,
- bool fast_read)
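+// Issue stripe-aligned reads (reconstructing if necessary) for each object
+// and hand the resulting extent_maps to func once every object completes.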
+void ECBackend::objects_read_and_reconstruct(
+ const hobject_t::bitwisemap<
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+ > &reads,
+ bool fast_read,
+ GenContextURef<hobject_t::bitwisemap<extent_map> &&> &&func)
{
- in_progress_client_reads.push_back(ClientAsyncReadStatus(on_complete));
- CallClientContexts *c = new CallClientContexts(
- this, &(in_progress_client_reads.back()), to_read);
-
- list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets;
- pair<uint64_t, uint64_t> tmp;
- for (list<pair<boost::tuple<uint64_t, uint64_t, uint32_t>,
- pair<bufferlist*, Context*> > >::const_iterator i =
- to_read.begin();
- i != to_read.end();
- ++i) {
- tmp = sinfo.offset_len_to_stripe_bounds(make_pair(i->first.get<0>(), i->first.get<1>()));
- offsets.push_back(boost::make_tuple(tmp.first, tmp.second, i->first.get<2>()));
+ in_progress_client_reads.emplace_back(
+ reads.size(), std::move(func));
+ if (!reads.size()) {
+ kick_reads();
+ return;
}
set<int> want_to_read;
get_want_to_read_shards(&want_to_read);
- set<pg_shard_t> shards;
- int r = get_min_avail_to_read_shards(
- hoid,
- want_to_read,
- false,
- fast_read,
- &shards);
- assert(r == 0);
-
map<hobject_t, read_request_t, hobject_t::BitwiseComparator> for_read_op;
- for_read_op.insert(
- make_pair(
- hoid,
- read_request_t(
- offsets,
- shards,
- false,
- c)));
+ for (auto &&to_read: reads) {
+ set<pg_shard_t> shards;
+ int r = get_min_avail_to_read_shards(
+ to_read.first,
+ want_to_read,
+ false,
+ fast_read,
+ &shards);
+ assert(r == 0);
+
+ CallClientContexts *c = new CallClientContexts(
+ to_read.first,
+ this,
+ &(in_progress_client_reads.back()),
+ to_read.second);
+ for_read_op.insert(
+ make_pair(
+ to_read.first,
+ read_request_t(
+ to_read.second,
+ shards,
+ false,
+ c)));
+ }
start_read_op(
CEPH_MSG_PRIO_DEFAULT,
dout(10) << __func__ << " Read remaining shards " << shards << dendl;
- list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets = rop.to_read.find(hoid)->second.to_read;
- GenContext<pair<RecoveryMessages *, read_result_t& > &> *c = rop.to_read.find(hoid)->second.cb;
+ // TODOSAM: this doesn't seem right
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
+ rop.to_read.find(hoid)->second.to_read;
+ GenContext<pair<RecoveryMessages *, read_result_t& > &> *c =
+ rop.to_read.find(hoid)->second.cb;
map<hobject_t, read_request_t, hobject_t::BitwiseComparator> for_read_op;
for_read_op.insert(
false,
c)));
- do_read_op(rop, for_read_op);
+ rop.to_read.swap(for_read_op);
+ do_read_op(rop);
return 0;
}
o.digest_present = false;
return;
} else {
- if (hinfo->get_total_chunk_size() != pos) {
- dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
- o.ec_size_mismatch = true;
- return;
- }
+ if (!get_parent()->get_pool().is_hacky_ecoverwrites()) {
+ assert(hinfo->has_chunk_hash());
+ if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
+ dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
+ o.ec_hash_mismatch = true;
+ return;
+ }
- if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
- dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
- o.ec_hash_mismatch = true;
- return;
- }
+ if (hinfo->get_total_chunk_size() != pos) {
+ dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
+ o.ec_size_mismatch = true;
+ return;
+ }
- /* We checked above that we match our own stored hash. We cannot
- * send a hash of the actual object, so instead we simply send
- * our locally stored hash of shard 0 on the assumption that if
- * we match our chunk hash and our recollection of the hash for
- * chunk 0 matches that of our peers, there is likely no corruption.
- */
- o.digest = hinfo->get_chunk_hash(0);
- o.digest_present = true;
+ /* We checked above that we match our own stored hash. We cannot
+ * send a hash of the actual object, so instead we simply send
+ * our locally stored hash of shard 0 on the assumption that if
+ * we match our chunk hash and our recollection of the hash for
+ * chunk 0 matches that of our peers, there is likely no corruption.
+ */
+ o.digest = hinfo->get_chunk_hash(0);
+ o.digest_present = true;
+ } else {
+ /* Hack! We must be using partial overwrites, and partial overwrites
+ * don't support deep-scrub yet
+ */
+ o.digest = 0;
+ o.digest_present = true;
+ }
}
o.omap_digest = seed;
#ifndef ECBACKEND_H
#define ECBACKEND_H
+#include <boost/intrusive/set.hpp>
+#include <boost/intrusive/list.hpp>
+
#include "OSD.h"
#include "PGBackend.h"
#include "erasure-code/ErasureCodeInterface.h"
#include "ECUtil.h"
#include "ECTransaction.h"
+#include "ExtentCache.h"
//forward declaration
struct ECSubWrite;
void dump_recovery_info(Formatter *f) const;
+ void call_write_ordered(std::function<void(void)> &&cb) override;
+
void submit_transaction(
const hobject_t &hoid,
+ const object_stat_sum_t &delta_stats,
const eversion_t &at_version,
PGTransactionUPtr &&t,
const eversion_t &trim_to,
* ensures that we won't ever have to restart a client initiated read in
* check_recovery_sources.
*/
+ void objects_read_and_reconstruct(
+ const hobject_t::bitwisemap<
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
+ > &reads,
+ bool fast_read,
+ GenContextURef<hobject_t::bitwisemap<extent_map> &&> &&func);
+
friend struct CallClientContexts;
struct ClientAsyncReadStatus {
- bool complete;
- Context *on_complete;
- explicit ClientAsyncReadStatus(Context *on_complete)
- : complete(false), on_complete(on_complete) {}
+ unsigned objects_to_read;
+ GenContextURef<hobject_t::bitwisemap<extent_map> &&> func;
+ hobject_t::bitwisemap<extent_map> results;
+ explicit ClientAsyncReadStatus(
+ unsigned objects_to_read,
+ GenContextURef<hobject_t::bitwisemap<extent_map> &&> &&func)
+ : objects_to_read(objects_to_read), func(std::move(func)) {}
+ void complete_object(
+ const hobject_t &hoid,
+ extent_map &&buffers) {
+ assert(objects_to_read);
+ --objects_to_read;
+ assert(!results.count(hoid));
+ results.emplace(hoid, std::move(buffers));
+ }
+ bool is_complete() const {
+ return objects_to_read == 0;
+ }
+ void run() {
+ func.release()->complete(std::move(results));
+ }
};
list<ClientAsyncReadStatus> in_progress_client_reads;
void objects_read_async(
Context *on_complete,
bool fast_read = false);
+ template <typename Func>
+ void objects_read_async_no_cache(
+ const hobject_t::bitwisemap<extent_set> &to_read,
+ Func &&on_complete) {
+ hobject_t::bitwisemap<
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > > _to_read;
+ for (auto &&hpair: to_read) {
+ auto &l = _to_read[hpair.first];
+ for (auto extent: hpair.second) {
+ l.emplace_back(extent.first, extent.second, 0);
+ }
+ }
+ objects_read_and_reconstruct(
+ _to_read,
+ false,
+ make_gen_lambda_context<hobject_t::bitwisemap<extent_map> &&, Func>(
+ std::forward<Func>(on_complete)));
+ }
+ void kick_reads() {
+ while (in_progress_client_reads.size() &&
+ in_progress_client_reads.front().is_complete()) {
+ in_progress_client_reads.front().run();
+ in_progress_client_reads.pop_front();
+ }
+ }
+
private:
friend struct ECRecoveryHandle;
uint64_t get_recovery_chunk_size() const {
}
// must be filled if state == WRITING
- map<shard_id_t, bufferlist> returned_data;
+ map<int, bufferlist> returned_data;
map<string, bufferlist> xattrs;
ECUtil::HashInfoRef hinfo;
ObjectContextRef obc;
friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
map<hobject_t, RecoveryOp, hobject_t::BitwiseComparator> recovery_ops;
+ void continue_recovery_op(
+ RecoveryOp &op,
+ RecoveryMessages *m);
+ void dispatch_recovery_messages(RecoveryMessages &m, int priority);
+ friend struct OnRecoveryReadComplete;
+ void handle_recovery_read_complete(
+ const hobject_t &hoid,
+ boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
+ boost::optional<map<string, bufferlist> > attrs,
+ RecoveryMessages *m);
+ void handle_recovery_push(
+ PushOp &op,
+ RecoveryMessages *m);
+ void handle_recovery_push_reply(
+ PushReplyOp &op,
+ pg_shard_t from,
+ RecoveryMessages *m);
+
public:
/**
* Low level async read mechanism
OpRequestRef op,
bool do_redundant_reads, bool for_recovery);
- void do_read_op(ReadOp &rop,
- map<hobject_t, read_request_t, hobject_t::BitwiseComparator> &to_read);
+ void do_read_op(ReadOp &rop);
int send_all_remaining_reads(
const hobject_t &hoid,
ReadOp &rop);
* completions. Thus, callbacks and completion are called in order
* on the writing list.
*/
- struct Op {
+ struct Op : boost::intrusive::list_base_hook<> {
+ /// From submit_transaction caller, describes operation
hobject_t hoid;
+ object_stat_sum_t delta_stats;
eversion_t version;
eversion_t trim_to;
- eversion_t roll_forward_to;
- vector<pg_log_entry_t> log_entries;
- map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> obc_map;
boost::optional<pg_hit_set_history_t> updated_hit_set_history;
- Context *on_local_applied_sync;
- Context *on_all_applied;
- Context *on_all_commit;
+ vector<pg_log_entry_t> log_entries;
ceph_tid_t tid;
osd_reqid_t reqid;
- OpRequestRef client_op;
- std::unique_ptr<PGTransaction> t;
+ eversion_t roll_forward_to; /// Soon to be generated internally
+
+ /// Ancillary also provided from submit_transaction caller
+ map<hobject_t, ObjectContextRef, hobject_t::BitwiseComparator> obc_map;
+
+ /// see call_write_ordered
+ std::list<std::function<void(void)> > on_write;
+ /// Generated internally
set<hobject_t, hobject_t::BitwiseComparator> temp_added;
set<hobject_t, hobject_t::BitwiseComparator> temp_cleared;
+ ECTransaction::WritePlan plan;
+ bool requires_rmw() const { return !plan.to_read.empty(); }
+ bool invalidates_cache() const { return plan.invalidates_cache; }
+
+ /// In progress read state
+ hobject_t::bitwisemap<extent_set> pending_read; // subset already being read
+ hobject_t::bitwisemap<extent_set> remote_read; // subset we must read
+ hobject_t::bitwisemap<extent_map> remote_read_result;
+ bool read_in_progress() const {
+ return !remote_read.empty() && remote_read_result.empty();
+ }
+
+ /// In progress write state
set<pg_shard_t> pending_commit;
set<pg_shard_t> pending_apply;
+ bool write_in_progress() const {
+ return !pending_commit.empty() || !pending_apply.empty();
+ }
+
+ /// optional, may be null, for tracking purposes
+ OpRequestRef client_op;
+
+ /// pin for cache
+ ExtentCache::write_pin pin;
- map<hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator> unstable_hash_infos;
+ /// Callbacks
+ Context *on_local_applied_sync = nullptr;
+ Context *on_all_applied = nullptr;
+ Context *on_all_commit = nullptr;
~Op() {
delete on_local_applied_sync;
delete on_all_applied;
delete on_all_commit;
}
};
+ using op_list = boost::intrusive::list<Op>;
friend ostream &operator<<(ostream &lhs, const Op &rhs);
- void continue_recovery_op(
- RecoveryOp &op,
- RecoveryMessages *m);
+ ExtentCache cache;
+ map<ceph_tid_t, Op> tid_to_op_map; /// Owns Op structure
+
+ /**
+ * We model the possible rmw states as a set of waitlists.
+ * All writes at this time complete in order, so a write blocked
+ * at waiting_state blocks all writes behind it as well (same for
+ * other states).
+ *
+ * Future work: We can break this up into a per-object pipeline
+ * (almost). First, provide an ordering token to submit_transaction
+ * and require that all operations within a single transaction take
+ * place on a subset of hobject_t space partitioned by that token
+ * (the hashid seems about right to me -- even works for temp objects
+ * if you recall that a temp object created for object head foo will
+ * only ever be referenced by other transactions on foo and isn't
+ * reused). Next, factor this part into a class and maintain one per
+ * ordering token. Next, fixup ReplicatedPG's repop queue to be
+ * partitioned by ordering token. Finally, refactor the op pipeline
+ * so that the log entries passed into submit_transaction aren't
+ * versioned. We can't assign versions to them until we actually
+ * submit the operation. That's probably going to be the hard part.
+ */
+ class pipeline_state_t {
+ enum {
+ CACHE_VALID = 0,
+ CACHE_INVALID = 1
+ } pipeline_state = CACHE_VALID;
+ public:
+ bool caching_enabled() const {
+ return pipeline_state == CACHE_VALID;
+ }
+ bool cache_invalid() const {
+ return !caching_enabled();
+ }
+ void invalidate() {
+ pipeline_state = CACHE_INVALID;
+ }
+ void clear() {
+ pipeline_state = CACHE_VALID;
+ }
+ friend ostream &operator<<(ostream &lhs, const pipeline_state_t &rhs);
+ } pipeline_state;
- void dispatch_recovery_messages(RecoveryMessages &m, int priority);
- friend struct OnRecoveryReadComplete;
- void handle_recovery_read_complete(
- const hobject_t &hoid,
- boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
- boost::optional<map<string, bufferlist> > attrs,
- RecoveryMessages *m);
- void handle_recovery_push(
- PushOp &op,
- RecoveryMessages *m);
- void handle_recovery_push_reply(
- PushReplyOp &op,
- pg_shard_t from,
- RecoveryMessages *m);
- map<ceph_tid_t, Op> tid_to_op_map; /// lists below point into here
- list<Op*> writing;
+ op_list waiting_state; /// writes waiting on pipeline_state
+ op_list waiting_reads; /// writes waiting on partial stripe reads
+ op_list waiting_commit; /// writes waiting on initial commit
+ eversion_t completed_to;
+ eversion_t committed_to;
+ void start_rmw(Op *op, PGTransactionUPtr &&t);
+ bool try_state_to_reads();
+ bool try_reads_to_commit();
+ bool try_finish_rmw();
+ void check_ops();
CephContext *cct;
ErasureCodeInterfaceRef ec_impl;
const map<string,bufferptr> *attr = NULL);
friend struct ReadCB;
- void check_op(Op *op);
- void start_write(Op *op);
public:
ECBackend(
PGBackend::Listener *pg,
void _failed_push(const hobject_t &hoid,
pair<RecoveryMessages *, ECBackend::read_result_t &> &in);
};
+ostream &operator<<(ostream &lhs, const ECBackend::pipeline_state_t &rhs);
#endif
#include "os/ObjectStore.h"
#include "common/inline_variant.h"
-void ECTransaction::get_append_objects(
- const PGTransaction &t,
- set<hobject_t, hobject_t::BitwiseComparator> *out)
-{
- for (auto &&i: t.op_map) {
- out->insert(i.first);
- hobject_t source;
- if (i.second.has_source(&source))
- out->insert(source);
- }
-}
-void append(
+void encode_and_write(
pg_t pgid,
const hobject_t &oid,
const ECUtil::stripe_info_t &sinfo,
ErasureCodeInterfaceRef &ecimpl,
const set<int> &want,
uint64_t offset,
- bufferlist &bl,
+ bufferlist bl,
uint32_t flags,
ECUtil::HashInfoRef hinfo,
- map<shard_id_t, ObjectStore::Transaction> *transactions) {
-
+ extent_map &written,
+ map<shard_id_t, ObjectStore::Transaction> *transactions,
+ DoutPrefixProvider *dpp) {
+ const uint64_t before_size = hinfo->get_total_logical_size(sinfo);
+ assert(sinfo.logical_offset_is_stripe_aligned(offset));
+ assert(sinfo.logical_offset_is_stripe_aligned(bl.length()));
assert(bl.length());
- assert(offset % sinfo.get_stripe_width() == 0);
- assert(
- sinfo.aligned_logical_offset_to_chunk_offset(offset) ==
- hinfo->get_total_chunk_size());
- map<int, bufferlist> buffers;
- // align
- if (bl.length() % sinfo.get_stripe_width())
- bl.append_zero(
- sinfo.get_stripe_width() -
- ((offset + bl.length()) % sinfo.get_stripe_width()));
+ map<int, bufferlist> buffers;
int r = ECUtil::encode(
sinfo, ecimpl, bl, want, &buffers);
+ assert(r == 0);
- hinfo->append(
- sinfo.aligned_logical_offset_to_chunk_offset(offset),
- buffers);
- bufferlist hbuf;
- ::encode(*hinfo, hbuf);
+ written.insert(offset, bl.length(), bl);
+
+ ldpp_dout(dpp, 20) << __func__ << ": " << oid
+ << " new_size "
+ << offset + bl.length()
+ << dendl;
+
+ if (offset >= before_size) {
+ assert(offset == before_size);
+ hinfo->append(
+ sinfo.aligned_logical_offset_to_chunk_offset(offset),
+ buffers);
+ }
- assert(r == 0);
for (auto &&i : *transactions) {
assert(buffers.count(i.first));
bufferlist &enc_bl = buffers[i.first];
- i.second.set_alloc_hint(
- coll_t(spg_t(pgid, i.first)),
- ghobject_t(oid, ghobject_t::NO_GEN, i.first),
- 0, 0,
- CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
- CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
+ if (offset >= before_size) {
+ i.second.set_alloc_hint(
+ coll_t(spg_t(pgid, i.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+ 0, 0,
+ CEPH_OSD_ALLOC_HINT_FLAG_SEQUENTIAL_WRITE |
+ CEPH_OSD_ALLOC_HINT_FLAG_APPEND_ONLY);
+ }
i.second.write(
coll_t(spg_t(pgid, i.first)),
ghobject_t(oid, ghobject_t::NO_GEN, i.first),
enc_bl.length(),
enc_bl,
flags);
- i.second.setattr(
- coll_t(spg_t(pgid, i.first)),
- ghobject_t(oid, ghobject_t::NO_GEN, i.first),
- ECUtil::get_hinfo_key(),
- hbuf);
}
}
+bool ECTransaction::requires_overwrite(
+ uint64_t prev_size,
+ const PGTransaction::ObjectOperation &op) {
+ // special handling for truncates to 0
+ if (op.truncate && op.truncate->first == 0)
+ return false;
+ return op.is_none() &&
+ ((!op.buffer_updates.empty() &&
+ (op.buffer_updates.begin().get_off() < prev_size)) ||
+ (op.truncate &&
+ (op.truncate->first < prev_size)));
+}
+
void ECTransaction::generate_transactions(
- PGTransaction &t,
- map<
- hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
- > &hash_infos,
+ WritePlan &plan,
ErasureCodeInterfaceRef &ecimpl,
pg_t pgid,
+ bool legacy_log_entries,
const ECUtil::stripe_info_t &sinfo,
+ const hobject_t::bitwisemap<extent_map> &partial_extents,
vector<pg_log_entry_t> &entries,
+ hobject_t::bitwisemap<extent_map> *written_map,
map<shard_id_t, ObjectStore::Transaction> *transactions,
set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
- stringstream *out)
+ DoutPrefixProvider *dpp)
{
+ assert(written_map);
+ assert(transactions);
+ assert(temp_added);
+ assert(temp_removed);
+ assert(plan.t);
+ auto &t = *(plan.t);
+
+ auto &hash_infos = plan.hash_infos;
+
assert(transactions);
assert(temp_added);
assert(temp_removed);
t.safe_create_traverse(
[&](pair<const hobject_t, PGTransaction::ObjectOperation> &opair) {
- const hobject_t &oid = opair.first;
+ const hobject_t &oid = opair.first;
+ auto &op = opair.second;
+ auto &obc_map = t.obc_map;
+ auto &written = (*written_map)[oid];
- auto iter = obj_to_log.find(oid);
- pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
+ auto iter = obj_to_log.find(oid);
+ pg_log_entry_t *entry = iter != obj_to_log.end() ? iter->second : nullptr;
- if (entry && opair.second.updated_snaps) {
- entry->mod_desc.update_snaps(opair.second.updated_snaps->first);
- vector<snapid_t> snaps(
- opair.second.updated_snaps->second.begin(),
- opair.second.updated_snaps->second.end());
- ::encode(snaps, entry->snaps);
- }
+ ObjectContextRef obc;
+ auto obiter = t.obc_map.find(oid);
+ if (obiter != t.obc_map.end()) {
+ obc = obiter->second;
+ }
+ if (entry) {
+ assert(obc);
+ } else {
+ assert(oid.is_temp());
+ }
- ObjectContextRef obc;
- auto obiter = t.obc_map.find(oid);
- if (obiter != t.obc_map.end()) {
- obc = obiter->second;
- }
- if (entry) {
- assert(obc);
- } else {
- assert(oid.is_temp());
- }
+ ECUtil::HashInfoRef hinfo;
+ {
+ auto iter = hash_infos.find(oid);
+ assert(iter != hash_infos.end());
+ hinfo = iter->second;
+ }
+
+ if (oid.is_temp()) {
+ if (op.is_fresh_object()) {
+ temp_added->insert(oid);
+ } else if (op.is_delete()) {
+ temp_removed->insert(oid);
+ }
+ }
+
+ if (entry &&
+ entry->is_modify() &&
+ op.updated_snaps) {
+ vector<snapid_t> snaps(
+ op.updated_snaps->second.begin(),
+ op.updated_snaps->second.end());
+ ::encode(snaps, entry->snaps);
+ }
+
+ ldpp_dout(dpp, 20) << "generate_transactions: "
+ << opair.first
+ << ", current size is "
+ << hinfo->get_total_logical_size(sinfo)
+ << " buffers are "
+ << op.buffer_updates
+ << dendl;
+ if (op.truncate) {
+ ldpp_dout(dpp, 20) << "generate_transactions: "
+ << " truncate is "
+ << *(op.truncate)
+ << dendl;
+ }
+
+ if (entry && op.updated_snaps) {
+ entry->mod_desc.update_snaps(op.updated_snaps->first);
+ }
- map<string, boost::optional<bufferlist> > xattr_rollback;
- ECUtil::HashInfoRef hinfo;
- {
- auto iter = hash_infos.find(oid);
- assert(iter != hash_infos.end());
- hinfo = iter->second;
+ map<string, boost::optional<bufferlist> > xattr_rollback;
+ assert(hinfo);
bufferlist old_hinfo;
::encode(*hinfo, old_hinfo);
xattr_rollback[ECUtil::get_hinfo_key()] = old_hinfo;
- }
+
+ if (op.is_none() && op.truncate && op.truncate->first == 0) {
+ assert(op.truncate->first == 0);
+ assert(op.truncate->first ==
+ op.truncate->second);
+ assert(entry);
+ assert(obc);
+
+ if (op.truncate->first != op.truncate->second) {
+ op.truncate->first = op.truncate->second;
+ } else {
+ op.truncate = boost::none;
+ }
- if (opair.second.is_none() && opair.second.truncate) {
- assert(opair.second.truncate->first == 0);
- assert(opair.second.truncate->first ==
- opair.second.truncate->second);
- assert(entry);
- assert(obc);
-
- opair.second.truncate = boost::none;
- opair.second.delete_first = true;
- opair.second.init_type = PGTransaction::ObjectOperation::Init::Create();
-
- if (obc) {
- /* We need to reapply all of the cached xattrs.
- * std::map insert fortunately only writes keys
- * which don't already exist, so this should do
- * the right thing. */
- opair.second.attr_updates.insert(
- obc->attr_cache.begin(),
- obc->attr_cache.end());
- }
- }
+ op.delete_first = true;
+ op.init_type = PGTransaction::ObjectOperation::Init::Create();
- if (oid.is_temp()) {
- if (opair.second.is_fresh_object()) {
- temp_added->insert(oid);
- } else if (opair.second.is_delete()) {
- temp_removed->insert(oid);
+ if (obc) {
+ /* We need to reapply all of the cached xattrs.
+ * std::map insert fortunately only writes keys
+ * which don't already exist, so this should do
+ * the right thing. */
+ op.attr_updates.insert(
+ obc->attr_cache.begin(),
+ obc->attr_cache.end());
+ }
}
- }
- if (opair.second.delete_first) {
- /* We also want to remove the boost::none entries since
- * the keys already won't exist */
- for (auto j = opair.second.attr_updates.begin();
- j != opair.second.attr_updates.end();
- ) {
- if (j->second) {
- ++j;
+ if (op.delete_first) {
+ /* We also want to remove the boost::none entries since
+ * the keys already won't exist */
+ for (auto j = op.attr_updates.begin();
+ j != op.attr_updates.end();
+ ) {
+ if (j->second) {
+ ++j;
+ } else {
+ op.attr_updates.erase(j++);
+ }
+ }
+ /* Fill in all current entries for xattr rollback */
+ if (obc) {
+ xattr_rollback.insert(
+ obc->attr_cache.begin(),
+ obc->attr_cache.end());
+ obc->attr_cache.clear();
+ }
+ if (entry) {
+ entry->mod_desc.rmobject(entry->version.version);
+ for (auto &&st: *transactions) {
+ st.second.collection_move_rename(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, entry->version.version, st.first));
+ }
} else {
- opair.second.attr_updates.erase(j++);
+ for (auto &&st: *transactions) {
+ st.second.remove(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
}
+ hinfo->clear();
}
- /* Fill in all current entries for xattr rollback */
- if (obc) {
- xattr_rollback.insert(
- obc->attr_cache.begin(),
- obc->attr_cache.end());
- obc->attr_cache.clear();
+
+ if (op.is_fresh_object() && entry) {
+ entry->mod_desc.create();
}
- if (entry) {
- entry->mod_desc.rmobject(entry->version.version);
- for (auto &&st: *transactions) {
- st.second.collection_move_rename(
+
+ match(
+ op.init_type,
+ [&](const PGTransaction::ObjectOperation::Init::None &) {},
+ [&](const PGTransaction::ObjectOperation::Init::Create &op) {
+ for (auto &&st: *transactions) {
+ st.second.touch(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
+ },
+ [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
+ for (auto &&st: *transactions) {
+ st.second.clone(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
+
+ auto siter = hash_infos.find(op.source);
+ assert(siter != hash_infos.end());
+ hinfo->update_to(*(siter->second));
+
+ if (obc) {
+ auto cobciter = obc_map.find(op.source);
+ assert(cobciter != obc_map.end());
+ obc->attr_cache = cobciter->second->attr_cache;
+ }
+ },
+ [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
+ assert(op.source.is_temp());
+ for (auto &&st: *transactions) {
+ st.second.collection_move_rename(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ }
+ auto siter = hash_infos.find(op.source);
+ assert(siter != hash_infos.end());
+ hinfo->update_to(*(siter->second));
+ if (obc) {
+ auto cobciter = obc_map.find(op.source);
+ assert(cobciter == obc_map.end());
+ obc->attr_cache.clear();
+ }
+ });
+
+ // omap not supported (except 0, handled above)
+ assert(!(op.clear_omap));
+ assert(!(op.omap_header));
+ assert(op.omap_updates.empty());
+
+ if (!op.attr_updates.empty()) {
+ map<string, bufferlist> to_set;
+ for (auto &&j: op.attr_updates) {
+ if (j.second) {
+ to_set[j.first] = *(j.second);
+ } else {
+ for (auto &&st : *transactions) {
+ st.second.rmattr(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ j.first);
+ }
+ }
+ if (obc) {
+ auto citer = obc->attr_cache.find(j.first);
+ if (entry) {
+ if (citer != obc->attr_cache.end()) {
+ // won't overwrite anything we put in earlier
+ xattr_rollback.insert(
+ make_pair(
+ j.first,
+ boost::optional<bufferlist>(citer->second)));
+ } else {
+ // won't overwrite anything we put in earlier
+ xattr_rollback.insert(
+ make_pair(
+ j.first,
+ boost::none));
+ }
+ }
+ if (j.second) {
+ obc->attr_cache[j.first] = *(j.second);
+ } else if (citer != obc->attr_cache.end()) {
+ obc->attr_cache.erase(citer);
+ }
+ } else {
+ assert(!entry);
+ }
+ }
+ for (auto &&st : *transactions) {
+ st.second.setattrs(
coll_t(spg_t(pgid, st.first)),
ghobject_t(oid, ghobject_t::NO_GEN, st.first),
- coll_t(spg_t(pgid, st.first)),
- ghobject_t(oid, entry->version.version, st.first));
+ to_set);
}
- } else {
- for (auto &&st: *transactions) {
- st.second.remove(
+ assert(!xattr_rollback.empty());
+ }
+ if (entry && !xattr_rollback.empty()) {
+ entry->mod_desc.setattrs(xattr_rollback);
+ }
+
+ if (op.alloc_hint) {
+ /* logical_to_next_chunk_offset() scales down both aligned and
+ * unaligned offsets
+
+ * we don't bother to roll this back at this time for two reasons:
+ * 1) it's advisory
+ * 2) we don't track the old value */
+ uint64_t object_size = sinfo.logical_to_next_chunk_offset(
+ op.alloc_hint->expected_object_size);
+ uint64_t write_size = sinfo.logical_to_next_chunk_offset(
+ op.alloc_hint->expected_write_size);
+
+ for (auto &&st : *transactions) {
+ st.second.set_alloc_hint(
coll_t(spg_t(pgid, st.first)),
- ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ object_size,
+ write_size,
+ op.alloc_hint->flags);
}
}
- hinfo->clear();
- }
- if (opair.second.is_fresh_object() && entry) {
- entry->mod_desc.create();
- }
+ extent_map to_write;
+ auto pextiter = partial_extents.find(oid);
+ if (pextiter != partial_extents.end()) {
+ to_write = pextiter->second;
+ }
- match(
- opair.second.init_type,
- [&](const PGTransaction::ObjectOperation::Init::None &) {},
- [&](const PGTransaction::ObjectOperation::Init::Create &op) {
- for (auto &&st: *transactions) {
- st.second.touch(
- coll_t(spg_t(pgid, st.first)),
- ghobject_t(oid, ghobject_t::NO_GEN, st.first));
- }
- },
- [&](const PGTransaction::ObjectOperation::Init::Clone &op) {
- for (auto &&st: *transactions) {
- st.second.clone(
- coll_t(spg_t(pgid, st.first)),
- ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
- ghobject_t(oid, ghobject_t::NO_GEN, st.first));
+ vector<pair<uint64_t, uint64_t> > rollback_extents;
+ const uint64_t orig_size = hinfo->get_total_logical_size(sinfo);
+
+ uint64_t new_size = orig_size;
+ uint64_t append_after = new_size;
+ ldpp_dout(dpp, 20) << __func__ << ": new_size start " << new_size << dendl;
+ if (op.truncate && op.truncate->first < new_size) {
+ assert(!op.is_fresh_object());
+ new_size = sinfo.logical_to_next_stripe_offset(
+ op.truncate->first);
+ ldpp_dout(dpp, 20) << __func__ << ": new_size truncate down "
+ << new_size << dendl;
+ if (new_size != op.truncate->first) { // 0 the unaligned part
+ bufferlist bl;
+ bl.append_zero(new_size - op.truncate->first);
+ to_write.insert(
+ op.truncate->first,
+ bl.length(),
+ bl);
+ append_after = sinfo.logical_to_prev_stripe_offset(
+ op.truncate->first);
+ } else {
+ append_after = new_size;
}
+ to_write.erase(
+ new_size,
+ std::numeric_limits<uint64_t>::max() - new_size);
- auto siter = hash_infos.find(op.source);
- assert(siter != hash_infos.end());
- *hinfo = *(siter->second);
+ if (entry && !op.is_fresh_object()) {
+ uint64_t restore_from = sinfo.logical_to_prev_chunk_offset(
+ op.truncate->first);
+ uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
+ orig_size -
+ sinfo.logical_to_prev_stripe_offset(op.truncate->first));
+ assert(rollback_extents.empty());
- if (obc) {
- auto cobciter = t.obc_map.find(op.source);
- assert(cobciter != t.obc_map.end());
- obc->attr_cache = cobciter->second->attr_cache;
- }
- },
- [&](const PGTransaction::ObjectOperation::Init::Rename &op) {
- assert(op.source.is_temp());
- for (auto &&st: *transactions) {
- st.second.collection_move_rename(
- coll_t(spg_t(pgid, st.first)),
- ghobject_t(op.source, ghobject_t::NO_GEN, st.first),
- coll_t(spg_t(pgid, st.first)),
- ghobject_t(oid, ghobject_t::NO_GEN, st.first));
- }
- auto siter = hash_infos.find(op.source);
- assert(siter != hash_infos.end());
- *hinfo = *(siter->second);
- assert(obc->attr_cache.empty());
- });
-
- // omap, truncate not supported (except 0, handled above)
- assert(!(opair.second.clear_omap));
- assert(!(opair.second.truncate));
- assert(!(opair.second.omap_header));
- assert(opair.second.omap_updates.empty());
-
- if (!opair.second.attr_updates.empty()) {
- map<string, bufferlist> to_set;
- for (auto &&j: opair.second.attr_updates) {
- if (j.second) {
- to_set[j.first] = *(j.second);
- } else {
+ ldpp_dout(dpp, 20) << __func__ << ": saving extent "
+ << make_pair(restore_from, restore_len)
+ << dendl;
+ ldpp_dout(dpp, 20) << __func__ << ": truncating to "
+ << new_size
+ << dendl;
+ rollback_extents.emplace_back(
+ make_pair(restore_from, restore_len));
for (auto &&st : *transactions) {
- st.second.rmattr(
+ st.second.touch(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, entry->version.version, st.first));
+ st.second.clone_range(
coll_t(spg_t(pgid, st.first)),
ghobject_t(oid, ghobject_t::NO_GEN, st.first),
- j.first);
- }
- }
- if (obc) {
- auto citer = obc->attr_cache.find(j.first);
- if (entry) {
- if (citer != obc->attr_cache.end()) {
- // won't overwrite anything we put in earlier
- xattr_rollback.insert(
- make_pair(
- j.first,
- boost::optional<bufferlist>(citer->second)));
- } else {
- // won't overwrite anything we put in earlier
- xattr_rollback.insert(
- make_pair(
- j.first,
- boost::none));
- }
- }
- if (j.second) {
- obc->attr_cache[j.first] = *(j.second);
- } else if (citer != obc->attr_cache.end()) {
- obc->attr_cache.erase(citer);
+ ghobject_t(oid, entry->version.version, st.first),
+ restore_from,
+ restore_len,
+ restore_from);
+
}
} else {
- assert(!entry);
+ ldpp_dout(dpp, 20) << __func__ << ": not saving extents, fresh object"
+ << dendl;
+ }
+ for (auto &&st : *transactions) {
+ st.second.truncate(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ sinfo.aligned_logical_offset_to_chunk_offset(new_size));
}
}
- for (auto &&st : *transactions) {
- st.second.setattrs(
- coll_t(spg_t(pgid, st.first)),
- ghobject_t(oid, ghobject_t::NO_GEN, st.first),
- to_set);
- }
- assert(!xattr_rollback.empty());
- }
- if (entry && !xattr_rollback.empty()) {
- entry->mod_desc.setattrs(xattr_rollback);
- }
-
- if (opair.second.alloc_hint) {
- /* logical_to_next_chunk_offset() scales down both aligned and
- * unaligned offsets
-
- * we don't bother to roll this back at this time for two reasons:
- * 1) it's advisory
- * 2) we don't track the old value */
- uint64_t object_size = sinfo.logical_to_next_chunk_offset(
- opair.second.alloc_hint->expected_object_size);
- uint64_t write_size = sinfo.logical_to_next_chunk_offset(
- opair.second.alloc_hint->expected_write_size);
-
- for (auto &&st : *transactions) {
- st.second.set_alloc_hint(
- coll_t(spg_t(pgid, st.first)),
- ghobject_t(oid, ghobject_t::NO_GEN, st.first),
- object_size,
- write_size,
- opair.second.alloc_hint->flags);
- }
- }
-
- if (!opair.second.buffer_updates.empty()) {
- set<int> want;
- for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
- want.insert(i);
- }
- if (entry) {
- entry->mod_desc.append(
- sinfo.aligned_chunk_offset_to_logical_offset(
- hinfo->get_total_chunk_size()
- ));
- }
- for (auto &&extent: opair.second.buffer_updates) {
+ uint32_t fadvise_flags = 0;
+ for (auto &&extent: op.buffer_updates) {
using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+ bufferlist bl;
match(
extent.get_val(),
[&](const BufferUpdate::Write &op) {
- if (extent.get_len()) {
- assert(op.buffer.length() == extent.get_len());
- bufferlist bl = op.buffer;
- append(
- pgid,
- oid,
- sinfo,
- ecimpl,
- want,
- extent.get_off(),
- bl,
- op.fadvise_flags,
- hinfo,
- transactions);
- }
+ bl = op.buffer;
+ fadvise_flags |= op.fadvise_flags;
},
[&](const BufferUpdate::Zero &) {
- assert(
- 0 ==
- "Zero is not allowed, do_op should have returned ENOTSUPP");
+ bl.append_zero(extent.get_len());
},
[&](const BufferUpdate::CloneRange &) {
assert(
0 ==
"CloneRange is not allowed, do_op should have returned ENOTSUPP");
});
+
+ uint64_t off = extent.get_off();
+ uint64_t len = extent.get_len();
+ uint64_t end = off + len;
+ ldpp_dout(dpp, 20) << __func__ << ": adding buffer_update "
+ << make_pair(off, len)
+ << dendl;
+ assert(len > 0);
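+ // a write starting past the current logical end is zero-filled back to
+ // new_size so the resulting buffer stays contiguous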
+ if (off > new_size) {
+ assert(off > append_after);
+ bl.prepend_zero(off - new_size);
+ len += off - new_size;
+ ldpp_dout(dpp, 20) << __func__ << ": prepending zeroes to align "
+ << off << "->" << new_size
+ << dendl;
+ off = new_size;
+ }
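+ // pad the tail out to the next stripe boundary, unless the end lands in
+ // the overwrite region (end <= append_after) where the stripe already exists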
+ if (!sinfo.logical_offset_is_stripe_aligned(end) && (end > append_after)) {
+ uint64_t aligned_end = sinfo.logical_to_next_stripe_offset(
+ end);
+ uint64_t tail = aligned_end - end;
+ bl.append_zero(tail);
+ ldpp_dout(dpp, 20) << __func__ << ": appending zeroes to align end "
+ << end << "->" << end+tail
+ << ", len: " << len << "->" << len+tail
+ << dendl;
+ end += tail;
+ len += tail;
+ }
+
+ to_write.insert(off, len, bl);
+ if (end > new_size)
+ new_size = end;
}
- }
- });
+
+ if (op.truncate &&
+ op.truncate->second > new_size) {
+ assert(op.truncate->second > append_after);
+ uint64_t truncate_to =
+ sinfo.logical_to_next_stripe_offset(
+ op.truncate->second);
+ uint64_t zeroes = truncate_to - new_size;
+ bufferlist bl;
+ bl.append_zero(zeroes);
+ to_write.insert(
+ new_size,
+ zeroes,
+ bl);
+ new_size = truncate_to;
+ ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
+ << truncate_to
+ << dendl;
+ }
+
+ set<int> want;
+ for (unsigned i = 0; i < ecimpl->get_chunk_count(); ++i) {
+ want.insert(i);
+ }
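+ // split to_write at append_after: extents below it overwrite existing
+ // stripes and need rollback clones, extents above are pure appends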
+ auto to_overwrite = to_write.intersect(0, append_after);
+ ldpp_dout(dpp, 20) << __func__ << ": to_overwrite: "
+ << to_overwrite
+ << dendl;
+ for (auto &&extent: to_overwrite) {
+ assert(extent.get_off() + extent.get_len() <= append_after);
+ assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
+ assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
+ if (entry) {
+ uint64_t restore_from = sinfo.aligned_logical_offset_to_chunk_offset(
+ extent.get_off());
+ uint64_t restore_len = sinfo.aligned_logical_offset_to_chunk_offset(
+ extent.get_len());
+ ldpp_dout(dpp, 20) << __func__ << ": overwriting "
+ << restore_from << "~" << restore_len
+ << dendl;
+ if (rollback_extents.empty()) {
+ for (auto &&st : *transactions) {
+ st.second.touch(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, entry->version.version, st.first));
+ }
+ }
+ rollback_extents.emplace_back(make_pair(restore_from, restore_len));
+ for (auto &&st : *transactions) {
+ st.second.clone_range(
+ coll_t(spg_t(pgid, st.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, st.first),
+ ghobject_t(oid, entry->version.version, st.first),
+ restore_from,
+ restore_len,
+ restore_from);
+ }
+ }
+ encode_and_write(
+ pgid,
+ oid,
+ sinfo,
+ ecimpl,
+ want,
+ extent.get_off(),
+ extent.get_val(),
+ fadvise_flags,
+ hinfo,
+ written,
+ transactions,
+ dpp);
+ }
+
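+ // appends need no rollback clone; mod_desc.append() below records the
+ // pre-append size instead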
+ auto to_append = to_write.intersect(
+ append_after,
+ std::numeric_limits<uint64_t>::max() - append_after);
+ ldpp_dout(dpp, 20) << __func__ << ": to_append: "
+ << to_append
+ << dendl;
+ for (auto &&extent: to_append) {
+ assert(sinfo.logical_offset_is_stripe_aligned(extent.get_off()));
+ assert(sinfo.logical_offset_is_stripe_aligned(extent.get_len()));
+ ldpp_dout(dpp, 20) << __func__ << ": appending "
+ << extent.get_off() << "~" << extent.get_len()
+ << dendl;
+ encode_and_write(
+ pgid,
+ oid,
+ sinfo,
+ ecimpl,
+ want,
+ extent.get_off(),
+ extent.get_val(),
+ fadvise_flags,
+ hinfo,
+ written,
+ transactions,
+ dpp);
+ }
+
+ if (!rollback_extents.empty() && entry) {
+ ldpp_dout(dpp, 20) << __func__ << ": " << oid
+ << " marking rollback extents "
+ << rollback_extents
+ << dendl;
+ entry->mod_desc.rollback_extents(
+ entry->version.version, rollback_extents);
+ ldpp_dout(dpp, 20) << __func__ << ": " << oid
+ << " resetting hinfo to logical size "
+ << new_size
+ << dendl;
+ hinfo->set_total_chunk_size_clear_hash(
+ sinfo.aligned_logical_offset_to_chunk_offset(new_size));
+ } else {
+ assert(hinfo->get_total_logical_size(sinfo) == new_size);
+ }
+
+ if (entry && !to_append.empty()) {
+ ldpp_dout(dpp, 20) << __func__ << ": marking append "
+ << append_after
+ << dendl;
+ entry->mod_desc.append(append_after);
+ }
+
+ bufferlist hbuf;
+ ::encode(*hinfo, hbuf);
+ for (auto &&i : *transactions) {
+ i.second.setattr(
+ coll_t(spg_t(pgid, i.first)),
+ ghobject_t(oid, ghobject_t::NO_GEN, i.first),
+ ECUtil::get_hinfo_key(),
+ hbuf);
+ }
+ });
}
#include "ECUtil.h"
#include "erasure-code/ErasureCodeInterface.h"
#include "PGTransaction.h"
+#include "ExtentCache.h"
namespace ECTransaction {
- void get_append_objects(
- const PGTransaction &t,
- set<hobject_t, hobject_t::BitwiseComparator> *out);
+ struct WritePlan {
+ PGTransactionUPtr t;
+ bool invalidates_cache = false; // a plan can both invalidate the cache and still carry reads/writes
+ hobject_t::bitwisemap<extent_set> to_read;
+ hobject_t::bitwisemap<extent_set> will_write; // superset of to_read
+
+ hobject_t::bitwisemap<ECUtil::HashInfoRef> hash_infos;
+ };
+
+ bool requires_overwrite(
+ uint64_t prev_size,
+ const PGTransaction::ObjectOperation &op);
+
+ template <typename F>
+ WritePlan get_write_plan(
+ const ECUtil::stripe_info_t &sinfo,
+ PGTransactionUPtr &&t,
+ F &&get_hinfo,
+ DoutPrefixProvider *dpp) {
+ WritePlan plan;
+ t->safe_create_traverse(
+ [&](pair<const hobject_t, PGTransaction::ObjectOperation> &i) {
+ ECUtil::HashInfoRef hinfo = get_hinfo(i.first);
+ plan.hash_infos[i.first] = hinfo;
+
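+ // plan against the projected size: the size this object will have once
+ // all in-flight ops commit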
+ uint64_t projected_size =
+ hinfo->get_projected_total_logical_size(sinfo);
+
+ if (i.second.has_source()) {
+ plan.invalidates_cache = true;
+ }
+
+ if (i.second.is_delete()) {
+ projected_size = 0;
+ }
+
+ hobject_t source;
+ if (i.second.has_source(&source)) {
+ ECUtil::HashInfoRef shinfo = get_hinfo(source);
+ projected_size = shinfo->get_projected_total_logical_size(sinfo);
+ plan.hash_infos[source] = shinfo;
+ }
+
+ auto &will_write = plan.will_write[i.first];
+ if (i.second.truncate &&
+ i.second.truncate->first < projected_size) {
+ if (!(sinfo.logical_offset_is_stripe_aligned(
+ i.second.truncate->first))) {
+ plan.to_read[i.first].insert(
+ sinfo.logical_to_prev_stripe_offset(i.second.truncate->first),
+ sinfo.get_stripe_width());
+
+ ldpp_dout(dpp, 20) << __func__ << ": unaligned truncate" << dendl;
+
+ will_write.insert(
+ sinfo.logical_to_prev_stripe_offset(i.second.truncate->first),
+ sinfo.get_stripe_width());
+ }
+ projected_size = sinfo.logical_to_next_stripe_offset(
+ i.second.truncate->first);
+ }
+
+ for (auto &&extent: i.second.buffer_updates) {
+ using BufferUpdate = PGTransaction::ObjectOperation::BufferUpdate;
+ if (boost::get<BufferUpdate::CloneRange>(&(extent.get_val()))) {
+ assert(
+ 0 ==
+ "CloneRange is not allowed, do_op should have returned ENOTSUPP");
+ }
+
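+ // a write covering only part of its first or last stripe must read that
+ // stripe first so the full stripe can be re-encoded (read-modify-write)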
+ uint64_t head_start =
+ sinfo.logical_to_prev_stripe_offset(extent.get_off());
+ uint64_t head_finish =
+ sinfo.logical_to_next_stripe_offset(extent.get_off());
+ if (head_start > projected_size) {
+ head_start = projected_size;
+ }
+ if (head_start != head_finish &&
+ head_start < projected_size) {
+ assert(head_finish <= projected_size);
+ assert(head_finish - head_start == sinfo.get_stripe_width());
+ plan.to_read[i.first].insert(
+ head_start, sinfo.get_stripe_width());
+ }
+
+ uint64_t tail_start =
+ sinfo.logical_to_prev_stripe_offset(
+ extent.get_off() + extent.get_len());
+ uint64_t tail_finish =
+ sinfo.logical_to_next_stripe_offset(
+ extent.get_off() + extent.get_len());
+ if (tail_start != tail_finish &&
+ tail_start != head_start &&
+ tail_start < projected_size) {
+ assert(tail_finish <= projected_size);
+ assert(tail_finish - tail_start == sinfo.get_stripe_width());
+ plan.to_read[i.first].insert(
+ tail_start, sinfo.get_stripe_width());
+ }
+
+ if (head_start != tail_finish) {
+ assert(
+ sinfo.logical_offset_is_stripe_aligned(
+ tail_finish - head_start)
+ );
+ will_write.insert(
+ head_start, tail_finish - head_start);
+ if (tail_finish > projected_size)
+ projected_size = tail_finish;
+ } else {
+ assert(tail_finish <= projected_size);
+ }
+ }
+
+ if (i.second.truncate &&
+ i.second.truncate->second > projected_size) {
+ uint64_t truncating_to =
+ sinfo.logical_to_next_stripe_offset(i.second.truncate->second);
+ ldpp_dout(dpp, 20) << __func__ << ": truncating out to "
+ << truncating_to
+ << dendl;
+ will_write.insert(projected_size, truncating_to - projected_size);
+ projected_size = truncating_to;
+ }
+
+ ldpp_dout(dpp, 20) << __func__ << ": " << i.first
+ << " projected size "
+ << projected_size
+ << dendl;
+ hinfo->set_projected_total_logical_size(
+ sinfo,
+ projected_size);
+ assert(plan.to_read[i.first].empty() || !i.second.has_source());
+ });
+ plan.t = std::move(t);
+ return plan;
+ }
+
void generate_transactions(
- PGTransaction &t,
- map<
- hobject_t, ECUtil::HashInfoRef, hobject_t::BitwiseComparator
- > &hash_infos,
+ WritePlan &plan,
ErasureCodeInterfaceRef &ecimpl,
pg_t pgid,
+ bool legacy_log_entries,
const ECUtil::stripe_info_t &sinfo,
+ const hobject_t::bitwisemap<extent_map> &partial_extents,
vector<pg_log_entry_t> &entries,
+ hobject_t::bitwisemap<extent_map> *written,
map<shard_id_t, ObjectStore::Transaction> *transactions,
set<hobject_t, hobject_t::BitwiseComparator> *temp_added,
set<hobject_t, hobject_t::BitwiseComparator> *temp_removed,
- stringstream *out = 0);
+ DoutPrefixProvider *dpp);
};
#endif
void ECUtil::HashInfo::append(uint64_t old_size,
map<int, bufferlist> &to_append) {
- assert(to_append.size() == cumulative_shard_hashes.size());
assert(old_size == total_chunk_size);
uint64_t size_to_append = to_append.begin()->second.length();
- for (map<int, bufferlist>::iterator i = to_append.begin();
- i != to_append.end();
- ++i) {
- assert(size_to_append == i->second.length());
- assert((unsigned)i->first < cumulative_shard_hashes.size());
- uint32_t new_hash = i->second.crc32c(cumulative_shard_hashes[i->first]);
- cumulative_shard_hashes[i->first] = new_hash;
+ if (has_chunk_hash()) {
+ assert(to_append.size() == cumulative_shard_hashes.size());
+ for (map<int, bufferlist>::iterator i = to_append.begin();
+ i != to_append.end();
+ ++i) {
+ assert(size_to_append == i->second.length());
+ assert((unsigned)i->first < cumulative_shard_hashes.size());
+ uint32_t new_hash = i->second.crc32c(cumulative_shard_hashes[i->first]);
+ cumulative_shard_hashes[i->first] = new_hash;
+ }
}
total_chunk_size += size_to_append;
}
DECODE_START(1, bl);
::decode(total_chunk_size, bl);
::decode(cumulative_shard_hashes, bl);
+ projected_total_chunk_size = total_chunk_size;
DECODE_FINISH(bl);
}
chunk_size(stripe_width / stripe_size) {
assert(stripe_width % stripe_size == 0);
}
+ bool logical_offset_is_stripe_aligned(uint64_t logical) const {
+ return (logical % stripe_width) == 0;
+ }
uint64_t get_stripe_width() const {
return stripe_width;
}
map<int, bufferlist> *out);
class HashInfo {
- uint64_t total_chunk_size;
+ uint64_t total_chunk_size = 0;
vector<uint32_t> cumulative_shard_hashes;
+
+ // purely ephemeral, represents the size once all in-flight ops commit
+ uint64_t projected_total_chunk_size = 0;
public:
- HashInfo() : total_chunk_size(0) {}
- explicit HashInfo(unsigned num_chunks)
- : total_chunk_size(0),
+ HashInfo() {}
+ explicit HashInfo(unsigned num_chunks) :
cumulative_shard_hashes(num_chunks, -1) {}
void append(uint64_t old_size, map<int, bufferlist> &to_append);
void clear() {
uint64_t get_total_chunk_size() const {
return total_chunk_size;
}
+ uint64_t get_projected_total_chunk_size() const {
+ return projected_total_chunk_size;
+ }
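+ // per-shard chunk size scales to logical size by the data chunk count,
+ // e.g. with stripe_width/chunk_size == 2, a 4K chunk size is an 8K object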
+ uint64_t get_total_logical_size(const stripe_info_t &sinfo) const {
+ return get_total_chunk_size() *
+ (sinfo.get_stripe_width()/sinfo.get_chunk_size());
+ }
+ uint64_t get_projected_total_logical_size(const stripe_info_t &sinfo) const {
+ return get_projected_total_chunk_size() *
+ (sinfo.get_stripe_width()/sinfo.get_chunk_size());
+ }
+ void set_projected_total_logical_size(
+ const stripe_info_t &sinfo,
+ uint64_t logical_size) {
+ assert(sinfo.logical_offset_is_stripe_aligned(logical_size));
+ projected_total_chunk_size = sinfo.aligned_logical_offset_to_chunk_offset(
+ logical_size);
+ }
+ void set_total_chunk_size_clear_hash(uint64_t new_chunk_size) {
+ cumulative_shard_hashes.clear();
+ total_chunk_size = new_chunk_size;
+ }
+ bool has_chunk_hash() const {
+ return !cumulative_shard_hashes.empty();
+ }
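+ // adopt an authoritative HashInfo while keeping the local, purely
+ // ephemeral projected size for in-flight ops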
+ void update_to(const HashInfo &rhs) {
+ auto ptcs = projected_total_chunk_size;
+ *this = rhs;
+ projected_total_chunk_size = ptcs;
+ }
};
+
typedef ceph::shared_ptr<HashInfo> HashInfoRef;
bool is_hinfo_key_string(const string &key);
bool deep_scrub = state_test(PG_STATE_DEEP_SCRUB);
const char *mode = (repair ? "repair": (deep_scrub ? "deep-scrub" : "scrub"));
+ if (pool.info.is_hacky_ecoverwrites()) {
+ dout(10) << __func__
+ << ": skipping stat comparisons since hacky_overwrites are enabled"
+ << dendl;
+ return;
+ }
+
if (info.stats.stats_invalid) {
info.stats.stats = scrub_cstat;
info.stats.stats_invalid = false;
}
uint64_t required_alignment() const { return stripe_width; }
+ bool is_hacky_ecoverwrites() const {
+ return has_flag(FLAG_EC_OVERWRITES);
+ }
+
bool can_shift_osds() const {
switch (get_type()) {
case TYPE_REPLICATED: