From 792f0a85d012a4c2292b27ea2cd4f5c04aece3db Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 5 Feb 2014 18:49:41 -0800 Subject: [PATCH] ECBackend: flesh out ECBackend implementation Signed-off-by: Samuel Just --- src/osd/ECBackend.cc | 1611 +++++++++++++++++++++++++++++++++++++++++- src/osd/ECBackend.h | 400 ++++++++++- 2 files changed, 2002 insertions(+), 9 deletions(-) diff --git a/src/osd/ECBackend.cc b/src/osd/ECBackend.cc index f25749d8729af..8688ad17c61e9 100644 --- a/src/osd/ECBackend.cc +++ b/src/osd/ECBackend.cc @@ -12,17 +12,571 @@ * */ +#include +#include +#include +#include + +#include "ECUtil.h" #include "ECBackend.h" +#include "messages/MOSDPGPush.h" +#include "messages/MOSDPGPushReply.h" + +#define dout_subsys ceph_subsys_osd +#define DOUT_PREFIX_ARGS this +#undef dout_prefix +#define dout_prefix _prefix(_dout, this) +static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) { + return *_dout << pgb->get_parent()->gen_dbg_prefix(); +} -PGBackend::RecoveryHandle *open_recovery_op() +struct ECRecoveryHandle : public PGBackend::RecoveryHandle { + list ops; +}; + +static ostream &operator<<(ostream &lhs, const map &rhs) { - return 0; + lhs << "["; + for (map::const_iterator i = rhs.begin(); + i != rhs.end(); + ++i) { + if (i != rhs.begin()) + lhs << ", "; + lhs << make_pair(i->first, i->second.length()); + } + return lhs << "]"; +} + +static ostream &operator<<(ostream &lhs, const map &rhs) +{ + lhs << "["; + for (map::const_iterator i = rhs.begin(); + i != rhs.end(); + ++i) { + if (i != rhs.begin()) + lhs << ", "; + lhs << make_pair(i->first, i->second.length()); + } + return lhs << "]"; +} + +static ostream &operator<<( + ostream &lhs, + const boost::tuple > &rhs) +{ + return lhs << "(" << rhs.get<0>() << ", " + << rhs.get<1>() << ", " << rhs.get<2>() << ")"; +} + +ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs) +{ + return lhs << "read_request_t(to_read=[" << rhs.to_read << "]" + << ", need=" << rhs.need + << ", want_attrs=" << rhs.want_attrs + << ")"; +} + +ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs) +{ + lhs << "read_result_t(r=" << rhs.r + << ", errors=" << rhs.errors; + if (rhs.attrs) { + lhs << ", attrs=" << rhs.attrs; + } else { + lhs << ", noattrs"; + } + return lhs << ", returned=" << rhs.returned; +} + +ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs) +{ + lhs << "ReadOp(tid=" << rhs.tid; + if (rhs.op && rhs.op->get_req()) { + lhs << ", op="; + rhs.op->get_req()->print(lhs); + } + return lhs << ", to_read=" << rhs.to_read + << ", complete=" << rhs.complete + << ", priority=" << rhs.priority + << ", obj_to_source=" << rhs.obj_to_source + << ", source_to_obj=" << rhs.source_to_obj + << ", in_progress=" << rhs.in_progress << ")"; +} + +void ECBackend::ReadOp::dump(Formatter *f) const +{ + f->dump_stream("tid") << tid; + if (op && op->get_req()) { + f->dump_stream("op") << *(op->get_req()); + } + f->dump_stream("to_read") << to_read; + f->dump_stream("complete") << complete; + f->dump_stream("priority") << priority; + f->dump_stream("obj_to_source") << obj_to_source; + f->dump_stream("source_to_obj") << source_to_obj; + f->dump_stream("in_progress") << in_progress; +} + +ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs) +{ + lhs << "Op(" << rhs.hoid + << " v=" << rhs.version + << " tt=" << rhs.trim_to + << " tid=" << rhs.tid + << " reqid=" << rhs.reqid; + if (rhs.client_op && rhs.client_op->get_req()) { + lhs << " client_op="; + rhs.client_op->get_req()->print(lhs); + } + lhs << " 
pending_commit=" << rhs.pending_commit + << " pending_apply=" << rhs.pending_apply + << ")"; + return lhs; +} + +ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs) +{ + return lhs << "RecoveryOp(" + << "hoid=" << rhs.hoid + << " v=" << rhs.v + << " missing_on=" << rhs.missing_on + << " missing_on_shards=" << rhs.missing_on_shards + << " recovery_info=" << rhs.recovery_info + << " recovery_progress=" << rhs.recovery_progress + << " pending_read=" << rhs.pending_read + << " obc refcount=" << rhs.obc.use_count() + << " state=" << ECBackend::RecoveryOp::tostr(rhs.state) + << " waiting_on_pushes=" << rhs.waiting_on_pushes + << " extent_requested=" << rhs.extent_requested; +} + +void ECBackend::RecoveryOp::dump(Formatter *f) const +{ + f->dump_stream("hoid") << hoid; + f->dump_stream("v") << v; + f->dump_stream("missing_on") << missing_on; + f->dump_stream("missing_on_shards") << missing_on_shards; + f->dump_stream("recovery_info") << recovery_info; + f->dump_stream("recovery_progress") << recovery_progress; + f->dump_stream("pending_read") << pending_read; + f->dump_stream("state") << tostr(state); + f->dump_stream("waiting_on_pushes") << waiting_on_pushes; + f->dump_stream("extent_requested") << extent_requested; +} + +PGBackend::RecoveryHandle *ECBackend::open_recovery_op() +{ + return new ECRecoveryHandle; +} + +struct OnRecoveryReadComplete : + public GenContext &> { + ECBackend *pg; + hobject_t hoid; + set want; + OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid) + : pg(pg), hoid(hoid) {} + void finish(pair &in) { + ECBackend::read_result_t &res = in.second; + assert(res.r == 0); + assert(res.errors.empty()); + assert(res.returned.size() == 1); + pg->handle_recovery_read_complete( + hoid, + res.returned.back(), + res.attrs, + in.first); + } +}; + +struct RecoveryMessages { + map reads; + void read( + ECBackend *ec, + const hobject_t &hoid, uint64_t off, uint64_t len, + const set &need, + bool attrs) { + list > to_read; + to_read.push_back(make_pair(off, len)); + assert(!reads.count(hoid)); + reads.insert( + make_pair( + hoid, + ECBackend::read_request_t( + hoid, + to_read, + need, + attrs, + new OnRecoveryReadComplete( + ec, + hoid)))); + } + + map > pushes; + map > push_replies; + ObjectStore::Transaction *t; + RecoveryMessages() : t(new ObjectStore::Transaction) {} + ~RecoveryMessages() { assert(!t); } +}; + +void ECBackend::handle_recovery_push( + PushOp &op, + RecoveryMessages *m) +{ + bool oneshot = op.before_progress.first && op.after_progress.data_complete; + coll_t tcoll = oneshot ? 
coll : get_temp_coll(m->t); + if (op.before_progress.first) { + get_parent()->on_local_recover_start( + op.soid, + m->t); + m->t->remove( + get_temp_coll(m->t), + ghobject_t( + op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); + m->t->touch( + tcoll, + ghobject_t( + op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); + } + + if (!op.data_included.empty()) { + uint64_t start = op.data_included.range_start(); + uint64_t end = op.data_included.range_end(); + assert(op.data.length() == (end - start)); + + m->t->write( + tcoll, + ghobject_t( + op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + start, + op.data.length(), + op.data); + } else { + assert(op.data.length() == 0); + } + + if (op.before_progress.first) { + if (!oneshot) + add_temp_obj(op.soid); + assert(op.attrset.count(string("_"))); + m->t->setattrs( + tcoll, + ghobject_t( + op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + op.attrset); + } + + if (op.after_progress.data_complete && !oneshot) { + clear_temp_obj(op.soid); + m->t->collection_move( + coll, + tcoll, + ghobject_t( + op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard)); + } + if (op.after_progress.data_complete) { + if ((get_parent()->pgb_is_primary())) { + assert(recovery_ops.count(op.soid)); + assert(recovery_ops[op.soid].obc); + object_stat_sum_t stats; + stats.num_objects_recovered = 1; + stats.num_bytes_recovered = recovery_ops[op.soid].obc->obs.oi.size; + get_parent()->on_local_recover( + op.soid, + stats, + op.recovery_info, + recovery_ops[op.soid].obc, + m->t); + } else { + get_parent()->on_local_recover( + op.soid, + object_stat_sum_t(), + op.recovery_info, + ObjectContextRef(), + m->t); + } + } + m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp()); + m->push_replies[get_parent()->primary_shard()].back().soid = op.soid; +} + +void ECBackend::handle_recovery_push_reply( + PushReplyOp &op, + pg_shard_t from, + RecoveryMessages *m) +{ + if (!recovery_ops.count(op.soid)) + return; + RecoveryOp &rop = recovery_ops[op.soid]; + assert(rop.waiting_on_pushes.count(from)); + rop.waiting_on_pushes.erase(from); + continue_recovery_op(rop, m); +} + +void ECBackend::handle_recovery_read_complete( + const hobject_t &hoid, + boost::tuple > &to_read, + boost::optional > attrs, + RecoveryMessages *m) +{ + dout(10) << __func__ << ": returned " << hoid << " " + << "(" << to_read.get<0>() + << ", " << to_read.get<1>() + << ", " << to_read.get<2>() + << ")" + << dendl; + assert(recovery_ops.count(hoid)); + RecoveryOp &op = recovery_ops[hoid]; + assert(op.returned_data.empty()); + map target; + for (set::iterator i = op.missing_on_shards.begin(); + i != op.missing_on_shards.end(); + ++i) { + target[*i] = &(op.returned_data[*i]); + } + map from; + for(map::iterator i = to_read.get<2>().begin(); + i != to_read.get<2>().end(); + ++i) { + from[i->first.shard].claim(i->second); + } + dout(10) << __func__ << ": " << from << dendl; + ECUtil::decode(sinfo, ec_impl, from, target); + if (attrs) { + op.xattrs.swap(*attrs); + + if (!op.obc) { + op.obc = get_parent()->get_obc(hoid, op.xattrs); + op.recovery_info.size = op.obc->obs.oi.size; + op.recovery_info.oi = op.obc->obs.oi; + } + + ECUtil::HashInfo hinfo(ec_impl->get_chunk_count()); + if (op.obc->obs.oi.size > 0) { + assert(op.xattrs.count(ECUtil::get_hinfo_key())); + bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin(); + ::decode(hinfo, bp); + } + op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo); + } + 
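+  // The first read for an object is issued with want_attrs set, so by the
+  // time any read for this object completes, op.xattrs and op.obc must
+  // already have been populated above; recovery cannot continue without them.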
assert(op.xattrs.size()); + assert(op.obc); + continue_recovery_op(op, m); +} + +struct SendPushReplies : public Context { + PGBackend::Listener *l; + epoch_t epoch; + map replies; + SendPushReplies( + PGBackend::Listener *l, + epoch_t epoch, + map &in) : l(l), epoch(epoch) { + replies.swap(in); + } + void finish(int) { + for (map::iterator i = replies.begin(); + i != replies.end(); + ++i) { + l->send_message_osd_cluster(i->first, i->second, epoch); + } + replies.clear(); + } + ~SendPushReplies() { + for (map::iterator i = replies.begin(); + i != replies.end(); + ++i) { + i->second->put(); + } + replies.clear(); + } +}; + +void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority) +{ + for (map >::iterator i = m.pushes.begin(); + i != m.pushes.end(); + m.pushes.erase(i++)) { + MOSDPGPush *msg = new MOSDPGPush(); + msg->set_priority(priority); + msg->map_epoch = get_parent()->get_epoch(); + msg->from = get_parent()->whoami_shard(); + msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); + msg->pushes.swap(i->second); + msg->compute_cost(cct); + get_parent()->send_message( + i->first.osd, + msg); + } + map replies; + for (map >::iterator i = + m.push_replies.begin(); + i != m.push_replies.end(); + m.push_replies.erase(i++)) { + MOSDPGPushReply *msg = new MOSDPGPushReply(); + msg->set_priority(priority); + msg->map_epoch = get_parent()->get_epoch(); + msg->from = get_parent()->whoami_shard(); + msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard); + msg->replies.swap(i->second); + msg->compute_cost(cct); + replies.insert(make_pair(i->first.osd, msg)); + } + m.t->register_on_complete( + get_parent()->bless_context( + new SendPushReplies( + get_parent(), + get_parent()->get_epoch(), + replies))); + get_parent()->queue_transaction(m.t); + m.t = NULL; + if (m.reads.empty()) + return; + start_read_op( + priority, + m.reads, + OpRequestRef()); +} + +void ECBackend::continue_recovery_op( + RecoveryOp &op, + RecoveryMessages *m) +{ + dout(10) << __func__ << ": continuing " << op << dendl; + while (1) { + switch (op.state) { + case RecoveryOp::IDLE: { + // start read + op.state = RecoveryOp::READING; + assert(!op.recovery_progress.data_complete); + set want(op.missing_on_shards.begin(), op.missing_on_shards.end()); + set to_read; + int r = get_min_avail_to_read_shards( + op.hoid, want, true, &to_read); + assert(r == 0); + m->read( + this, + op.hoid, + op.recovery_progress.data_recovered_to, + get_recovery_chunk_size(), + to_read, + op.recovery_progress.first); + op.extent_requested = make_pair(op.recovery_progress.data_recovered_to, + get_recovery_chunk_size()); + dout(10) << __func__ << ": IDLE return " << op << dendl; + return; + } + case RecoveryOp::READING: { + // read completed, start write + assert(op.xattrs.size()); + assert(op.returned_data.size()); + op.state = RecoveryOp::WRITING; + ObjectRecoveryProgress after_progress = op.recovery_progress; + after_progress.data_recovered_to += get_recovery_chunk_size(); + after_progress.first = false; + if (after_progress.data_recovered_to >= op.obc->obs.oi.size) { + after_progress.data_recovered_to = + sinfo.logical_to_next_stripe_offset( + op.obc->obs.oi.size); + after_progress.data_complete = true; + } + for (set::iterator mi = op.missing_on.begin(); + mi != op.missing_on.end(); + ++mi) { + assert(op.returned_data.count(mi->shard)); + m->pushes[*mi].push_back(PushOp()); + PushOp &pop = m->pushes[*mi].back(); + pop.soid = op.hoid; + pop.version = op.v; + pop.data = op.returned_data[mi->shard]; + 
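+  // Invariant (checked by the assert below): the shard payload length must
+  // equal the logical range recovered by this pass, mapped into chunk
+  // address space via aligned_logical_offset_to_chunk_offset().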
dout(10) << __func__ << ": before_progress=" << op.recovery_progress + << ", after_progress=" << after_progress + << ", pop.data.length()=" << pop.data.length() + << ", size=" << op.obc->obs.oi.size << dendl; + assert( + pop.data.length() == + sinfo.aligned_logical_offset_to_chunk_offset( + after_progress.data_recovered_to - + op.recovery_progress.data_recovered_to) + ); + if (pop.data.length()) + pop.data_included.insert( + sinfo.aligned_logical_offset_to_chunk_offset( + op.recovery_progress.data_recovered_to), + pop.data.length() + ); + if (op.recovery_progress.first) { + pop.attrset = op.xattrs; + } + pop.recovery_info = op.recovery_info; + pop.before_progress = op.recovery_progress; + pop.after_progress = after_progress; + if (*mi != get_parent()->primary_shard()) + get_parent()->begin_peer_recover( + *mi, + op.hoid); + } + op.returned_data.clear(); + op.waiting_on_pushes = op.missing_on; + op.recovery_progress = after_progress; + dout(10) << __func__ << ": READING return " << op << dendl; + return; + } + case RecoveryOp::WRITING: { + if (op.waiting_on_pushes.empty()) { + if (op.recovery_progress.data_complete) { + op.state = RecoveryOp::COMPLETE; + for (set::iterator i = op.missing_on.begin(); + i != op.missing_on.end(); + ++i) { + if (*i != get_parent()->primary_shard()) { + dout(10) << __func__ << ": on_peer_recover on " << *i + << ", obj " << op.hoid << dendl; + get_parent()->on_peer_recover( + *i, + op.hoid, + op.recovery_info, + object_stat_sum_t()); + } + } + get_parent()->on_global_recover(op.hoid); + dout(10) << __func__ << ": WRITING return " << op << dendl; + recovery_ops.erase(op.hoid); + return; + } else { + op.state = RecoveryOp::IDLE; + dout(10) << __func__ << ": WRITING continue " << op << dendl; + continue; + } + } + return; + } + case RecoveryOp::COMPLETE: { + assert(0); // should never be called once complete + }; + default: + assert(0); + } + } } void ECBackend::run_recovery_op( - RecoveryHandle *h, + RecoveryHandle *_h, int priority) { + ECRecoveryHandle *h = static_cast(_h); + RecoveryMessages m; + for (list::iterator i = h->ops.begin(); + i != h->ops.end(); + ++i) { + dout(10) << __func__ << ": starting " << *i << dendl; + assert(!recovery_ops.count(i->hoid)); + RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second; + continue_recovery_op(op, &m); + } + dispatch_recovery_messages(m, priority); + delete _h; } void ECBackend::recover_object( @@ -30,35 +584,517 @@ void ECBackend::recover_object( eversion_t v, ObjectContextRef head, ObjectContextRef obc, - RecoveryHandle *h) + RecoveryHandle *_h) +{ + ECRecoveryHandle *h = static_cast(_h); + h->ops.push_back(RecoveryOp()); + h->ops.back().v = v; + h->ops.back().hoid = hoid; + h->ops.back().obc = obc; + h->ops.back().recovery_info.soid = hoid; + h->ops.back().recovery_info.version = v; + if (obc) { + h->ops.back().recovery_info.size = obc->obs.oi.size; + h->ops.back().recovery_info.oi = obc->obs.oi; + } + h->ops.back().recovery_progress.omap_complete = true; + for (set::const_iterator i = + get_parent()->get_actingbackfill_shards().begin(); + i != get_parent()->get_actingbackfill_shards().end(); + ++i) { + dout(10) << "checking " << *i << dendl; + if (get_parent()->get_shard_missing(*i).is_missing(hoid)) { + h->ops.back().missing_on.insert(*i); + h->ops.back().missing_on_shards.insert(i->shard); + } + } + dout(10) << __func__ << ": built op " << h->ops.back() << dendl; +} + +bool ECBackend::can_handle_while_inactive( + OpRequestRef _op) { + return false; } bool ECBackend::handle_message( - 
OpRequestRef op) + OpRequestRef _op) { + dout(10) << __func__ << ": " << *_op->get_req() << dendl; + int priority = _op->get_req()->get_priority(); + switch (_op->get_req()->get_type()) { + case MSG_OSD_EC_WRITE: { + MOSDECSubOpWrite *op = static_cast(_op->get_req()); + handle_sub_write(op->op.from, _op, op->op); + return true; + } + case MSG_OSD_EC_WRITE_REPLY: { + MOSDECSubOpWriteReply *op = static_cast( + _op->get_req()); + op->set_priority(priority); + handle_sub_write_reply(op->op.from, op->op); + return true; + } + case MSG_OSD_EC_READ: { + MOSDECSubOpRead *op = static_cast(_op->get_req()); + MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply; + reply->pgid = get_parent()->primary_spg_t(); + reply->map_epoch = get_parent()->get_epoch(); + handle_sub_read(op->op.from, op->op, &(reply->op)); + op->set_priority(priority); + get_parent()->send_message_osd_cluster( + op->op.from.osd, reply, get_parent()->get_epoch()); + return true; + } + case MSG_OSD_EC_READ_REPLY: { + MOSDECSubOpReadReply *op = static_cast( + _op->get_req()); + RecoveryMessages rm; + handle_sub_read_reply(op->op.from, op->op, &rm); + dispatch_recovery_messages(rm, priority); + return true; + } + case MSG_OSD_PG_PUSH: { + MOSDPGPush *op = static_cast(_op->get_req()); + RecoveryMessages rm; + for (vector::iterator i = op->pushes.begin(); + i != op->pushes.end(); + ++i) { + handle_recovery_push(*i, &rm); + } + dispatch_recovery_messages(rm, priority); + return true; + } + case MSG_OSD_PG_PUSH_REPLY: { + MOSDPGPushReply *op = static_cast(_op->get_req()); + RecoveryMessages rm; + for (vector::iterator i = op->replies.begin(); + i != op->replies.end(); + ++i) { + handle_recovery_push_reply(*i, op->from, &rm); + } + dispatch_recovery_messages(rm, priority); + return true; + } + default: + return false; + } return false; } +struct SubWriteCommitted : public Context { + ECBackend *pg; + OpRequestRef msg; + tid_t tid; + eversion_t version; + eversion_t last_complete; + SubWriteCommitted( + ECBackend *pg, + OpRequestRef msg, + tid_t tid, + eversion_t version, + eversion_t last_complete) + : pg(pg), msg(msg), tid(tid), + version(version), last_complete(last_complete) {} + void finish(int) { + if (msg) + msg->mark_event("sub_op_committed"); + pg->sub_write_committed(tid, version, last_complete); + } +}; +void ECBackend::sub_write_committed( + tid_t tid, eversion_t version, eversion_t last_complete) { + if (get_parent()->pgb_is_primary()) { + ECSubWriteReply reply; + reply.tid = tid; + reply.last_complete = last_complete; + reply.committed = true; + reply.from = get_parent()->whoami_shard(); + handle_sub_write_reply( + get_parent()->whoami_shard(), + reply); + } else { + get_parent()->update_last_complete_ondisk(last_complete); + MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply; + r->pgid = get_parent()->primary_spg_t(); + r->map_epoch = get_parent()->get_epoch(); + r->op.tid = tid; + r->op.last_complete = last_complete; + r->op.committed = true; + r->op.from = get_parent()->whoami_shard(); + get_parent()->send_message_osd_cluster( + get_parent()->primary_shard().osd, r, get_parent()->get_epoch()); + } +} + +struct SubWriteApplied : public Context { + ECBackend *pg; + OpRequestRef msg; + tid_t tid; + eversion_t version; + SubWriteApplied( + ECBackend *pg, + OpRequestRef msg, + tid_t tid, + eversion_t version) + : pg(pg), msg(msg), tid(tid), version(version) {} + void finish(int) { + if (msg) + msg->mark_event("sub_op_applied"); + pg->sub_write_applied(tid, version); + } +}; +void ECBackend::sub_write_applied( + tid_t tid, 
eversion_t version) { + parent->op_applied(version); + if (get_parent()->pgb_is_primary()) { + ECSubWriteReply reply; + reply.from = get_parent()->whoami_shard(); + reply.tid = tid; + reply.applied = true; + handle_sub_write_reply( + get_parent()->whoami_shard(), + reply); + } else { + MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply; + r->pgid = get_parent()->primary_spg_t(); + r->map_epoch = get_parent()->get_epoch(); + r->op.from = get_parent()->whoami_shard(); + r->op.tid = tid; + r->op.applied = true; + get_parent()->send_message_osd_cluster( + get_parent()->primary_shard().osd, r, get_parent()->get_epoch()); + } +} + +void ECBackend::handle_sub_write( + pg_shard_t from, + OpRequestRef msg, + ECSubWrite &op, + Context *on_local_applied_sync) +{ + if (msg) + msg->mark_started(); + assert(!get_parent()->get_log().get_missing().is_missing(op.soid)); + if (!get_parent()->pgb_is_primary()) + get_parent()->update_stats(op.stats); + ObjectStore::Transaction *localt = new ObjectStore::Transaction; + get_parent()->log_operation( + op.log_entries, + op.trim_to, + !(op.t.empty()), + localt); + localt->append(op.t); + if (on_local_applied_sync) { + dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl; + localt->register_on_applied_sync(on_local_applied_sync); + } + localt->register_on_commit( + get_parent()->bless_context( + new SubWriteCommitted( + this, msg, op.tid, + op.at_version, + get_parent()->get_info().last_complete))); + localt->register_on_applied( + get_parent()->bless_context( + new SubWriteApplied(this, msg, op.tid, op.at_version))); + get_parent()->queue_transaction(localt, msg); +} + +void ECBackend::handle_sub_read( + pg_shard_t from, + ECSubRead &op, + ECSubReadReply *reply) +{ + for(map > >::iterator i = + op.to_read.begin(); + i != op.to_read.end(); + ++i) { + for (list >::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + bufferlist bl; + int r = store->read( + i->first.is_temp() ? temp_coll : coll, + ghobject_t( + i->first, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + j->first, + j->second, + bl, + false); + if (r < 0) { + assert(0); + reply->buffers_read.erase(i->first); + reply->errors[i->first] = r; + break; + } else { + reply->buffers_read[i->first].push_back( + make_pair( + j->first, + bl) + ); + } + } + } + for (set::iterator i = op.attrs_to_read.begin(); + i != op.attrs_to_read.end(); + ++i) { + dout(10) << __func__ << ": fulfilling attr request on " + << *i << dendl; + if (reply->errors.count(*i)) + continue; + int r = store->getattrs( + i->is_temp() ? 
temp_coll : coll, + ghobject_t( + *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + reply->attrs_read[*i]); + if (r < 0) { + assert(0); + reply->buffers_read.erase(*i); + reply->errors[*i] = r; + } + } + reply->from = get_parent()->whoami_shard(); + reply->tid = op.tid; +} + +void ECBackend::handle_sub_write_reply( + pg_shard_t from, + ECSubWriteReply &op) +{ + map::iterator i = tid_to_op_map.find(op.tid); + assert(i != tid_to_op_map.end()); + if (op.committed) { + assert(i->second.pending_commit.count(from)); + i->second.pending_commit.erase(from); + if (from != get_parent()->whoami_shard()) { + get_parent()->update_peer_last_complete_ondisk(from, op.last_complete); + } + } + if (op.applied) { + assert(i->second.pending_apply.count(from)); + i->second.pending_apply.erase(from); + } + check_op(&(i->second)); +} + +void ECBackend::handle_sub_read_reply( + pg_shard_t from, + ECSubReadReply &op, + RecoveryMessages *m) +{ + dout(10) << __func__ << ": reply " << op << dendl; + map::iterator iter = tid_to_read_map.find(op.tid); + if (iter == tid_to_read_map.end()) { + //canceled + return; + } + ReadOp &rop = iter->second; + for (map > >::iterator i = + op.buffers_read.begin(); + i != op.buffers_read.end(); + ++i) { + assert(!op.errors.count(i->first)); + assert(rop.to_read.count(i->first)); + list >::const_iterator req_iter = + rop.to_read.find(i->first)->second.to_read.begin(); + list< + boost::tuple< + uint64_t, uint64_t, map > >::iterator riter = + rop.complete[i->first].returned.begin(); + for (list >::iterator j = i->second.begin(); + j != i->second.end(); + ++j, ++req_iter, ++riter) { + assert(req_iter != rop.to_read.find(i->first)->second.to_read.end()); + assert(riter != rop.complete[i->first].returned.end()); + pair adjusted = + sinfo.aligned_offset_len_to_chunk( + *req_iter); + assert(adjusted.first == j->first); + riter->get<2>()[from].claim(j->second); + } + } + for (map >::iterator i = op.attrs_read.begin(); + i != op.attrs_read.end(); + ++i) { + assert(!op.errors.count(i->first)); + assert(rop.to_read.count(i->first)); + rop.complete[i->first].attrs = map(); + (*(rop.complete[i->first].attrs)).swap(i->second); + } + for (map::iterator i = op.errors.begin(); + i != op.errors.end(); + ++i) { + rop.complete[i->first].errors.insert( + make_pair( + from, + i->second)); + if (rop.complete[i->first].r == 0) + rop.complete[i->first].r = i->second; + } + + map >::iterator siter = shard_to_read_map.find(from); + assert(siter != shard_to_read_map.end()); + assert(siter->second.count(op.tid)); + siter->second.erase(op.tid); + + assert(rop.in_progress.count(from)); + rop.in_progress.erase(from); + if (!rop.in_progress.empty()) { + dout(10) << __func__ << " readop not complete: " << rop << dendl; + } else { + dout(10) << __func__ << " readop complete: " << rop << dendl; + complete_read_op(rop, m); + } +} + +void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m) +{ + map::iterator reqiter = + rop.to_read.begin(); + map::iterator resiter = + rop.complete.begin(); + assert(rop.to_read.size() == rop.complete.size()); + for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) { + if (reqiter->second.cb) { + pair arg( + m, resiter->second); + reqiter->second.cb->complete(arg); + reqiter->second.cb = NULL; + } + } + tid_to_read_map.erase(rop.tid); +} + +struct FinishReadOp : public GenContext { + ECBackend *ec; + tid_t tid; + FinishReadOp(ECBackend *ec, tid_t tid) : ec(ec), tid(tid) {} + void finish(ThreadPool::TPHandle &handle) { + assert(ec->tid_to_read_map.count(tid)); + 
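+    // Queued by filter_read_op() when every shard still in flight for this
+    // tid has gone down: complete the ReadOp with the replies that did
+    // arrive and dispatch any recovery messages that produces.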
int priority = ec->tid_to_read_map[tid].priority; + RecoveryMessages rm; + ec->complete_read_op(ec->tid_to_read_map[tid], &rm); + ec->dispatch_recovery_messages(rm, priority); + } +}; + +void ECBackend::filter_read_op( + const OSDMapRef osdmap, + ReadOp &op) +{ + for (map >::iterator i = op.source_to_obj.begin(); + i != op.source_to_obj.end(); + ) { + if (!osdmap->is_down(i->first.osd)) { + ++i; + continue; + } + for (set::iterator j = i->second.begin(); + j != i->second.end(); + ++j) { + get_parent()->cancel_pull(*j); + + assert(op.to_read.count(*j)); + read_request_t &req = op.to_read.find(*j)->second; + assert(req.cb); + delete req.cb; + req.cb = NULL; + + op.to_read.erase(*j); + op.complete.erase(*j); + op.obj_to_source.erase(*j); + op.in_progress.erase(i->first); + recovery_ops.erase(*j); + if (op.in_progress.empty()) { + get_parent()->schedule_work( + get_parent()->bless_gencontext( + new FinishReadOp(this, op.tid))); + } + } + op.source_to_obj.erase(i++); + } +}; + void ECBackend::check_recovery_sources(const OSDMapRef osdmap) { + set tids_to_filter; + for (map >::iterator i = shard_to_read_map.begin(); + i != shard_to_read_map.end(); + ) { + if (osdmap->is_down(i->first.osd)) { + tids_to_filter.insert(i->second.begin(), i->second.end()); + shard_to_read_map.erase(i++); + } else { + ++i; + } + } + for (set::iterator i = tids_to_filter.begin(); + i != tids_to_filter.end(); + ++i) { + map::iterator j = tid_to_read_map.find(*i); + assert(j != tid_to_read_map.end()); + filter_read_op(osdmap, j->second); + } } void ECBackend::_on_change(ObjectStore::Transaction *t) { + writing.clear(); + tid_to_op_map.clear(); + for (map::iterator i = tid_to_read_map.begin(); + i != tid_to_read_map.end(); + ++i) { + dout(10) << __func__ << ": cancelling " << i->second << dendl; + for (map::iterator j = + i->second.to_read.begin(); + j != i->second.to_read.end(); + ++j) { + delete j->second.cb; + j->second.cb = 0; + } + } + tid_to_read_map.clear(); + for (list::iterator i = in_progress_client_reads.begin(); + i != in_progress_client_reads.end(); + ++i) { + delete i->on_complete; + i->on_complete = NULL; + } + in_progress_client_reads.clear(); + shard_to_read_map.clear(); + clear_state(); } void ECBackend::clear_state() { + recovery_ops.clear(); } void ECBackend::on_flushed() { } - void ECBackend::dump_recovery_info(Formatter *f) const { + f->open_array_section("recovery_ops"); + for (map::const_iterator i = recovery_ops.begin(); + i != recovery_ops.end(); + ++i) { + f->open_object_section("op"); + i->second.dump(f); + f->close_section(); + } + f->close_section(); + f->open_array_section("read_ops"); + for (map::const_iterator i = tid_to_read_map.begin(); + i != tid_to_read_map.end(); + ++i) { + f->open_object_section("read_op"); + i->second.dump(f); + f->close_section(); + } + f->close_section(); } PGBackend::PGTransaction *ECBackend::get_transaction() @@ -66,10 +1102,31 @@ PGBackend::PGTransaction *ECBackend::get_transaction() return new ECTransaction; } +struct MustPrependHashInfo : public ObjectModDesc::Visitor { + enum { EMPTY, FOUND_APPEND, FOUND_CREATE_STASH } state; + MustPrependHashInfo() : state(EMPTY) {} + void append(uint64_t) { + if (state == EMPTY) { + state = FOUND_APPEND; + } + } + void rmobject(version_t) { + if (state == EMPTY) { + state = FOUND_CREATE_STASH; + } + } + void create() { + if (state == EMPTY) { + state = FOUND_CREATE_STASH; + } + } + bool must_prepend_hash_info() const { return state == FOUND_APPEND; } +}; + void ECBackend::submit_transaction( const hobject_t &hoid, const 
eversion_t &at_version, - PGTransaction *t, + PGTransaction *_t, const eversion_t &trim_to, vector &log_entries, Context *on_local_applied_sync, @@ -77,8 +1134,338 @@ void ECBackend::submit_transaction( Context *on_all_commit, tid_t tid, osd_reqid_t reqid, - OpRequestRef op) + OpRequestRef client_op + ) +{ + assert(!tid_to_op_map.count(tid)); + Op *op = &(tid_to_op_map[tid]); + op->hoid = hoid; + op->version = at_version; + op->trim_to = trim_to; + op->log_entries.swap(log_entries); + op->on_local_applied_sync = on_local_applied_sync; + op->on_all_applied = on_all_applied; + op->on_all_commit = on_all_commit; + op->tid = tid; + op->reqid = reqid; + op->client_op = client_op; + + op->t = static_cast(_t); + + set need_hinfos; + op->t->get_append_objects(&need_hinfos); + for (set::iterator i = need_hinfos.begin(); + i != need_hinfos.end(); + ++i) { + op->unstable_hash_infos.insert( + make_pair( + *i, + get_hash_info(*i))); + } + + for (vector::iterator i = op->log_entries.begin(); + i != op->log_entries.end(); + ++i) { + MustPrependHashInfo vis; + i->mod_desc.visit(&vis); + if (vis.must_prepend_hash_info()) { + dout(10) << __func__ << ": stashing HashInfo for " + << i->soid << " for entry " << *i << dendl; + assert(op->unstable_hash_infos.count(i->soid)); + ObjectModDesc desc; + map > old_attrs; + bufferlist old_hinfo; + ::encode(*(op->unstable_hash_infos[i->soid]), old_hinfo); + old_attrs[ECUtil::get_hinfo_key()] = old_hinfo; + desc.setattrs(old_attrs); + i->mod_desc.swap(desc); + i->mod_desc.claim_append(desc); + assert(i->mod_desc.can_rollback()); + } + } + + dout(10) << __func__ << ": op " << *op << " starting" << dendl; + start_write(op); + writing.push_back(op); + dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl; +} + +int ECBackend::get_min_avail_to_read_shards( + const hobject_t &hoid, + const set &want, + bool for_recovery, + set *to_read) +{ + map >::const_iterator miter = + get_parent()->get_missing_loc_shards().find(hoid); + + set have; + map shards; + + for (set::const_iterator i = + get_parent()->get_acting_shards().begin(); + i != get_parent()->get_acting_shards().end(); + ++i) { + dout(10) << __func__ << ": checking acting " << *i << dendl; + const pg_missing_t &missing = get_parent()->get_shard_missing(*i); + if (!missing.is_missing(hoid)) { + assert(!have.count(i->shard)); + have.insert(i->shard); + assert(!shards.count(i->shard)); + shards.insert(make_pair(i->shard, *i)); + } + } + + if (for_recovery) { + for (set::const_iterator i = + get_parent()->get_backfill_shards().begin(); + i != get_parent()->get_backfill_shards().end(); + ++i) { + if (have.count(i->shard)) { + assert(shards.count(i->shard)); + continue; + } + dout(10) << __func__ << ": checking backfill " << *i << dendl; + assert(!shards.count(i->shard)); + const pg_info_t &info = get_parent()->get_shard_info(*i); + const pg_missing_t &missing = get_parent()->get_shard_missing(*i); + if (hoid < info.last_backfill && !missing.is_missing(hoid)) { + have.insert(i->shard); + shards.insert(make_pair(i->shard, *i)); + } + } + + if (miter != get_parent()->get_missing_loc_shards().end()) { + for (set::iterator i = miter->second.begin(); + i != miter->second.end(); + ++i) { + dout(10) << __func__ << ": checking missing_loc " << *i << dendl; + boost::optional m = + get_parent()->maybe_get_shard_missing(*i); + if (m) { + assert(!(*m).is_missing(hoid)); + } + have.insert(i->shard); + shards.insert(make_pair(i->shard, *i)); + } + } + } + + set need; + int r = ec_impl->minimum_to_decode(want, have, &need); + 
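+  // Ask the erasure code plugin which of the shards we actually have are
+  // needed to reconstruct the wanted chunks; a negative return means `have`
+  // is insufficient to decode `want`.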
if (r < 0) + return r; + + if (!to_read) + return 0; + + for (set::iterator i = need.begin(); + i != need.end(); + ++i) { + assert(shards.count(*i)); + to_read->insert(shards[*i]); + } + return 0; +} + +void ECBackend::start_read_op( + int priority, + map &to_read, + OpRequestRef _op) +{ + tid_t tid = get_parent()->get_tid(); + assert(!tid_to_read_map.count(tid)); + ReadOp &op(tid_to_read_map[tid]); + op.priority = priority; + op.tid = tid; + op.to_read.swap(to_read); + op.op = _op; + dout(10) << __func__ << ": starting " << op << dendl; + + map messages; + for (map::iterator i = op.to_read.begin(); + i != op.to_read.end(); + ++i) { + list > > &reslist = + op.complete[i->first].returned; + bool need_attrs = i->second.want_attrs; + for (set::const_iterator j = i->second.need.begin(); + j != i->second.need.end(); + ++j) { + if (need_attrs) { + messages[*j].attrs_to_read.insert(i->first); + need_attrs = false; + } + op.obj_to_source[i->first].insert(*j); + op.source_to_obj[*j].insert(i->first); + } + for (list >::const_iterator j = + i->second.to_read.begin(); + j != i->second.to_read.end(); + ++j) { + reslist.push_back( + boost::make_tuple( + j->first, + j->second, + map())); + pair chunk_off_len = + sinfo.aligned_offset_len_to_chunk( + *j); + for (set::const_iterator k = i->second.need.begin(); + k != i->second.need.end(); + ++k) { + messages[*k].to_read[i->first].push_back(chunk_off_len); + } + assert(!need_attrs); + } + } + + for (map::iterator i = messages.begin(); + i != messages.end(); + ++i) { + op.in_progress.insert(i->first); + shard_to_read_map[i->first].insert(op.tid); + i->second.tid = tid; + MOSDECSubOpRead *msg = new MOSDECSubOpRead; + msg->set_priority(priority); + msg->pgid = spg_t( + get_parent()->whoami_spg_t().pgid, + i->first.shard); + msg->map_epoch = get_parent()->get_epoch(); + msg->op = i->second; + msg->op.from = get_parent()->whoami_shard(); + msg->op.tid = tid; + get_parent()->send_message_osd_cluster( + i->first.osd, + msg, + get_parent()->get_epoch()); + } + dout(10) << __func__ << ": started " << op << dendl; +} + +ECUtil::HashInfoRef ECBackend::get_hash_info( + const hobject_t &hoid) +{ + dout(10) << __func__ << ": Getting attr on " << hoid << dendl; + ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid); + if (!ref) { + dout(10) << __func__ << ": not in cache " << hoid << dendl; + struct stat st; + int r = store->stat( + coll, + ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + &st); + ECUtil::HashInfo hinfo(ec_impl->get_chunk_count()); + if (r >= 0 && st.st_size > 0) { + dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl; + bufferlist bl; + r = store->getattr( + coll, + ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + ECUtil::get_hinfo_key(), + bl); + if (r >= 0) { + bufferlist::iterator bp = bl.begin(); + ::decode(hinfo, bp); + assert(hinfo.get_total_chunk_size() == (unsigned)st.st_size); + } else { + assert(0 == "missing hash attr"); + } + } + ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo); + } + return ref; +} + +void ECBackend::check_op(Op *op) { + if (op->pending_apply.empty() && op->on_all_applied) { + dout(10) << __func__ << " Calling on_all_applied on " << *op << dendl; + op->on_all_applied->complete(0); + op->on_all_applied = 0; + } + if (op->pending_commit.empty() && op->on_all_commit) { + dout(10) << __func__ << " Calling on_all_commit on " << *op << dendl; + op->on_all_commit->complete(0); + op->on_all_commit = 0; + } + if 
(op->pending_apply.empty() && op->pending_commit.empty()) { + // done! + assert(writing.front() == op); + dout(10) << __func__ << " Completing " << *op << dendl; + writing.pop_front(); + tid_to_op_map.erase(op->tid); + } + for (map::iterator i = tid_to_op_map.begin(); + i != tid_to_op_map.end(); + ++i) { + dout(20) << __func__ << " tid " << i->first <<": " << i->second << dendl; + } +} + +void ECBackend::start_write(Op *op) { + map trans; + for (set::const_iterator i = + get_parent()->get_actingbackfill_shards().begin(); + i != get_parent()->get_actingbackfill_shards().end(); + ++i) { + trans[i->shard]; + } + op->t->generate_transactions( + op->unstable_hash_infos, + ec_impl, + get_parent()->get_info().pgid.pgid, + sinfo, + &trans, + &(op->temp_added), + &(op->temp_cleared)); + + dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl; + + for (set::const_iterator i = + get_parent()->get_actingbackfill_shards().begin(); + i != get_parent()->get_actingbackfill_shards().end(); + ++i) { + op->pending_apply.insert(*i); + op->pending_commit.insert(*i); + map::iterator iter = + trans.find(i->shard); + assert(iter != trans.end()); + bool should_send = get_parent()->should_send_op(*i, op->hoid); + pg_stat_t stats = + should_send ? + get_info().stats : + parent->get_shard_info().find(*i)->second.stats; + + ECSubWrite sop( + get_parent()->whoami_shard(), + op->tid, + op->reqid, + op->hoid, + stats, + should_send ? iter->second : ObjectStore::Transaction(), + op->version, + op->trim_to, + op->log_entries, + op->temp_added, + op->temp_cleared); + if (*i == get_parent()->whoami_shard()) { + handle_sub_write( + get_parent()->whoami_shard(), + op->client_op, + sop, + op->on_local_applied_sync); + op->on_local_applied_sync = 0; + } else { + MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop); + r->set_priority(cct->_conf->osd_client_op_priority); + r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard); + r->map_epoch = get_parent()->get_epoch(); + get_parent()->send_message_osd_cluster( + i->osd, r, get_parent()->get_epoch()); + } + } } int ECBackend::objects_read_sync( @@ -90,11 +1477,219 @@ int ECBackend::objects_read_sync( return -EOPNOTSUPP; } +struct CallClientContexts : + public GenContext &> { + ECBackend *ec; + ECBackend::ClientAsyncReadStatus *status; + list, + pair > > to_read; + CallClientContexts( + ECBackend *ec, + ECBackend::ClientAsyncReadStatus *status, + const list, + pair > > &to_read) + : ec(ec), status(status), to_read(to_read) {} + void finish(pair &in) { + ECBackend::read_result_t &res = in.second; + assert(res.returned.size() == to_read.size()); + assert(res.r == 0); + assert(res.errors.empty()); + for (list, + pair > >::iterator i = to_read.begin(); + i != to_read.end(); + to_read.erase(i++)) { + pair adjusted = + ec->sinfo.offset_len_to_stripe_bounds(i->first); + assert(res.returned.front().get<0>() == adjusted.first && + res.returned.front().get<1>() == adjusted.second); + map to_decode; + bufferlist bl; + for (map::iterator j = + res.returned.front().get<2>().begin(); + j != res.returned.front().get<2>().end(); + ++j) { + to_decode[j->first.shard].claim(j->second); + } + ECUtil::decode( + ec->sinfo, + ec->ec_impl, + to_decode, + &bl); + assert(i->second.second); + assert(i->second.first); + i->second.first->substr_of( + bl, + i->first.first - adjusted.first, + i->first.second); + if (i->second.second) { + i->second.second->complete(i->second.first->length()); + } + res.returned.pop_front(); + } + status->complete = true; + list &ip = + 
ec->in_progress_client_reads; + while (ip.size() && ip.front().complete) { + if (ip.front().on_complete) { + ip.front().on_complete->complete(0); + ip.front().on_complete = NULL; + } + ip.pop_front(); + } + } + ~CallClientContexts() { + for (list, + pair > >::iterator i = to_read.begin(); + i != to_read.end(); + to_read.erase(i++)) { + delete i->second.second; + } + } +}; + void ECBackend::objects_read_async( const hobject_t &hoid, const list, pair > > &to_read, Context *on_complete) { + in_progress_client_reads.push_back(ClientAsyncReadStatus(on_complete)); + CallClientContexts *c = new CallClientContexts( + this, &(in_progress_client_reads.back()), to_read); + list > offsets; + for (list, + pair > >::const_iterator i = + to_read.begin(); + i != to_read.end(); + ++i) { + offsets.push_back( + sinfo.offset_len_to_stripe_bounds(i->first)); + } + + set want_to_read; + for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) { + want_to_read.insert(i); + } + set shards; + int r = get_min_avail_to_read_shards( + hoid, + want_to_read, + false, + &shards); + assert(r == 0); + + map for_read_op; + for_read_op.insert( + make_pair( + hoid, + read_request_t( + hoid, + offsets, + shards, + false, + c))); + + start_read_op( + cct->_conf->osd_client_op_priority, + for_read_op, + OpRequestRef()); return; } + + +int ECBackend::objects_get_attrs( + const hobject_t &hoid, + map *out) +{ + int r = store->getattrs( + coll, + ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + *out); + if (r < 0) + return r; + + for (map::iterator i = out->begin(); + i != out->end(); + ) { + if (ECUtil::is_hinfo_key_string(i->first)) + out->erase(i++); + else + ++i; + } + return r; +} + +void ECBackend::rollback_append( + const hobject_t &hoid, + uint64_t old_size, + ObjectStore::Transaction *t) +{ + assert(old_size % sinfo.get_stripe_width() == 0); + t->truncate( + coll, + ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + sinfo.aligned_logical_offset_to_chunk_offset( + old_size)); +} + +void ECBackend::be_deep_scrub( + const hobject_t &poid, + ScrubMap::object &o, + ThreadPool::TPHandle &handle) { + bufferhash h(-1); + int r; + uint64_t stride = cct->_conf->osd_deep_scrub_stride; + if (stride % sinfo.get_chunk_size()) + stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size()); + uint64_t pos = 0; + while (true) { + bufferlist bl; + handle.reset_tp_timeout(); + r = store->read( + coll, + ghobject_t( + poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard), + pos, + stride, bl, + true); + if (r < 0) + break; + if (bl.length() % sinfo.get_chunk_size()) { + r = -EIO; + break; + } + pos += r; + h << bl; + if ((unsigned)r < stride) + break; + } + + ECUtil::HashInfoRef hinfo = get_hash_info(poid); + if (r == -EIO) { + dout(0) << "_scan_list " << poid << " got " + << r << " on read, read_error" << dendl; + o.read_error = true; + } + + if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) { + dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl; + o.read_error = true; + } + + if (hinfo->get_total_chunk_size() != pos) { + dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl; + o.read_error = true; + } + + /* We checked above that we match our own stored hash. 
We cannot + * send a hash of the actual object, so instead we simply send + * our locally stored hash of shard 0 on the assumption that if + * we match our chunk hash and our recollection of the hash for + * chunk 0 matches that of our peers, there is likely no corruption. + */ + o.digest = hinfo->get_chunk_hash(0); + o.digest_present = true; + + o.omap_digest = 0; + o.omap_digest_present = true; +} diff --git a/src/osd/ECBackend.h b/src/osd/ECBackend.h index 76c3064b78a47..d604093656102 100644 --- a/src/osd/ECBackend.h +++ b/src/osd/ECBackend.h @@ -18,7 +18,17 @@ #include "OSD.h" #include "PGBackend.h" #include "osd_types.h" +#include +#include "erasure-code/ErasureCodeInterface.h" +#include "ECTransaction.h" +#include "ECMsgTypes.h" +#include "ECUtil.h" +#include "messages/MOSDECSubOpWrite.h" +#include "messages/MOSDECSubOpWriteReply.h" +#include "messages/MOSDECSubOpRead.h" +#include "messages/MOSDECSubOpReadReply.h" +struct RecoveryMessages; class ECBackend : public PGBackend { public: RecoveryHandle *open_recovery_op(); @@ -39,7 +49,37 @@ public: bool handle_message( OpRequestRef op ); + bool can_handle_while_inactive( + OpRequestRef op + ); + friend struct SubWriteApplied; + friend struct SubWriteCommitted; + void sub_write_applied( + tid_t tid, eversion_t version); + void sub_write_committed( + tid_t tid, eversion_t version, eversion_t last_complete); + void handle_sub_write( + pg_shard_t from, + OpRequestRef msg, + ECSubWrite &op, + Context *on_local_applied_sync = 0 + ); + void handle_sub_read( + pg_shard_t from, + ECSubRead &op, + ECSubReadReply *reply + ); + void handle_sub_write_reply( + pg_shard_t from, + ECSubWriteReply &op + ); + void handle_sub_read_reply( + pg_shard_t from, + ECSubReadReply &op, + RecoveryMessages *m + ); + /// @see ReadOp below void check_recovery_sources(const OSDMapRef osdmap); void _on_change(ObjectStore::Transaction *t); @@ -49,6 +89,7 @@ public: void dump_recovery_info(Formatter *f) const; + /// @see osd/ECTransaction.cc/h PGTransaction *get_transaction(); void submit_transaction( @@ -71,12 +112,369 @@ public: uint64_t len, bufferlist *bl); + /** + * Async read mechanism + * + * Async reads use the same async read mechanism as does recovery. + * CallClientContexts is responsible for reconstructing the response + * buffer as well as for calling the callbacks. + * + * One tricky bit is that two reads may possibly not read from the same + * set of replicas. This could result in two reads completing in the + * wrong (from the interface user's point of view) order. Thus, we + * maintain a queue of in progress reads (@see in_progress_client_reads) + * to ensure that we always call the completion callback in order. + * + * Another subtely is that while we may read a degraded object, we will + * still only perform a client read from shards in the acting set. This + * ensures that we won't ever have to restart a client initiated read in + * check_recovery_sources. 
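+ *
+ * Concretely, objects_read_async() pushes a ClientAsyncReadStatus onto
+ * in_progress_client_reads before starting the read; when a read
+ * finishes, CallClientContexts marks its own status complete but only
+ * fires on_complete callbacks from the front of that list, so
+ * completions are delivered strictly in submission order.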
+ */ + friend struct CallClientContexts; + struct ClientAsyncReadStatus { + bool complete; + Context *on_complete; + ClientAsyncReadStatus(Context *on_complete) + : complete(false), on_complete(on_complete) {} + }; + list in_progress_client_reads; void objects_read_async( const hobject_t &hoid, const list, pair > > &to_read, Context *on_complete); -}; +private: + friend struct ECRecoveryHandle; + uint64_t get_recovery_chunk_size() const { + uint64_t max = cct->_conf->osd_recovery_max_chunk; + max -= max % sinfo.get_stripe_width(); + max += sinfo.get_stripe_width(); + return max; + } + + /** + * Recovery + * + * Recovery uses the same underlying read mechanism as client reads + * with the slight difference that recovery reads may come from non + * acting shards. Thus, check_recovery_sources may wind up calling + * cancel_pull for a read originating with RecoveryOp. + * + * The recovery process is expressed as a state machine: + * - IDLE: Nothing is currently in progress, reads will be started and + * we will transition to READING + * - READING: We are awaiting a pending read op. Once complete, we will + * decode the buffers and proceed to WRITING + * - WRITING: We are awaiting a completed push. Once complete, we will + * either transition to COMPLETE or to IDLE to continue. + * - COMPLETE: complete + * + * We use the existing Push and PushReply messages and structures to + * handle actually shuffling the data over to the replicas. recovery_info + * and recovery_progress are expressed in terms of the logical offset + * space except for data_included which is in terms of the chunked object + * space (to match the passed buffer). + * + * xattrs are requested on the first read and used to initialize the + * object_context if missing on completion of the first read. + * + * In order to batch up reads and writes, we batch Push, PushReply, + * Transaction, and reads in a RecoveryMessages object which is passed + * among the recovery methods. + */ + struct RecoveryOp { + hobject_t hoid; + eversion_t v; + set missing_on; + set missing_on_shards; + + ObjectRecoveryInfo recovery_info; + ObjectRecoveryProgress recovery_progress; + + bool pending_read; + enum state_t { IDLE, READING, WRITING, COMPLETE } state; + + static const char* tostr(state_t state) { + switch (state) { + case ECBackend::RecoveryOp::IDLE: + return "IDLE"; + break; + case ECBackend::RecoveryOp::READING: + return "READING"; + break; + case ECBackend::RecoveryOp::WRITING: + return "WRITING"; + break; + case ECBackend::RecoveryOp::COMPLETE: + return "COMPLETE"; + break; + default: + assert(0); + return ""; + } + } + + // must be filled if state == WRITING + map returned_data; + map xattrs; + ECUtil::HashInfoRef hinfo; + ObjectContextRef obc; + set waiting_on_pushes; + + // valid in state READING + pair extent_requested; + + void dump(Formatter *f) const; + + RecoveryOp() : pending_read(false), state(IDLE) {} + }; + friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs); + map recovery_ops; + +public: + /** + * Low level async read mechanism + * + * To avoid duplicating the logic for requesting and waiting for + * multiple object shards, there is a common async read mechanism + * taking a map of hobject_t->read_request_t which defines callbacks + * taking read_result_ts as arguments. + * + * tid_to_read_map gives open read ops. check_recovery_sources uses + * shard_to_read_map and ReadOp::source_to_obj to restart reads + * involving down osds. 
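+ *
+ * Life cycle: start_read_op() assigns a tid, records the contacted
+ * shards in in_progress and source_to_obj, and sends one
+ * MOSDECSubOpRead per shard; handle_sub_read_reply() accumulates the
+ * returned buffers, attrs, and errors, then calls complete_read_op()
+ * once in_progress drains.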
+ * + * The user is responsible for specifying replicas on which to read + * and for reassembling the buffer on the other side since client + * reads require the original object buffer while recovery only needs + * the missing pieces. + * + * Rather than handling reads on the primary directly, we simply send + * ourselves a message. This avoids a dedicated primary path for that + * part. + */ + struct read_result_t { + int r; + map errors; + boost::optional > attrs; + list< + boost::tuple< + uint64_t, uint64_t, map > > returned; + read_result_t() : r(0) {} + }; + struct read_request_t { + const list > to_read; + const set need; + const bool want_attrs; + GenContext &> *cb; + read_request_t( + const hobject_t &hoid, + const list > &to_read, + const set &need, + bool want_attrs, + GenContext &> *cb) + : to_read(to_read), need(need), want_attrs(want_attrs), + cb(cb) {} + }; + friend ostream &operator<<(ostream &lhs, const read_request_t &rhs); + + struct ReadOp { + int priority; + tid_t tid; + OpRequestRef op; // may be null if not on behalf of a client + + map to_read; + map complete; + + map > obj_to_source; + map > source_to_obj; + + void dump(Formatter *f) const; + + set in_progress; + }; + friend struct FinishReadOp; + void filter_read_op( + const OSDMapRef osdmap, + ReadOp &op); + void complete_read_op(ReadOp &rop, RecoveryMessages *m); + friend ostream &operator<<(ostream &lhs, const ReadOp &rhs); + map tid_to_read_map; + map > shard_to_read_map; + void start_read_op( + int priority, + map &to_read, + OpRequestRef op); + + + /** + * Client writes + * + * ECTransaction is responsible for generating a transaction for + * each shard to which we need to send the write. As required + * by the PGBackend interface, the ECBackend write mechanism + * passes trim information with the write and last_complete back + * with the reply. + * + * As with client reads, there is a possibility of out-of-order + * completions. Thus, callbacks and completion are called in order + * on the writing list. 
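+ *
+ * An Op is retired by check_op() only once both pending_commit and
+ * pending_apply are empty; since writing is a FIFO list and completion
+ * is checked against its front, callbacks fire in submission order.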
+ */ + struct Op { + hobject_t hoid; + eversion_t version; + eversion_t trim_to; + vector log_entries; + Context *on_local_applied_sync; + Context *on_all_applied; + Context *on_all_commit; + tid_t tid; + osd_reqid_t reqid; + OpRequestRef client_op; + + ECTransaction *t; + + set temp_added; + set temp_cleared; + + set pending_commit; + set pending_apply; + + map unstable_hash_infos; + ~Op() { + delete t; + delete on_local_applied_sync; + delete on_all_applied; + delete on_all_commit; + } + }; + friend ostream &operator<<(ostream &lhs, const Op &rhs); + + void continue_recovery_op( + RecoveryOp &op, + RecoveryMessages *m); + + void dispatch_recovery_messages(RecoveryMessages &m, int priority); + friend struct OnRecoveryReadComplete; + void handle_recovery_read_complete( + const hobject_t &hoid, + boost::tuple > &to_read, + boost::optional > attrs, + RecoveryMessages *m); + void handle_recovery_push( + PushOp &op, + RecoveryMessages *m); + void handle_recovery_push_reply( + PushReplyOp &op, + pg_shard_t from, + RecoveryMessages *m); + + map tid_to_op_map; /// lists below point into here + list writing; + + CephContext *cct; + ErasureCodeInterfaceRef ec_impl; + + + /** + * ECRecPred + * + * Determines the whether _have is suffient to recover an object + */ + class ECRecPred : public IsRecoverablePredicate { + set want; + ErasureCodeInterfaceRef ec_impl; + public: + ECRecPred(ErasureCodeInterfaceRef ec_impl) : ec_impl(ec_impl) { + for (unsigned i = 0; i < ec_impl->get_data_chunk_count(); ++i) { + want.insert(i); + } + } + bool operator()(const set &_have) const { + set have; + for (set::const_iterator i = _have.begin(); + i != _have.end(); + ++i) { + have.insert(i->shard); + } + set min; + return ec_impl->minimum_to_decode(want, have, &min) == 0; + } + }; + IsRecoverablePredicate *get_is_recoverable_predicate() { + return new ECRecPred(ec_impl); + } + + /** + * ECReadPred + * + * Determines the whether _have is suffient to read an object + */ + class ECReadPred : public IsReadablePredicate { + pg_shard_t whoami; + ECRecPred rec_pred; + public: + ECReadPred( + pg_shard_t whoami, + ErasureCodeInterfaceRef ec_impl) : whoami(whoami), rec_pred(ec_impl) {} + bool operator()(const set &_have) const { + return _have.count(whoami) && rec_pred(_have); + } + }; + IsReadablePredicate *get_is_readable_predicate() { + return new ECReadPred(get_parent()->whoami_shard(), ec_impl); + } + + + const ECUtil::stripe_info_t sinfo; + /// If modified, ensure that the ref is held until the update is applied + SharedPtrRegistry unstable_hashinfo_registry; + ECUtil::HashInfoRef get_hash_info(const hobject_t &hoid); + + friend struct ReadCB; + void check_op(Op *op); + void start_write(Op *op); +public: + ECBackend( + PGBackend::Listener *pg, + coll_t coll, + coll_t temp_coll, + ObjectStore *store, + CephContext *cct, + ErasureCodeInterfaceRef ec_impl) + : PGBackend(pg, store, coll, temp_coll), + cct(cct), + ec_impl(ec_impl), + stripe_width(ec_impl->get_chunk_count()), + stripe_size(4*(2<<10) /* TODO: make more flexible */) {} + + /// Returns to_read replicas sufficient to reconstruct want + int get_min_avail_to_read_shards( + const hobject_t &hoid, ///< [in] object + const set &want, ///< [in] desired shards + bool for_recovery, ///< [in] true if we may use non-acting replicas + set *to_read ///< [out] shards to read + ); ///< @return error code, 0 on success + + int objects_get_attrs( + const hobject_t &hoid, + map *out); + + void rollback_append( + const hobject_t &hoid, + uint64_t old_size, + ObjectStore::Transaction 
*t);
+
+  bool scrub_supported() { return true; }
+
+  void be_deep_scrub(
+    const hobject_t &obj,
+    ScrubMap::object &o,
+    ThreadPool::TPHandle &handle);
+  uint64_t be_get_ondisk_size(uint64_t logical_size) {
+    return sinfo.logical_to_next_chunk_offset(logical_size);
+  }
+};
 #endif
-- 
2.39.5
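The stripe arithmetic this patch leans on (logical_to_next_stripe_offset,
aligned_logical_offset_to_chunk_offset, logical_to_next_chunk_offset) can be
summarized with a small self-contained model. The sketch below is a toy under
assumed semantics — k equally sized data chunks per stripe, names invented for
illustration — not the actual ECUtil::stripe_info_t implementation:

    #include <cassert>
    #include <cstdint>

    // Toy model of the stripe_info_t arithmetic (illustrative only).
    struct toy_stripe_info {
      uint64_t k;           // data chunks per stripe
      uint64_t chunk_size;  // bytes per chunk on each shard
      uint64_t stripe_width() const { return k * chunk_size; }

      // Round a logical offset up to the next stripe boundary.
      uint64_t logical_to_next_stripe_offset(uint64_t off) const {
        return ((off + stripe_width() - 1) / stripe_width()) * stripe_width();
      }
      // Map a stripe-aligned logical offset to an offset within one shard:
      // every stripe_width() logical bytes occupy chunk_size bytes per shard.
      uint64_t aligned_logical_offset_to_chunk_offset(uint64_t off) const {
        assert(off % stripe_width() == 0);
        return off / k;
      }
      // Per-shard on-disk size for a given logical size, rounded up to a
      // whole chunk (cf. be_get_ondisk_size() above).
      uint64_t logical_to_next_chunk_offset(uint64_t len) const {
        return ((len + stripe_width() - 1) / stripe_width()) * chunk_size;
      }
    };

    int main() {
      toy_stripe_info s{4, 4096};  // 4 data chunks of 4 KiB => 16 KiB stripe
      assert(s.stripe_width() == 16384);
      assert(s.logical_to_next_stripe_offset(1) == 16384);
      assert(s.aligned_logical_offset_to_chunk_offset(32768) == 8192);
      assert(s.logical_to_next_chunk_offset(16385) == 8192);
      return 0;
    }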