*
*/
+#include <boost/variant.hpp>
+#include <boost/optional.hpp>
+#include <iostream>
+#include <sstream>
+
+#include "ECUtil.h"
#include "ECBackend.h"
+#include "messages/MOSDPGPush.h"
+#include "messages/MOSDPGPushReply.h"
+
+#define dout_subsys ceph_subsys_osd
+#define DOUT_PREFIX_ARGS this
+#undef dout_prefix
+#define dout_prefix _prefix(_dout, this)
+static ostream& _prefix(std::ostream *_dout, ECBackend *pgb) {
+ return *_dout << pgb->get_parent()->gen_dbg_prefix();
+}
-PGBackend::RecoveryHandle *open_recovery_op()
+struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
+ list<ECBackend::RecoveryOp> ops;
+};
+
+static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
{
- return 0;
+ lhs << "[";
+ for (map<pg_shard_t, bufferlist>::const_iterator i = rhs.begin();
+ i != rhs.end();
+ ++i) {
+ if (i != rhs.begin())
+ lhs << ", ";
+ lhs << make_pair(i->first, i->second.length());
+ }
+ return lhs << "]";
+}
+
+static ostream &operator<<(ostream &lhs, const map<int, bufferlist> &rhs)
+{
+ lhs << "[";
+ for (map<int, bufferlist>::const_iterator i = rhs.begin();
+ i != rhs.end();
+ ++i) {
+ if (i != rhs.begin())
+ lhs << ", ";
+ lhs << make_pair(i->first, i->second.length());
+ }
+ return lhs << "]";
+}
+
+static ostream &operator<<(
+ ostream &lhs,
+ const boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &rhs)
+{
+ return lhs << "(" << rhs.get<0>() << ", "
+ << rhs.get<1>() << ", " << rhs.get<2>() << ")";
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::read_request_t &rhs)
+{
+ return lhs << "read_request_t(to_read=[" << rhs.to_read << "]"
+ << ", need=" << rhs.need
+ << ", want_attrs=" << rhs.want_attrs
+ << ")";
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::read_result_t &rhs)
+{
+ lhs << "read_result_t(r=" << rhs.r
+ << ", errors=" << rhs.errors;
+ if (rhs.attrs) {
+ lhs << ", attrs=" << rhs.attrs;
+ } else {
+ lhs << ", noattrs";
+ }
+ return lhs << ", returned=" << rhs.returned;
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::ReadOp &rhs)
+{
+ lhs << "ReadOp(tid=" << rhs.tid;
+ if (rhs.op && rhs.op->get_req()) {
+ lhs << ", op=";
+ rhs.op->get_req()->print(lhs);
+ }
+ return lhs << ", to_read=" << rhs.to_read
+ << ", complete=" << rhs.complete
+ << ", priority=" << rhs.priority
+ << ", obj_to_source=" << rhs.obj_to_source
+ << ", source_to_obj=" << rhs.source_to_obj
+ << ", in_progress=" << rhs.in_progress << ")";
+}
+
+void ECBackend::ReadOp::dump(Formatter *f) const
+{
+ f->dump_stream("tid") << tid;
+ if (op && op->get_req()) {
+ f->dump_stream("op") << *(op->get_req());
+ }
+ f->dump_stream("to_read") << to_read;
+ f->dump_stream("complete") << complete;
+ f->dump_stream("priority") << priority;
+ f->dump_stream("obj_to_source") << obj_to_source;
+ f->dump_stream("source_to_obj") << source_to_obj;
+ f->dump_stream("in_progress") << in_progress;
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::Op &rhs)
+{
+ lhs << "Op(" << rhs.hoid
+ << " v=" << rhs.version
+ << " tt=" << rhs.trim_to
+ << " tid=" << rhs.tid
+ << " reqid=" << rhs.reqid;
+ if (rhs.client_op && rhs.client_op->get_req()) {
+ lhs << " client_op=";
+ rhs.client_op->get_req()->print(lhs);
+ }
+ lhs << " pending_commit=" << rhs.pending_commit
+ << " pending_apply=" << rhs.pending_apply
+ << ")";
+ return lhs;
+}
+
+ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
+{
+ return lhs << "RecoveryOp("
+ << "hoid=" << rhs.hoid
+ << " v=" << rhs.v
+ << " missing_on=" << rhs.missing_on
+ << " missing_on_shards=" << rhs.missing_on_shards
+ << " recovery_info=" << rhs.recovery_info
+ << " recovery_progress=" << rhs.recovery_progress
+ << " pending_read=" << rhs.pending_read
+ << " obc refcount=" << rhs.obc.use_count()
+ << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
+ << " waiting_on_pushes=" << rhs.waiting_on_pushes
+ << " extent_requested=" << rhs.extent_requested;
+}
+
+void ECBackend::RecoveryOp::dump(Formatter *f) const
+{
+ f->dump_stream("hoid") << hoid;
+ f->dump_stream("v") << v;
+ f->dump_stream("missing_on") << missing_on;
+ f->dump_stream("missing_on_shards") << missing_on_shards;
+ f->dump_stream("recovery_info") << recovery_info;
+ f->dump_stream("recovery_progress") << recovery_progress;
+ f->dump_stream("pending_read") << pending_read;
+ f->dump_stream("state") << tostr(state);
+ f->dump_stream("waiting_on_pushes") << waiting_on_pushes;
+ f->dump_stream("extent_requested") << extent_requested;
+}
+
+PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
+{
+ return new ECRecoveryHandle;
+}
+
+struct OnRecoveryReadComplete :
+ public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+ ECBackend *pg;
+ hobject_t hoid;
+ set<int> want;
+ OnRecoveryReadComplete(ECBackend *pg, const hobject_t &hoid)
+ : pg(pg), hoid(hoid) {}
+ void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) {
+ ECBackend::read_result_t &res = in.second;
+ assert(res.r == 0);
+ assert(res.errors.empty());
+ assert(res.returned.size() == 1);
+ pg->handle_recovery_read_complete(
+ hoid,
+ res.returned.back(),
+ res.attrs,
+ in.first);
+ }
+};
+
+struct RecoveryMessages {
+ map<hobject_t,
+ ECBackend::read_request_t> reads;
+ void read(
+ ECBackend *ec,
+ const hobject_t &hoid, uint64_t off, uint64_t len,
+ const set<pg_shard_t> &need,
+ bool attrs) {
+ list<pair<uint64_t, uint64_t> > to_read;
+ to_read.push_back(make_pair(off, len));
+ assert(!reads.count(hoid));
+ reads.insert(
+ make_pair(
+ hoid,
+ ECBackend::read_request_t(
+ hoid,
+ to_read,
+ need,
+ attrs,
+ new OnRecoveryReadComplete(
+ ec,
+ hoid))));
+ }
+
+ map<pg_shard_t, vector<PushOp> > pushes;
+ map<pg_shard_t, vector<PushReplyOp> > push_replies;
+ ObjectStore::Transaction *t;
+ RecoveryMessages() : t(new ObjectStore::Transaction) {}
+ ~RecoveryMessages() { assert(!t); }
+};
+
+void ECBackend::handle_recovery_push(
+ PushOp &op,
+ RecoveryMessages *m)
+{
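+  // A push whose before_progress is "first" and whose after_progress is
+  // data_complete both starts and finishes the object, so it can be
+  // written directly to the final collection, skipping the temp object.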
+ bool oneshot = op.before_progress.first && op.after_progress.data_complete;
+ coll_t tcoll = oneshot ? coll : get_temp_coll(m->t);
+ if (op.before_progress.first) {
+ get_parent()->on_local_recover_start(
+ op.soid,
+ m->t);
+ m->t->remove(
+ get_temp_coll(m->t),
+ ghobject_t(
+ op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+ m->t->touch(
+ tcoll,
+ ghobject_t(
+ op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+ }
+
+ if (!op.data_included.empty()) {
+ uint64_t start = op.data_included.range_start();
+ uint64_t end = op.data_included.range_end();
+ assert(op.data.length() == (end - start));
+
+ m->t->write(
+ tcoll,
+ ghobject_t(
+ op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ start,
+ op.data.length(),
+ op.data);
+ } else {
+ assert(op.data.length() == 0);
+ }
+
+ if (op.before_progress.first) {
+ if (!oneshot)
+ add_temp_obj(op.soid);
+ assert(op.attrset.count(string("_")));
+ m->t->setattrs(
+ tcoll,
+ ghobject_t(
+ op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ op.attrset);
+ }
+
+ if (op.after_progress.data_complete && !oneshot) {
+ clear_temp_obj(op.soid);
+ m->t->collection_move(
+ coll,
+ tcoll,
+ ghobject_t(
+ op.soid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard));
+ }
+ if (op.after_progress.data_complete) {
+ if ((get_parent()->pgb_is_primary())) {
+ assert(recovery_ops.count(op.soid));
+ assert(recovery_ops[op.soid].obc);
+ object_stat_sum_t stats;
+ stats.num_objects_recovered = 1;
+ stats.num_bytes_recovered = recovery_ops[op.soid].obc->obs.oi.size;
+ get_parent()->on_local_recover(
+ op.soid,
+ stats,
+ op.recovery_info,
+ recovery_ops[op.soid].obc,
+ m->t);
+ } else {
+ get_parent()->on_local_recover(
+ op.soid,
+ object_stat_sum_t(),
+ op.recovery_info,
+ ObjectContextRef(),
+ m->t);
+ }
+ }
+ m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
+ m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
+}
+
+void ECBackend::handle_recovery_push_reply(
+ PushReplyOp &op,
+ pg_shard_t from,
+ RecoveryMessages *m)
+{
+ if (!recovery_ops.count(op.soid))
+ return;
+ RecoveryOp &rop = recovery_ops[op.soid];
+ assert(rop.waiting_on_pushes.count(from));
+ rop.waiting_on_pushes.erase(from);
+ continue_recovery_op(rop, m);
+}
+
+void ECBackend::handle_recovery_read_complete(
+ const hobject_t &hoid,
+ boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
+ boost::optional<map<string, bufferlist> > attrs,
+ RecoveryMessages *m)
+{
+ dout(10) << __func__ << ": returned " << hoid << " "
+ << "(" << to_read.get<0>()
+ << ", " << to_read.get<1>()
+ << ", " << to_read.get<2>()
+ << ")"
+ << dendl;
+ assert(recovery_ops.count(hoid));
+ RecoveryOp &op = recovery_ops[hoid];
+ assert(op.returned_data.empty());
+ map<int, bufferlist*> target;
+ for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
+ i != op.missing_on_shards.end();
+ ++i) {
+ target[*i] = &(op.returned_data[*i]);
+ }
+ map<int, bufferlist> from;
+ for(map<pg_shard_t, bufferlist>::iterator i = to_read.get<2>().begin();
+ i != to_read.get<2>().end();
+ ++i) {
+ from[i->first.shard].claim(i->second);
+ }
+ dout(10) << __func__ << ": " << from << dendl;
+ ECUtil::decode(sinfo, ec_impl, from, target);
+ if (attrs) {
+ op.xattrs.swap(*attrs);
+
+ if (!op.obc) {
+ op.obc = get_parent()->get_obc(hoid, op.xattrs);
+ op.recovery_info.size = op.obc->obs.oi.size;
+ op.recovery_info.oi = op.obc->obs.oi;
+ }
+
+ ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
+ if (op.obc->obs.oi.size > 0) {
+ assert(op.xattrs.count(ECUtil::get_hinfo_key()));
+ bufferlist::iterator bp = op.xattrs[ECUtil::get_hinfo_key()].begin();
+ ::decode(hinfo, bp);
+ }
+ op.hinfo = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
+ }
+ assert(op.xattrs.size());
+ assert(op.obc);
+ continue_recovery_op(op, m);
+}
+
+struct SendPushReplies : public Context {
+ PGBackend::Listener *l;
+ epoch_t epoch;
+ map<int, MOSDPGPushReply*> replies;
+ SendPushReplies(
+ PGBackend::Listener *l,
+ epoch_t epoch,
+ map<int, MOSDPGPushReply*> &in) : l(l), epoch(epoch) {
+ replies.swap(in);
+ }
+ void finish(int) {
+ for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
+ i != replies.end();
+ ++i) {
+ l->send_message_osd_cluster(i->first, i->second, epoch);
+ }
+ replies.clear();
+ }
+ ~SendPushReplies() {
+ for (map<int, MOSDPGPushReply*>::iterator i = replies.begin();
+ i != replies.end();
+ ++i) {
+ i->second->put();
+ }
+ replies.clear();
+ }
+};
+
+void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
+{
+ for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
+ i != m.pushes.end();
+ m.pushes.erase(i++)) {
+ MOSDPGPush *msg = new MOSDPGPush();
+ msg->set_priority(priority);
+ msg->map_epoch = get_parent()->get_epoch();
+ msg->from = get_parent()->whoami_shard();
+ msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+ msg->pushes.swap(i->second);
+ msg->compute_cost(cct);
+ get_parent()->send_message(
+ i->first.osd,
+ msg);
+ }
+ map<int, MOSDPGPushReply*> replies;
+ for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
+ m.push_replies.begin();
+ i != m.push_replies.end();
+ m.push_replies.erase(i++)) {
+ MOSDPGPushReply *msg = new MOSDPGPushReply();
+ msg->set_priority(priority);
+ msg->map_epoch = get_parent()->get_epoch();
+ msg->from = get_parent()->whoami_shard();
+ msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
+ msg->replies.swap(i->second);
+ msg->compute_cost(cct);
+ replies.insert(make_pair(i->first.osd, msg));
+ }
+ m.t->register_on_complete(
+ get_parent()->bless_context(
+ new SendPushReplies(
+ get_parent(),
+ get_parent()->get_epoch(),
+ replies)));
+ get_parent()->queue_transaction(m.t);
+ m.t = NULL;
+ if (m.reads.empty())
+ return;
+ start_read_op(
+ priority,
+ m.reads,
+ OpRequestRef());
+}
+
+void ECBackend::continue_recovery_op(
+ RecoveryOp &op,
+ RecoveryMessages *m)
+{
+ dout(10) << __func__ << ": continuing " << op << dendl;
+ while (1) {
+ switch (op.state) {
+ case RecoveryOp::IDLE: {
+ // start read
+ op.state = RecoveryOp::READING;
+ assert(!op.recovery_progress.data_complete);
+ set<int> want(op.missing_on_shards.begin(), op.missing_on_shards.end());
+ set<pg_shard_t> to_read;
+ int r = get_min_avail_to_read_shards(
+ op.hoid, want, true, &to_read);
+ assert(r == 0);
+ m->read(
+ this,
+ op.hoid,
+ op.recovery_progress.data_recovered_to,
+ get_recovery_chunk_size(),
+ to_read,
+ op.recovery_progress.first);
+ op.extent_requested = make_pair(op.recovery_progress.data_recovered_to,
+ get_recovery_chunk_size());
+ dout(10) << __func__ << ": IDLE return " << op << dendl;
+ return;
+ }
+ case RecoveryOp::READING: {
+ // read completed, start write
+ assert(op.xattrs.size());
+ assert(op.returned_data.size());
+ op.state = RecoveryOp::WRITING;
+ ObjectRecoveryProgress after_progress = op.recovery_progress;
+ after_progress.data_recovered_to += get_recovery_chunk_size();
+ after_progress.first = false;
+ if (after_progress.data_recovered_to >= op.obc->obs.oi.size) {
+ after_progress.data_recovered_to =
+ sinfo.logical_to_next_stripe_offset(
+ op.obc->obs.oi.size);
+ after_progress.data_complete = true;
+ }
+ for (set<pg_shard_t>::iterator mi = op.missing_on.begin();
+ mi != op.missing_on.end();
+ ++mi) {
+ assert(op.returned_data.count(mi->shard));
+ m->pushes[*mi].push_back(PushOp());
+ PushOp &pop = m->pushes[*mi].back();
+ pop.soid = op.hoid;
+ pop.version = op.v;
+ pop.data = op.returned_data[mi->shard];
+ dout(10) << __func__ << ": before_progress=" << op.recovery_progress
+ << ", after_progress=" << after_progress
+ << ", pop.data.length()=" << pop.data.length()
+ << ", size=" << op.obc->obs.oi.size << dendl;
+ assert(
+ pop.data.length() ==
+ sinfo.aligned_logical_offset_to_chunk_offset(
+ after_progress.data_recovered_to -
+ op.recovery_progress.data_recovered_to)
+ );
+ if (pop.data.length())
+ pop.data_included.insert(
+ sinfo.aligned_logical_offset_to_chunk_offset(
+ op.recovery_progress.data_recovered_to),
+ pop.data.length()
+ );
+ if (op.recovery_progress.first) {
+ pop.attrset = op.xattrs;
+ }
+ pop.recovery_info = op.recovery_info;
+ pop.before_progress = op.recovery_progress;
+ pop.after_progress = after_progress;
+ if (*mi != get_parent()->primary_shard())
+ get_parent()->begin_peer_recover(
+ *mi,
+ op.hoid);
+ }
+ op.returned_data.clear();
+ op.waiting_on_pushes = op.missing_on;
+ op.recovery_progress = after_progress;
+ dout(10) << __func__ << ": READING return " << op << dendl;
+ return;
+ }
+ case RecoveryOp::WRITING: {
+ if (op.waiting_on_pushes.empty()) {
+ if (op.recovery_progress.data_complete) {
+ op.state = RecoveryOp::COMPLETE;
+ for (set<pg_shard_t>::iterator i = op.missing_on.begin();
+ i != op.missing_on.end();
+ ++i) {
+ if (*i != get_parent()->primary_shard()) {
+ dout(10) << __func__ << ": on_peer_recover on " << *i
+ << ", obj " << op.hoid << dendl;
+ get_parent()->on_peer_recover(
+ *i,
+ op.hoid,
+ op.recovery_info,
+ object_stat_sum_t());
+ }
+ }
+ get_parent()->on_global_recover(op.hoid);
+ dout(10) << __func__ << ": WRITING return " << op << dendl;
+ recovery_ops.erase(op.hoid);
+ return;
+ } else {
+ op.state = RecoveryOp::IDLE;
+ dout(10) << __func__ << ": WRITING continue " << op << dendl;
+ continue;
+ }
+ }
+ return;
+ }
+ case RecoveryOp::COMPLETE: {
+ assert(0); // should never be called once complete
+ };
+ default:
+ assert(0);
+ }
+ }
}
void ECBackend::run_recovery_op(
- RecoveryHandle *h,
+ RecoveryHandle *_h,
int priority)
{
+ ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
+ RecoveryMessages m;
+ for (list<RecoveryOp>::iterator i = h->ops.begin();
+ i != h->ops.end();
+ ++i) {
+ dout(10) << __func__ << ": starting " << *i << dendl;
+ assert(!recovery_ops.count(i->hoid));
+ RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
+ continue_recovery_op(op, &m);
+ }
+ dispatch_recovery_messages(m, priority);
+ delete _h;
}
void ECBackend::recover_object(
  const hobject_t &hoid,
  eversion_t v,
ObjectContextRef head,
ObjectContextRef obc,
- RecoveryHandle *h)
+ RecoveryHandle *_h)
+{
+ ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
+ h->ops.push_back(RecoveryOp());
+ h->ops.back().v = v;
+ h->ops.back().hoid = hoid;
+ h->ops.back().obc = obc;
+ h->ops.back().recovery_info.soid = hoid;
+ h->ops.back().recovery_info.version = v;
+ if (obc) {
+ h->ops.back().recovery_info.size = obc->obs.oi.size;
+ h->ops.back().recovery_info.oi = obc->obs.oi;
+ }
+ h->ops.back().recovery_progress.omap_complete = true;
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_actingbackfill_shards().begin();
+ i != get_parent()->get_actingbackfill_shards().end();
+ ++i) {
+ dout(10) << "checking " << *i << dendl;
+ if (get_parent()->get_shard_missing(*i).is_missing(hoid)) {
+ h->ops.back().missing_on.insert(*i);
+ h->ops.back().missing_on_shards.insert(i->shard);
+ }
+ }
+ dout(10) << __func__ << ": built op " << h->ops.back() << dendl;
+}
+
+bool ECBackend::can_handle_while_inactive(
+ OpRequestRef _op)
{
+ return false;
}
bool ECBackend::handle_message(
- OpRequestRef op)
+ OpRequestRef _op)
{
+ dout(10) << __func__ << ": " << *_op->get_req() << dendl;
+ int priority = _op->get_req()->get_priority();
+ switch (_op->get_req()->get_type()) {
+ case MSG_OSD_EC_WRITE: {
+ MOSDECSubOpWrite *op = static_cast<MOSDECSubOpWrite*>(_op->get_req());
+ handle_sub_write(op->op.from, _op, op->op);
+ return true;
+ }
+ case MSG_OSD_EC_WRITE_REPLY: {
+ MOSDECSubOpWriteReply *op = static_cast<MOSDECSubOpWriteReply*>(
+ _op->get_req());
+ op->set_priority(priority);
+ handle_sub_write_reply(op->op.from, op->op);
+ return true;
+ }
+ case MSG_OSD_EC_READ: {
+ MOSDECSubOpRead *op = static_cast<MOSDECSubOpRead*>(_op->get_req());
+ MOSDECSubOpReadReply *reply = new MOSDECSubOpReadReply;
+ reply->pgid = get_parent()->primary_spg_t();
+ reply->map_epoch = get_parent()->get_epoch();
+ handle_sub_read(op->op.from, op->op, &(reply->op));
+ op->set_priority(priority);
+ get_parent()->send_message_osd_cluster(
+ op->op.from.osd, reply, get_parent()->get_epoch());
+ return true;
+ }
+ case MSG_OSD_EC_READ_REPLY: {
+ MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
+ _op->get_req());
+ RecoveryMessages rm;
+ handle_sub_read_reply(op->op.from, op->op, &rm);
+ dispatch_recovery_messages(rm, priority);
+ return true;
+ }
+ case MSG_OSD_PG_PUSH: {
+ MOSDPGPush *op = static_cast<MOSDPGPush *>(_op->get_req());
+ RecoveryMessages rm;
+ for (vector<PushOp>::iterator i = op->pushes.begin();
+ i != op->pushes.end();
+ ++i) {
+ handle_recovery_push(*i, &rm);
+ }
+ dispatch_recovery_messages(rm, priority);
+ return true;
+ }
+ case MSG_OSD_PG_PUSH_REPLY: {
+ MOSDPGPushReply *op = static_cast<MOSDPGPushReply *>(_op->get_req());
+ RecoveryMessages rm;
+ for (vector<PushReplyOp>::iterator i = op->replies.begin();
+ i != op->replies.end();
+ ++i) {
+ handle_recovery_push_reply(*i, op->from, &rm);
+ }
+ dispatch_recovery_messages(rm, priority);
+ return true;
+ }
+ default:
+ return false;
+ }
return false;
}
+struct SubWriteCommitted : public Context {
+ ECBackend *pg;
+ OpRequestRef msg;
+ tid_t tid;
+ eversion_t version;
+ eversion_t last_complete;
+ SubWriteCommitted(
+ ECBackend *pg,
+ OpRequestRef msg,
+ tid_t tid,
+ eversion_t version,
+ eversion_t last_complete)
+ : pg(pg), msg(msg), tid(tid),
+ version(version), last_complete(last_complete) {}
+ void finish(int) {
+ if (msg)
+ msg->mark_event("sub_op_committed");
+ pg->sub_write_committed(tid, version, last_complete);
+ }
+};
+void ECBackend::sub_write_committed(
+ tid_t tid, eversion_t version, eversion_t last_complete) {
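+  // On the primary, short-circuit: deliver the reply locally rather than
+  // sending a message to ourselves; replicas send MOSDECSubOpWriteReply.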
+ if (get_parent()->pgb_is_primary()) {
+ ECSubWriteReply reply;
+ reply.tid = tid;
+ reply.last_complete = last_complete;
+ reply.committed = true;
+ reply.from = get_parent()->whoami_shard();
+ handle_sub_write_reply(
+ get_parent()->whoami_shard(),
+ reply);
+ } else {
+ get_parent()->update_last_complete_ondisk(last_complete);
+ MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
+ r->pgid = get_parent()->primary_spg_t();
+ r->map_epoch = get_parent()->get_epoch();
+ r->op.tid = tid;
+ r->op.last_complete = last_complete;
+ r->op.committed = true;
+ r->op.from = get_parent()->whoami_shard();
+ get_parent()->send_message_osd_cluster(
+ get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
+ }
+}
+
+struct SubWriteApplied : public Context {
+ ECBackend *pg;
+ OpRequestRef msg;
+ tid_t tid;
+ eversion_t version;
+ SubWriteApplied(
+ ECBackend *pg,
+ OpRequestRef msg,
+ tid_t tid,
+ eversion_t version)
+ : pg(pg), msg(msg), tid(tid), version(version) {}
+ void finish(int) {
+ if (msg)
+ msg->mark_event("sub_op_applied");
+ pg->sub_write_applied(tid, version);
+ }
+};
+void ECBackend::sub_write_applied(
+ tid_t tid, eversion_t version) {
+ parent->op_applied(version);
+ if (get_parent()->pgb_is_primary()) {
+ ECSubWriteReply reply;
+ reply.from = get_parent()->whoami_shard();
+ reply.tid = tid;
+ reply.applied = true;
+ handle_sub_write_reply(
+ get_parent()->whoami_shard(),
+ reply);
+ } else {
+ MOSDECSubOpWriteReply *r = new MOSDECSubOpWriteReply;
+ r->pgid = get_parent()->primary_spg_t();
+ r->map_epoch = get_parent()->get_epoch();
+ r->op.from = get_parent()->whoami_shard();
+ r->op.tid = tid;
+ r->op.applied = true;
+ get_parent()->send_message_osd_cluster(
+ get_parent()->primary_shard().osd, r, get_parent()->get_epoch());
+ }
+}
+
+void ECBackend::handle_sub_write(
+ pg_shard_t from,
+ OpRequestRef msg,
+ ECSubWrite &op,
+ Context *on_local_applied_sync)
+{
+ if (msg)
+ msg->mark_started();
+ assert(!get_parent()->get_log().get_missing().is_missing(op.soid));
+ if (!get_parent()->pgb_is_primary())
+ get_parent()->update_stats(op.stats);
+ ObjectStore::Transaction *localt = new ObjectStore::Transaction;
+ get_parent()->log_operation(
+ op.log_entries,
+ op.trim_to,
+ !(op.t.empty()),
+ localt);
+ localt->append(op.t);
+ if (on_local_applied_sync) {
+ dout(10) << "Queueing onreadable_sync: " << on_local_applied_sync << dendl;
+ localt->register_on_applied_sync(on_local_applied_sync);
+ }
+ localt->register_on_commit(
+ get_parent()->bless_context(
+ new SubWriteCommitted(
+ this, msg, op.tid,
+ op.at_version,
+ get_parent()->get_info().last_complete)));
+ localt->register_on_applied(
+ get_parent()->bless_context(
+ new SubWriteApplied(this, msg, op.tid, op.at_version)));
+ get_parent()->queue_transaction(localt, msg);
+}
+
+void ECBackend::handle_sub_read(
+ pg_shard_t from,
+ ECSubRead &op,
+ ECSubReadReply *reply)
+{
+ for(map<hobject_t, list<pair<uint64_t, uint64_t> > >::iterator i =
+ op.to_read.begin();
+ i != op.to_read.end();
+ ++i) {
+ for (list<pair<uint64_t, uint64_t> >::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ bufferlist bl;
+ int r = store->read(
+ i->first.is_temp() ? temp_coll : coll,
+ ghobject_t(
+ i->first, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ j->first,
+ j->second,
+ bl,
+ false);
+ if (r < 0) {
+ assert(0);
+ reply->buffers_read.erase(i->first);
+ reply->errors[i->first] = r;
+ break;
+ } else {
+ reply->buffers_read[i->first].push_back(
+ make_pair(
+ j->first,
+ bl)
+ );
+ }
+ }
+ }
+ for (set<hobject_t>::iterator i = op.attrs_to_read.begin();
+ i != op.attrs_to_read.end();
+ ++i) {
+ dout(10) << __func__ << ": fulfilling attr request on "
+ << *i << dendl;
+ if (reply->errors.count(*i))
+ continue;
+ int r = store->getattrs(
+ i->is_temp() ? temp_coll : coll,
+ ghobject_t(
+ *i, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ reply->attrs_read[*i]);
+ if (r < 0) {
+ assert(0);
+ reply->buffers_read.erase(*i);
+ reply->errors[*i] = r;
+ }
+ }
+ reply->from = get_parent()->whoami_shard();
+ reply->tid = op.tid;
+}
+
+void ECBackend::handle_sub_write_reply(
+ pg_shard_t from,
+ ECSubWriteReply &op)
+{
+ map<tid_t, Op>::iterator i = tid_to_op_map.find(op.tid);
+ assert(i != tid_to_op_map.end());
+ if (op.committed) {
+ assert(i->second.pending_commit.count(from));
+ i->second.pending_commit.erase(from);
+ if (from != get_parent()->whoami_shard()) {
+ get_parent()->update_peer_last_complete_ondisk(from, op.last_complete);
+ }
+ }
+ if (op.applied) {
+ assert(i->second.pending_apply.count(from));
+ i->second.pending_apply.erase(from);
+ }
+ check_op(&(i->second));
+}
+
+void ECBackend::handle_sub_read_reply(
+ pg_shard_t from,
+ ECSubReadReply &op,
+ RecoveryMessages *m)
+{
+ dout(10) << __func__ << ": reply " << op << dendl;
+ map<tid_t, ReadOp>::iterator iter = tid_to_read_map.find(op.tid);
+ if (iter == tid_to_read_map.end()) {
+ //canceled
+ return;
+ }
+ ReadOp &rop = iter->second;
+ for (map<hobject_t, list<pair<uint64_t, bufferlist> > >::iterator i =
+ op.buffers_read.begin();
+ i != op.buffers_read.end();
+ ++i) {
+ assert(!op.errors.count(i->first));
+ assert(rop.to_read.count(i->first));
+ list<pair<uint64_t, uint64_t> >::const_iterator req_iter =
+ rop.to_read.find(i->first)->second.to_read.begin();
+ list<
+ boost::tuple<
+ uint64_t, uint64_t, map<pg_shard_t, bufferlist> > >::iterator riter =
+ rop.complete[i->first].returned.begin();
+ for (list<pair<uint64_t, bufferlist> >::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j, ++req_iter, ++riter) {
+ assert(req_iter != rop.to_read.find(i->first)->second.to_read.end());
+ assert(riter != rop.complete[i->first].returned.end());
+ pair<uint64_t, uint64_t> adjusted =
+ sinfo.aligned_offset_len_to_chunk(
+ *req_iter);
+ assert(adjusted.first == j->first);
+ riter->get<2>()[from].claim(j->second);
+ }
+ }
+ for (map<hobject_t, map<string, bufferlist> >::iterator i = op.attrs_read.begin();
+ i != op.attrs_read.end();
+ ++i) {
+ assert(!op.errors.count(i->first));
+ assert(rop.to_read.count(i->first));
+ rop.complete[i->first].attrs = map<string, bufferlist>();
+ (*(rop.complete[i->first].attrs)).swap(i->second);
+ }
+ for (map<hobject_t, int>::iterator i = op.errors.begin();
+ i != op.errors.end();
+ ++i) {
+ rop.complete[i->first].errors.insert(
+ make_pair(
+ from,
+ i->second));
+ if (rop.complete[i->first].r == 0)
+ rop.complete[i->first].r = i->second;
+ }
+
+ map<pg_shard_t, set<tid_t> >::iterator siter = shard_to_read_map.find(from);
+ assert(siter != shard_to_read_map.end());
+ assert(siter->second.count(op.tid));
+ siter->second.erase(op.tid);
+
+ assert(rop.in_progress.count(from));
+ rop.in_progress.erase(from);
+ if (!rop.in_progress.empty()) {
+ dout(10) << __func__ << " readop not complete: " << rop << dendl;
+ } else {
+ dout(10) << __func__ << " readop complete: " << rop << dendl;
+ complete_read_op(rop, m);
+ }
+}
+
+void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
+{
+ map<hobject_t, read_request_t>::iterator reqiter =
+ rop.to_read.begin();
+ map<hobject_t, read_result_t>::iterator resiter =
+ rop.complete.begin();
+ assert(rop.to_read.size() == rop.complete.size());
+ for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
+ if (reqiter->second.cb) {
+ pair<RecoveryMessages *, read_result_t &> arg(
+ m, resiter->second);
+ reqiter->second.cb->complete(arg);
+ reqiter->second.cb = NULL;
+ }
+ }
+ tid_to_read_map.erase(rop.tid);
+}
+
+struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
+ ECBackend *ec;
+ tid_t tid;
+ FinishReadOp(ECBackend *ec, tid_t tid) : ec(ec), tid(tid) {}
+ void finish(ThreadPool::TPHandle &handle) {
+ assert(ec->tid_to_read_map.count(tid));
+ int priority = ec->tid_to_read_map[tid].priority;
+ RecoveryMessages rm;
+ ec->complete_read_op(ec->tid_to_read_map[tid], &rm);
+ ec->dispatch_recovery_messages(rm, priority);
+ }
+};
+
+void ECBackend::filter_read_op(
+ const OSDMapRef osdmap,
+ ReadOp &op)
+{
+ for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
+ i != op.source_to_obj.end();
+ ) {
+ if (!osdmap->is_down(i->first.osd)) {
+ ++i;
+ continue;
+ }
+ for (set<hobject_t>::iterator j = i->second.begin();
+ j != i->second.end();
+ ++j) {
+ get_parent()->cancel_pull(*j);
+
+ assert(op.to_read.count(*j));
+ read_request_t &req = op.to_read.find(*j)->second;
+ assert(req.cb);
+ delete req.cb;
+ req.cb = NULL;
+
+ op.to_read.erase(*j);
+ op.complete.erase(*j);
+ op.obj_to_source.erase(*j);
+ op.in_progress.erase(i->first);
+ recovery_ops.erase(*j);
+ if (op.in_progress.empty()) {
+ get_parent()->schedule_work(
+ get_parent()->bless_gencontext(
+ new FinishReadOp(this, op.tid)));
+ }
+ }
+ op.source_to_obj.erase(i++);
+ }
+}
+
void ECBackend::check_recovery_sources(const OSDMapRef osdmap)
{
+ set<tid_t> tids_to_filter;
+ for (map<pg_shard_t, set<tid_t> >::iterator i = shard_to_read_map.begin();
+ i != shard_to_read_map.end();
+ ) {
+ if (osdmap->is_down(i->first.osd)) {
+ tids_to_filter.insert(i->second.begin(), i->second.end());
+ shard_to_read_map.erase(i++);
+ } else {
+ ++i;
+ }
+ }
+ for (set<tid_t>::iterator i = tids_to_filter.begin();
+ i != tids_to_filter.end();
+ ++i) {
+ map<tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
+ assert(j != tid_to_read_map.end());
+ filter_read_op(osdmap, j->second);
+ }
}
void ECBackend::_on_change(ObjectStore::Transaction *t)
{
+ writing.clear();
+ tid_to_op_map.clear();
+ for (map<tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
+ i != tid_to_read_map.end();
+ ++i) {
+ dout(10) << __func__ << ": cancelling " << i->second << dendl;
+ for (map<hobject_t, read_request_t>::iterator j =
+ i->second.to_read.begin();
+ j != i->second.to_read.end();
+ ++j) {
+ delete j->second.cb;
+ j->second.cb = 0;
+ }
+ }
+ tid_to_read_map.clear();
+ for (list<ClientAsyncReadStatus>::iterator i = in_progress_client_reads.begin();
+ i != in_progress_client_reads.end();
+ ++i) {
+ delete i->on_complete;
+ i->on_complete = NULL;
+ }
+ in_progress_client_reads.clear();
+ shard_to_read_map.clear();
+ clear_state();
}
void ECBackend::clear_state()
{
+ recovery_ops.clear();
}
void ECBackend::on_flushed()
{
}
-
void ECBackend::dump_recovery_info(Formatter *f) const
{
+ f->open_array_section("recovery_ops");
+ for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
+ i != recovery_ops.end();
+ ++i) {
+ f->open_object_section("op");
+ i->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
+ f->open_array_section("read_ops");
+ for (map<tid_t, ReadOp>::const_iterator i = tid_to_read_map.begin();
+ i != tid_to_read_map.end();
+ ++i) {
+ f->open_object_section("read_op");
+ i->second.dump(f);
+ f->close_section();
+ }
+ f->close_section();
}
PGBackend::PGTransaction *ECBackend::get_transaction()
{
return new ECTransaction;
}
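+/* Detects log entries that append to an existing object. For those, the
+ * pre-append HashInfo must be stashed in the entry's mod_desc so that a
+ * rollback can restore the hash xattr along with the old object size
+ * (see submit_transaction below). */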
+struct MustPrependHashInfo : public ObjectModDesc::Visitor {
+ enum { EMPTY, FOUND_APPEND, FOUND_CREATE_STASH } state;
+ MustPrependHashInfo() : state(EMPTY) {}
+ void append(uint64_t) {
+ if (state == EMPTY) {
+ state = FOUND_APPEND;
+ }
+ }
+ void rmobject(version_t) {
+ if (state == EMPTY) {
+ state = FOUND_CREATE_STASH;
+ }
+ }
+ void create() {
+ if (state == EMPTY) {
+ state = FOUND_CREATE_STASH;
+ }
+ }
+ bool must_prepend_hash_info() const { return state == FOUND_APPEND; }
+};
+
void ECBackend::submit_transaction(
const hobject_t &hoid,
const eversion_t &at_version,
- PGTransaction *t,
+ PGTransaction *_t,
const eversion_t &trim_to,
vector<pg_log_entry_t> &log_entries,
  Context *on_local_applied_sync,
  Context *on_all_applied,
Context *on_all_commit,
tid_t tid,
osd_reqid_t reqid,
- OpRequestRef op)
+ OpRequestRef client_op
+ )
+{
+ assert(!tid_to_op_map.count(tid));
+ Op *op = &(tid_to_op_map[tid]);
+ op->hoid = hoid;
+ op->version = at_version;
+ op->trim_to = trim_to;
+ op->log_entries.swap(log_entries);
+ op->on_local_applied_sync = on_local_applied_sync;
+ op->on_all_applied = on_all_applied;
+ op->on_all_commit = on_all_commit;
+ op->tid = tid;
+ op->reqid = reqid;
+ op->client_op = client_op;
+
+ op->t = static_cast<ECTransaction*>(_t);
+
+ set<hobject_t> need_hinfos;
+ op->t->get_append_objects(&need_hinfos);
+ for (set<hobject_t>::iterator i = need_hinfos.begin();
+ i != need_hinfos.end();
+ ++i) {
+ op->unstable_hash_infos.insert(
+ make_pair(
+ *i,
+ get_hash_info(*i)));
+ }
+
+ for (vector<pg_log_entry_t>::iterator i = op->log_entries.begin();
+ i != op->log_entries.end();
+ ++i) {
+ MustPrependHashInfo vis;
+ i->mod_desc.visit(&vis);
+ if (vis.must_prepend_hash_info()) {
+ dout(10) << __func__ << ": stashing HashInfo for "
+ << i->soid << " for entry " << *i << dendl;
+ assert(op->unstable_hash_infos.count(i->soid));
+ ObjectModDesc desc;
+ map<string, boost::optional<bufferlist> > old_attrs;
+ bufferlist old_hinfo;
+ ::encode(*(op->unstable_hash_infos[i->soid]), old_hinfo);
+ old_attrs[ECUtil::get_hinfo_key()] = old_hinfo;
+ desc.setattrs(old_attrs);
+ i->mod_desc.swap(desc);
+ i->mod_desc.claim_append(desc);
+ assert(i->mod_desc.can_rollback());
+ }
+ }
+
+ dout(10) << __func__ << ": op " << *op << " starting" << dendl;
+ start_write(op);
+ writing.push_back(op);
+ dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
+}
+
+int ECBackend::get_min_avail_to_read_shards(
+ const hobject_t &hoid,
+ const set<int> &want,
+ bool for_recovery,
+ set<pg_shard_t> *to_read)
+{
+ map<hobject_t, set<pg_shard_t> >::const_iterator miter =
+ get_parent()->get_missing_loc_shards().find(hoid);
+
+ set<int> have;
+ map<shard_id_t, pg_shard_t> shards;
+
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_acting_shards().begin();
+ i != get_parent()->get_acting_shards().end();
+ ++i) {
+ dout(10) << __func__ << ": checking acting " << *i << dendl;
+ const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+ if (!missing.is_missing(hoid)) {
+ assert(!have.count(i->shard));
+ have.insert(i->shard);
+ assert(!shards.count(i->shard));
+ shards.insert(make_pair(i->shard, *i));
+ }
+ }
+
+ if (for_recovery) {
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_backfill_shards().begin();
+ i != get_parent()->get_backfill_shards().end();
+ ++i) {
+ if (have.count(i->shard)) {
+ assert(shards.count(i->shard));
+ continue;
+ }
+ dout(10) << __func__ << ": checking backfill " << *i << dendl;
+ assert(!shards.count(i->shard));
+ const pg_info_t &info = get_parent()->get_shard_info(*i);
+ const pg_missing_t &missing = get_parent()->get_shard_missing(*i);
+ if (hoid < info.last_backfill && !missing.is_missing(hoid)) {
+ have.insert(i->shard);
+ shards.insert(make_pair(i->shard, *i));
+ }
+ }
+
+ if (miter != get_parent()->get_missing_loc_shards().end()) {
+ for (set<pg_shard_t>::iterator i = miter->second.begin();
+ i != miter->second.end();
+ ++i) {
+ dout(10) << __func__ << ": checking missing_loc " << *i << dendl;
+ boost::optional<const pg_missing_t &> m =
+ get_parent()->maybe_get_shard_missing(*i);
+ if (m) {
+ assert(!(*m).is_missing(hoid));
+ }
+ have.insert(i->shard);
+ shards.insert(make_pair(i->shard, *i));
+ }
+ }
+ }
+
+ set<int> need;
+ int r = ec_impl->minimum_to_decode(want, have, &need);
+ if (r < 0)
+ return r;
+
+ if (!to_read)
+ return 0;
+
+ for (set<int>::iterator i = need.begin();
+ i != need.end();
+ ++i) {
+ assert(shards.count(*i));
+ to_read->insert(shards[*i]);
+ }
+ return 0;
+}
+
+void ECBackend::start_read_op(
+ int priority,
+ map<hobject_t, read_request_t> &to_read,
+ OpRequestRef _op)
+{
+ tid_t tid = get_parent()->get_tid();
+ assert(!tid_to_read_map.count(tid));
+ ReadOp &op(tid_to_read_map[tid]);
+ op.priority = priority;
+ op.tid = tid;
+ op.to_read.swap(to_read);
+ op.op = _op;
+ dout(10) << __func__ << ": starting " << op << dendl;
+
+ map<pg_shard_t, ECSubRead> messages;
+ for (map<hobject_t, read_request_t>::iterator i = op.to_read.begin();
+ i != op.to_read.end();
+ ++i) {
+ list<boost::tuple<
+ uint64_t, uint64_t, map<pg_shard_t, bufferlist> > > &reslist =
+ op.complete[i->first].returned;
+ bool need_attrs = i->second.want_attrs;
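+    // when attrs are wanted, request them from just one of the shards we
+    // are reading from anyway; any single shard's copy suffices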
+ for (set<pg_shard_t>::const_iterator j = i->second.need.begin();
+ j != i->second.need.end();
+ ++j) {
+ if (need_attrs) {
+ messages[*j].attrs_to_read.insert(i->first);
+ need_attrs = false;
+ }
+ op.obj_to_source[i->first].insert(*j);
+ op.source_to_obj[*j].insert(i->first);
+ }
+ for (list<pair<uint64_t, uint64_t> >::const_iterator j =
+ i->second.to_read.begin();
+ j != i->second.to_read.end();
+ ++j) {
+ reslist.push_back(
+ boost::make_tuple(
+ j->first,
+ j->second,
+ map<pg_shard_t, bufferlist>()));
+ pair<uint64_t, uint64_t> chunk_off_len =
+ sinfo.aligned_offset_len_to_chunk(
+ *j);
+ for (set<pg_shard_t>::const_iterator k = i->second.need.begin();
+ k != i->second.need.end();
+ ++k) {
+ messages[*k].to_read[i->first].push_back(chunk_off_len);
+ }
+ assert(!need_attrs);
+ }
+ }
+
+ for (map<pg_shard_t, ECSubRead>::iterator i = messages.begin();
+ i != messages.end();
+ ++i) {
+ op.in_progress.insert(i->first);
+ shard_to_read_map[i->first].insert(op.tid);
+ i->second.tid = tid;
+ MOSDECSubOpRead *msg = new MOSDECSubOpRead;
+ msg->set_priority(priority);
+ msg->pgid = spg_t(
+ get_parent()->whoami_spg_t().pgid,
+ i->first.shard);
+ msg->map_epoch = get_parent()->get_epoch();
+ msg->op = i->second;
+ msg->op.from = get_parent()->whoami_shard();
+ msg->op.tid = tid;
+ get_parent()->send_message_osd_cluster(
+ i->first.osd,
+ msg,
+ get_parent()->get_epoch());
+ }
+ dout(10) << __func__ << ": started " << op << dendl;
+}
+
+ECUtil::HashInfoRef ECBackend::get_hash_info(
+ const hobject_t &hoid)
+{
+ dout(10) << __func__ << ": Getting attr on " << hoid << dendl;
+ ECUtil::HashInfoRef ref = unstable_hashinfo_registry.lookup(hoid);
+ if (!ref) {
+ dout(10) << __func__ << ": not in cache " << hoid << dendl;
+ struct stat st;
+ int r = store->stat(
+ coll,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ &st);
+ ECUtil::HashInfo hinfo(ec_impl->get_chunk_count());
+ if (r >= 0 && st.st_size > 0) {
+ dout(10) << __func__ << ": found on disk, size " << st.st_size << dendl;
+ bufferlist bl;
+ r = store->getattr(
+ coll,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ ECUtil::get_hinfo_key(),
+ bl);
+ if (r >= 0) {
+ bufferlist::iterator bp = bl.begin();
+ ::decode(hinfo, bp);
+ assert(hinfo.get_total_chunk_size() == (unsigned)st.st_size);
+ } else {
+ assert(0 == "missing hash attr");
+ }
+ }
+ ref = unstable_hashinfo_registry.lookup_or_create(hoid, hinfo);
+ }
+ return ref;
+}
+
+void ECBackend::check_op(Op *op)
{
+ if (op->pending_apply.empty() && op->on_all_applied) {
+ dout(10) << __func__ << " Calling on_all_applied on " << *op << dendl;
+ op->on_all_applied->complete(0);
+ op->on_all_applied = 0;
+ }
+ if (op->pending_commit.empty() && op->on_all_commit) {
+ dout(10) << __func__ << " Calling on_all_commit on " << *op << dendl;
+ op->on_all_commit->complete(0);
+ op->on_all_commit = 0;
+ }
+ if (op->pending_apply.empty() && op->pending_commit.empty()) {
+ // done!
+ assert(writing.front() == op);
+ dout(10) << __func__ << " Completing " << *op << dendl;
+ writing.pop_front();
+ tid_to_op_map.erase(op->tid);
+ }
+ for (map<tid_t, Op>::iterator i = tid_to_op_map.begin();
+ i != tid_to_op_map.end();
+ ++i) {
+ dout(20) << __func__ << " tid " << i->first <<": " << i->second << dendl;
+ }
+}
+
+void ECBackend::start_write(Op *op) {
+ map<shard_id_t, ObjectStore::Transaction> trans;
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_actingbackfill_shards().begin();
+ i != get_parent()->get_actingbackfill_shards().end();
+ ++i) {
+ trans[i->shard];
+ }
+ op->t->generate_transactions(
+ op->unstable_hash_infos,
+ ec_impl,
+ get_parent()->get_info().pgid.pgid,
+ sinfo,
+ &trans,
+ &(op->temp_added),
+ &(op->temp_cleared));
+
+ dout(10) << "onreadable_sync: " << op->on_local_applied_sync << dendl;
+
+ for (set<pg_shard_t>::const_iterator i =
+ get_parent()->get_actingbackfill_shards().begin();
+ i != get_parent()->get_actingbackfill_shards().end();
+ ++i) {
+ op->pending_apply.insert(*i);
+ op->pending_commit.insert(*i);
+ map<shard_id_t, ObjectStore::Transaction>::iterator iter =
+ trans.find(i->shard);
+ assert(iter != trans.end());
+ bool should_send = get_parent()->should_send_op(*i, op->hoid);
+ pg_stat_t stats =
+ should_send ?
+ get_info().stats :
+ parent->get_shard_info().find(*i)->second.stats;
+
+ ECSubWrite sop(
+ get_parent()->whoami_shard(),
+ op->tid,
+ op->reqid,
+ op->hoid,
+ stats,
+ should_send ? iter->second : ObjectStore::Transaction(),
+ op->version,
+ op->trim_to,
+ op->log_entries,
+ op->temp_added,
+ op->temp_cleared);
+ if (*i == get_parent()->whoami_shard()) {
+ handle_sub_write(
+ get_parent()->whoami_shard(),
+ op->client_op,
+ sop,
+ op->on_local_applied_sync);
+ op->on_local_applied_sync = 0;
+ } else {
+ MOSDECSubOpWrite *r = new MOSDECSubOpWrite(sop);
+ r->set_priority(cct->_conf->osd_client_op_priority);
+ r->pgid = spg_t(get_parent()->primary_spg_t().pgid, i->shard);
+ r->map_epoch = get_parent()->get_epoch();
+ get_parent()->send_message_osd_cluster(
+ i->osd, r, get_parent()->get_epoch());
+ }
+ }
}
int ECBackend::objects_read_sync(
  const hobject_t &hoid,
  uint64_t off,
  uint64_t len,
  bufferlist *bl)
{
return -EOPNOTSUPP;
}
+struct CallClientContexts :
+ public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
+ ECBackend *ec;
+ ECBackend::ClientAsyncReadStatus *status;
+ list<pair<pair<uint64_t, uint64_t>,
+ pair<bufferlist*, Context*> > > to_read;
+ CallClientContexts(
+ ECBackend *ec,
+ ECBackend::ClientAsyncReadStatus *status,
+ const list<pair<pair<uint64_t, uint64_t>,
+ pair<bufferlist*, Context*> > > &to_read)
+ : ec(ec), status(status), to_read(to_read) {}
+ void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) {
+ ECBackend::read_result_t &res = in.second;
+ assert(res.returned.size() == to_read.size());
+ assert(res.r == 0);
+ assert(res.errors.empty());
+ for (list<pair<pair<uint64_t, uint64_t>,
+ pair<bufferlist*, Context*> > >::iterator i = to_read.begin();
+ i != to_read.end();
+ to_read.erase(i++)) {
+ pair<uint64_t, uint64_t> adjusted =
+ ec->sinfo.offset_len_to_stripe_bounds(i->first);
+ assert(res.returned.front().get<0>() == adjusted.first &&
+ res.returned.front().get<1>() == adjusted.second);
+ map<int, bufferlist> to_decode;
+ bufferlist bl;
+ for (map<pg_shard_t, bufferlist>::iterator j =
+ res.returned.front().get<2>().begin();
+ j != res.returned.front().get<2>().end();
+ ++j) {
+ to_decode[j->first.shard].claim(j->second);
+ }
+ ECUtil::decode(
+ ec->sinfo,
+ ec->ec_impl,
+ to_decode,
+ &bl);
+ assert(i->second.second);
+ assert(i->second.first);
+ i->second.first->substr_of(
+ bl,
+ i->first.first - adjusted.first,
+ i->first.second);
+ if (i->second.second) {
+ i->second.second->complete(i->second.first->length());
+ }
+ res.returned.pop_front();
+ }
+ status->complete = true;
+ list<ECBackend::ClientAsyncReadStatus> &ip =
+ ec->in_progress_client_reads;
+ while (ip.size() && ip.front().complete) {
+ if (ip.front().on_complete) {
+ ip.front().on_complete->complete(0);
+ ip.front().on_complete = NULL;
+ }
+ ip.pop_front();
+ }
+ }
+ ~CallClientContexts() {
+ for (list<pair<pair<uint64_t, uint64_t>,
+ pair<bufferlist*, Context*> > >::iterator i = to_read.begin();
+ i != to_read.end();
+ to_read.erase(i++)) {
+ delete i->second.second;
+ }
+ }
+};
+
void ECBackend::objects_read_async(
const hobject_t &hoid,
const list<pair<pair<uint64_t, uint64_t>,
pair<bufferlist*, Context*> > > &to_read,
Context *on_complete)
{
+ in_progress_client_reads.push_back(ClientAsyncReadStatus(on_complete));
+ CallClientContexts *c = new CallClientContexts(
+ this, &(in_progress_client_reads.back()), to_read);
+ list<pair<uint64_t, uint64_t> > offsets;
+ for (list<pair<pair<uint64_t, uint64_t>,
+ pair<bufferlist*, Context*> > >::const_iterator i =
+ to_read.begin();
+ i != to_read.end();
+ ++i) {
+ offsets.push_back(
+ sinfo.offset_len_to_stripe_bounds(i->first));
+ }
+
+ set<int> want_to_read;
+ for (int i = 0; i < (int)ec_impl->get_data_chunk_count(); ++i) {
+ want_to_read.insert(i);
+ }
+ set<pg_shard_t> shards;
+ int r = get_min_avail_to_read_shards(
+ hoid,
+ want_to_read,
+ false,
+ &shards);
+ assert(r == 0);
+
+ map<hobject_t, read_request_t> for_read_op;
+ for_read_op.insert(
+ make_pair(
+ hoid,
+ read_request_t(
+ hoid,
+ offsets,
+ shards,
+ false,
+ c)));
+
+ start_read_op(
+ cct->_conf->osd_client_op_priority,
+ for_read_op,
+ OpRequestRef());
return;
}
+
+
+int ECBackend::objects_get_attrs(
+ const hobject_t &hoid,
+ map<string, bufferlist> *out)
+{
+ int r = store->getattrs(
+ coll,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ *out);
+ if (r < 0)
+ return r;
+
+ for (map<string, bufferlist>::iterator i = out->begin();
+ i != out->end();
+ ) {
+ if (ECUtil::is_hinfo_key_string(i->first))
+ out->erase(i++);
+ else
+ ++i;
+ }
+ return r;
+}
+
+void ECBackend::rollback_append(
+ const hobject_t &hoid,
+ uint64_t old_size,
+ ObjectStore::Transaction *t)
+{
+ assert(old_size % sinfo.get_stripe_width() == 0);
+ t->truncate(
+ coll,
+ ghobject_t(hoid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ sinfo.aligned_logical_offset_to_chunk_offset(
+ old_size));
+}
+
+void ECBackend::be_deep_scrub(
+ const hobject_t &poid,
+ ScrubMap::object &o,
+ ThreadPool::TPHandle &handle) {
+ bufferhash h(-1);
+ int r;
+ uint64_t stride = cct->_conf->osd_deep_scrub_stride;
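+  // round the configured stride up to a whole number of EC chunks so
+  // every read covers complete chunks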
+ if (stride % sinfo.get_chunk_size())
+ stride += sinfo.get_chunk_size() - (stride % sinfo.get_chunk_size());
+ uint64_t pos = 0;
+ while (true) {
+ bufferlist bl;
+ handle.reset_tp_timeout();
+ r = store->read(
+ coll,
+ ghobject_t(
+ poid, ghobject_t::NO_GEN, get_parent()->whoami_shard().shard),
+ pos,
+ stride, bl,
+ true);
+ if (r < 0)
+ break;
+ if (bl.length() % sinfo.get_chunk_size()) {
+ r = -EIO;
+ break;
+ }
+ pos += r;
+ h << bl;
+ if ((unsigned)r < stride)
+ break;
+ }
+
+ ECUtil::HashInfoRef hinfo = get_hash_info(poid);
+ if (r == -EIO) {
+ dout(0) << "_scan_list " << poid << " got "
+ << r << " on read, read_error" << dendl;
+ o.read_error = true;
+ }
+
+ if (hinfo->get_chunk_hash(get_parent()->whoami_shard().shard) != h.digest()) {
+ dout(0) << "_scan_list " << poid << " got incorrect hash on read" << dendl;
+ o.read_error = true;
+ }
+
+ if (hinfo->get_total_chunk_size() != pos) {
+ dout(0) << "_scan_list " << poid << " got incorrect size on read" << dendl;
+ o.read_error = true;
+ }
+
+ /* We checked above that we match our own stored hash. We cannot
+ * send a hash of the actual object, so instead we simply send
+ * our locally stored hash of shard 0 on the assumption that if
+ * we match our chunk hash and our recollection of the hash for
+ * chunk 0 matches that of our peers, there is likely no corruption.
+ */
+ o.digest = hinfo->get_chunk_hash(0);
+ o.digest_present = true;
+
+ o.omap_digest = 0;
+ o.omap_digest_present = true;
+}
#include "OSD.h"
#include "PGBackend.h"
#include "osd_types.h"
+#include <boost/optional.hpp>
+#include "erasure-code/ErasureCodeInterface.h"
+#include "ECTransaction.h"
+#include "ECMsgTypes.h"
+#include "ECUtil.h"
+#include "messages/MOSDECSubOpWrite.h"
+#include "messages/MOSDECSubOpWriteReply.h"
+#include "messages/MOSDECSubOpRead.h"
+#include "messages/MOSDECSubOpReadReply.h"
+struct RecoveryMessages;
class ECBackend : public PGBackend {
public:
RecoveryHandle *open_recovery_op();
bool handle_message(
OpRequestRef op
);
+ bool can_handle_while_inactive(
+ OpRequestRef op
+ );
+ friend struct SubWriteApplied;
+ friend struct SubWriteCommitted;
+ void sub_write_applied(
+ tid_t tid, eversion_t version);
+ void sub_write_committed(
+ tid_t tid, eversion_t version, eversion_t last_complete);
+ void handle_sub_write(
+ pg_shard_t from,
+ OpRequestRef msg,
+ ECSubWrite &op,
+ Context *on_local_applied_sync = 0
+ );
+ void handle_sub_read(
+ pg_shard_t from,
+ ECSubRead &op,
+ ECSubReadReply *reply
+ );
+ void handle_sub_write_reply(
+ pg_shard_t from,
+ ECSubWriteReply &op
+ );
+ void handle_sub_read_reply(
+ pg_shard_t from,
+ ECSubReadReply &op,
+ RecoveryMessages *m
+ );
+ /// @see ReadOp below
void check_recovery_sources(const OSDMapRef osdmap);
void _on_change(ObjectStore::Transaction *t);
void dump_recovery_info(Formatter *f) const;
+ /// @see osd/ECTransaction.cc/h
PGTransaction *get_transaction();
void submit_transaction(
    const hobject_t &hoid,
    const eversion_t &at_version,
    PGTransaction *t,
    const eversion_t &trim_to,
    vector<pg_log_entry_t> &log_entries,
    Context *on_local_applied_sync,
    Context *on_all_applied,
    Context *on_all_commit,
    tid_t tid,
    osd_reqid_t reqid,
    OpRequestRef op);
  int objects_read_sync(
    const hobject_t &hoid,
    uint64_t off,
    uint64_t len,
    bufferlist *bl);
+ /**
+ * Async read mechanism
+ *
+ * Async reads use the same async read mechanism as does recovery.
+ * CallClientContexts is responsible for reconstructing the response
+ * buffer as well as for calling the callbacks.
+ *
+ * One tricky bit is that two reads may possibly not read from the same
+ * set of replicas. This could result in two reads completing in the
+ * wrong (from the interface user's point of view) order. Thus, we
+ * maintain a queue of in progress reads (@see in_progress_client_reads)
+ * to ensure that we always call the completion callback in order.
+ *
+   * Another subtlety is that while we may read a degraded object, we will
+ * still only perform a client read from shards in the acting set. This
+ * ensures that we won't ever have to restart a client initiated read in
+ * check_recovery_sources.
+ */
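+  /*
+   * A minimal caller-side sketch (illustrative only; off/len, the output
+   * buffer, and the Contexts are the caller's):
+   *
+   *   bufferlist *out = new bufferlist;
+   *   Context *on_extent = ...; // completed with the extent's length
+   *   list<pair<pair<uint64_t, uint64_t>,
+   *             pair<bufferlist*, Context*> > > to_read;
+   *   to_read.push_back(
+   *     make_pair(make_pair(off, len), make_pair(out, on_extent)));
+   *   backend->objects_read_async(hoid, to_read, on_complete);
+   *
+   * out holds the reconstructed logical bytes [off, off+len) by the time
+   * on_complete fires, and completions fire in submission order.
+   */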
+ friend struct CallClientContexts;
+ struct ClientAsyncReadStatus {
+ bool complete;
+ Context *on_complete;
+ ClientAsyncReadStatus(Context *on_complete)
+ : complete(false), on_complete(on_complete) {}
+ };
+ list<ClientAsyncReadStatus> in_progress_client_reads;
void objects_read_async(
const hobject_t &hoid,
const list<pair<pair<uint64_t, uint64_t>,
pair<bufferlist*, Context*> > > &to_read,
Context *on_complete);
-};
+private:
+ friend struct ECRecoveryHandle;
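+  /// Recovery proceeds in chunks of roughly osd_recovery_max_chunk bytes,
+  /// rounded to a positive multiple of the stripe width.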
+ uint64_t get_recovery_chunk_size() const {
+ uint64_t max = cct->_conf->osd_recovery_max_chunk;
+ max -= max % sinfo.get_stripe_width();
+ max += sinfo.get_stripe_width();
+ return max;
+ }
+
+ /**
+ * Recovery
+ *
+ * Recovery uses the same underlying read mechanism as client reads
+ * with the slight difference that recovery reads may come from non
+ * acting shards. Thus, check_recovery_sources may wind up calling
+ * cancel_pull for a read originating with RecoveryOp.
+ *
+ * The recovery process is expressed as a state machine:
+ * - IDLE: Nothing is currently in progress, reads will be started and
+ * we will transition to READING
+ * - READING: We are awaiting a pending read op. Once complete, we will
+ * decode the buffers and proceed to WRITING
+ * - WRITING: We are awaiting a completed push. Once complete, we will
+ * either transition to COMPLETE or to IDLE to continue.
+ * - COMPLETE: complete
+ *
+ * We use the existing Push and PushReply messages and structures to
+ * handle actually shuffling the data over to the replicas. recovery_info
+ * and recovery_progress are expressed in terms of the logical offset
+ * space except for data_included which is in terms of the chunked object
+ * space (to match the passed buffer).
+ *
+ * xattrs are requested on the first read and used to initialize the
+ * object_context if missing on completion of the first read.
+ *
+ * In order to batch up reads and writes, we batch Push, PushReply,
+ * Transaction, and reads in a RecoveryMessages object which is passed
+ * among the recovery methods.
+ */
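+  /*
+   * For example, an object spanning three recovery chunks moves through
+   *   IDLE -> READING -> WRITING -> IDLE -> READING -> WRITING
+   *        -> IDLE -> READING -> WRITING -> COMPLETE
+   * with one read round and one push round per chunk.
+   */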
+ struct RecoveryOp {
+ hobject_t hoid;
+ eversion_t v;
+ set<pg_shard_t> missing_on;
+ set<shard_id_t> missing_on_shards;
+
+ ObjectRecoveryInfo recovery_info;
+ ObjectRecoveryProgress recovery_progress;
+
+ bool pending_read;
+ enum state_t { IDLE, READING, WRITING, COMPLETE } state;
+
+ static const char* tostr(state_t state) {
+ switch (state) {
+ case ECBackend::RecoveryOp::IDLE:
+ return "IDLE";
+ break;
+ case ECBackend::RecoveryOp::READING:
+ return "READING";
+ break;
+ case ECBackend::RecoveryOp::WRITING:
+ return "WRITING";
+ break;
+ case ECBackend::RecoveryOp::COMPLETE:
+ return "COMPLETE";
+ break;
+ default:
+ assert(0);
+ return "";
+ }
+ }
+
+ // must be filled if state == WRITING
+ map<shard_id_t, bufferlist> returned_data;
+ map<string, bufferlist> xattrs;
+ ECUtil::HashInfoRef hinfo;
+ ObjectContextRef obc;
+ set<pg_shard_t> waiting_on_pushes;
+
+ // valid in state READING
+ pair<uint64_t, uint64_t> extent_requested;
+
+ void dump(Formatter *f) const;
+
+ RecoveryOp() : pending_read(false), state(IDLE) {}
+ };
+ friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
+ map<hobject_t, RecoveryOp> recovery_ops;
+
+public:
+ /**
+ * Low level async read mechanism
+ *
+ * To avoid duplicating the logic for requesting and waiting for
+ * multiple object shards, there is a common async read mechanism
+ * taking a map of hobject_t->read_request_t which defines callbacks
+ * taking read_result_ts as arguments.
+ *
+ * tid_to_read_map gives open read ops. check_recovery_sources uses
+ * shard_to_read_map and ReadOp::source_to_obj to restart reads
+ * involving down osds.
+ *
+ * The user is responsible for specifying replicas on which to read
+ * and for reassembling the buffer on the other side since client
+ * reads require the original object buffer while recovery only needs
+ * the missing pieces.
+ *
+ * Rather than handling reads on the primary directly, we simply send
+ * ourselves a message. This avoids a dedicated primary path for that
+ * part.
+ */
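+  /*
+   * Sketch of a caller (client reads and recovery both reduce to this;
+   * illustrative only, offsets/shards/cb are the caller's):
+   *
+   *   map<hobject_t, read_request_t> for_read_op;
+   *   for_read_op.insert(
+   *     make_pair(hoid,
+   *               read_request_t(hoid, offsets, shards, want_attrs, cb)));
+   *   start_read_op(priority, for_read_op, OpRequestRef());
+   *
+   * cb->finish() later receives the per-shard buffers via
+   * read_result_t::returned for reassembly.
+   */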
+ struct read_result_t {
+ int r;
+ map<pg_shard_t, int> errors;
+ boost::optional<map<string, bufferlist> > attrs;
+ list<
+ boost::tuple<
+ uint64_t, uint64_t, map<pg_shard_t, bufferlist> > > returned;
+ read_result_t() : r(0) {}
+ };
+ struct read_request_t {
+ const list<pair<uint64_t, uint64_t> > to_read;
+ const set<pg_shard_t> need;
+ const bool want_attrs;
+ GenContext<pair<RecoveryMessages *, read_result_t& > &> *cb;
+ read_request_t(
+ const hobject_t &hoid,
+ const list<pair<uint64_t, uint64_t> > &to_read,
+ const set<pg_shard_t> &need,
+ bool want_attrs,
+ GenContext<pair<RecoveryMessages *, read_result_t& > &> *cb)
+ : to_read(to_read), need(need), want_attrs(want_attrs),
+ cb(cb) {}
+ };
+ friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
+
+ struct ReadOp {
+ int priority;
+ tid_t tid;
+ OpRequestRef op; // may be null if not on behalf of a client
+
+ map<hobject_t, read_request_t> to_read;
+ map<hobject_t, read_result_t> complete;
+
+ map<hobject_t, set<pg_shard_t> > obj_to_source;
+ map<pg_shard_t, set<hobject_t> > source_to_obj;
+
+ void dump(Formatter *f) const;
+
+ set<pg_shard_t> in_progress;
+ };
+ friend struct FinishReadOp;
+ void filter_read_op(
+ const OSDMapRef osdmap,
+ ReadOp &op);
+ void complete_read_op(ReadOp &rop, RecoveryMessages *m);
+ friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+ map<tid_t, ReadOp> tid_to_read_map;
+ map<pg_shard_t, set<tid_t> > shard_to_read_map;
+ void start_read_op(
+ int priority,
+ map<hobject_t, read_request_t> &to_read,
+ OpRequestRef op);
+
+
+ /**
+ * Client writes
+ *
+ * ECTransaction is responsible for generating a transaction for
+ * each shard to which we need to send the write. As required
+ * by the PGBackend interface, the ECBackend write mechanism
+ * passes trim information with the write and last_complete back
+ * with the reply.
+ *
+ * As with client reads, there is a possibility of out-of-order
+ * completions. Thus, callbacks and completion are called in order
+ * on the writing list.
+ */
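+  /*
+   * In outline: submit_transaction() records an Op in tid_to_op_map;
+   * start_write() uses ECTransaction to generate one
+   * ObjectStore::Transaction per shard and sends each as an
+   * MOSDECSubOpWrite (our own shard's is applied directly via
+   * handle_sub_write); check_op() fires on_all_applied/on_all_commit as
+   * pending_apply/pending_commit drain.
+   */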
+ struct Op {
+ hobject_t hoid;
+ eversion_t version;
+ eversion_t trim_to;
+ vector<pg_log_entry_t> log_entries;
+ Context *on_local_applied_sync;
+ Context *on_all_applied;
+ Context *on_all_commit;
+ tid_t tid;
+ osd_reqid_t reqid;
+ OpRequestRef client_op;
+
+ ECTransaction *t;
+
+ set<hobject_t> temp_added;
+ set<hobject_t> temp_cleared;
+
+ set<pg_shard_t> pending_commit;
+ set<pg_shard_t> pending_apply;
+
+ map<hobject_t, ECUtil::HashInfoRef> unstable_hash_infos;
+ ~Op() {
+ delete t;
+ delete on_local_applied_sync;
+ delete on_all_applied;
+ delete on_all_commit;
+ }
+ };
+ friend ostream &operator<<(ostream &lhs, const Op &rhs);
+
+ void continue_recovery_op(
+ RecoveryOp &op,
+ RecoveryMessages *m);
+
+ void dispatch_recovery_messages(RecoveryMessages &m, int priority);
+ friend struct OnRecoveryReadComplete;
+ void handle_recovery_read_complete(
+ const hobject_t &hoid,
+ boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
+ boost::optional<map<string, bufferlist> > attrs,
+ RecoveryMessages *m);
+ void handle_recovery_push(
+ PushOp &op,
+ RecoveryMessages *m);
+ void handle_recovery_push_reply(
+ PushReplyOp &op,
+ pg_shard_t from,
+ RecoveryMessages *m);
+
+ map<tid_t, Op> tid_to_op_map; /// lists below point into here
+ list<Op*> writing;
+
+ CephContext *cct;
+ ErasureCodeInterfaceRef ec_impl;
+
+
+ /**
+ * ECRecPred
+ *
+   * Determines whether _have is sufficient to recover an object
+ */
+ class ECRecPred : public IsRecoverablePredicate {
+ set<int> want;
+ ErasureCodeInterfaceRef ec_impl;
+ public:
+ ECRecPred(ErasureCodeInterfaceRef ec_impl) : ec_impl(ec_impl) {
+ for (unsigned i = 0; i < ec_impl->get_data_chunk_count(); ++i) {
+ want.insert(i);
+ }
+ }
+ bool operator()(const set<pg_shard_t> &_have) const {
+ set<int> have;
+ for (set<pg_shard_t>::const_iterator i = _have.begin();
+ i != _have.end();
+ ++i) {
+ have.insert(i->shard);
+ }
+ set<int> min;
+ return ec_impl->minimum_to_decode(want, have, &min) == 0;
+ }
+ };
+ IsRecoverablePredicate *get_is_recoverable_predicate() {
+ return new ECRecPred(ec_impl);
+ }
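+  /*
+   * For example, with a k=2, m=1 code, want={0,1}; if _have contains
+   * shards 0 and 2 then have={0,2}, which minimum_to_decode can satisfy,
+   * so the object is recoverable even though shard 1 is absent.
+   */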
+
+ /**
+ * ECReadPred
+ *
+   * Determines whether _have is sufficient to read an object
+ */
+ class ECReadPred : public IsReadablePredicate {
+ pg_shard_t whoami;
+ ECRecPred rec_pred;
+ public:
+ ECReadPred(
+ pg_shard_t whoami,
+ ErasureCodeInterfaceRef ec_impl) : whoami(whoami), rec_pred(ec_impl) {}
+ bool operator()(const set<pg_shard_t> &_have) const {
+ return _have.count(whoami) && rec_pred(_have);
+ }
+ };
+ IsReadablePredicate *get_is_readable_predicate() {
+ return new ECReadPred(get_parent()->whoami_shard(), ec_impl);
+ }
+
+
+ const ECUtil::stripe_info_t sinfo;
+ /// If modified, ensure that the ref is held until the update is applied
+ SharedPtrRegistry<hobject_t, ECUtil::HashInfo> unstable_hashinfo_registry;
+ ECUtil::HashInfoRef get_hash_info(const hobject_t &hoid);
+
+ friend struct ReadCB;
+ void check_op(Op *op);
+ void start_write(Op *op);
+public:
+ ECBackend(
+ PGBackend::Listener *pg,
+ coll_t coll,
+ coll_t temp_coll,
+ ObjectStore *store,
+ CephContext *cct,
+ ErasureCodeInterfaceRef ec_impl)
+ : PGBackend(pg, store, coll, temp_coll),
+ cct(cct),
+ ec_impl(ec_impl),
+      sinfo(
+        ec_impl->get_data_chunk_count(),
+        ec_impl->get_data_chunk_count() * 4*(2<<10) /* TODO: make more flexible */) {}
+
+ /// Returns to_read replicas sufficient to reconstruct want
+ int get_min_avail_to_read_shards(
+ const hobject_t &hoid, ///< [in] object
+ const set<int> &want, ///< [in] desired shards
+ bool for_recovery, ///< [in] true if we may use non-acting replicas
+ set<pg_shard_t> *to_read ///< [out] shards to read
+ ); ///< @return error code, 0 on success
+
+ int objects_get_attrs(
+ const hobject_t &hoid,
+ map<string, bufferlist> *out);
+
+ void rollback_append(
+ const hobject_t &hoid,
+ uint64_t old_size,
+ ObjectStore::Transaction *t);
+
+ bool scrub_supported() { return true; }
+
+ void be_deep_scrub(
+ const hobject_t &obj,
+ ScrubMap::object &o,
+ ThreadPool::TPHandle &handle);
+ uint64_t be_get_ondisk_size(uint64_t logical_size) {
+ return sinfo.logical_to_next_chunk_offset(logical_size);
+ }
+};
#endif