return pgb->get_parent()->gen_dbg_prefix(*_dout);
}
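+// Additional dout prefix overload so methods of the nested RecoveryBackend can
+// use the usual dout()/dendl macros via their ECListener parent.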
+static ostream& _prefix(std::ostream *_dout, ECBackend::RecoveryBackend *pgb) {
+ return pgb->get_parent()->gen_dbg_prefix(*_dout);
+}
+
struct ECRecoveryHandle : public PGBackend::RecoveryHandle {
- list<ECBackend::RecoveryOp> ops;
+ list<ECBackend::RecoveryBackend::RecoveryOp> ops;
};
static ostream &operator<<(ostream &lhs, const map<pg_shard_t, bufferlist> &rhs)
return lhs << "]";
}
-ostream &operator<<(ostream &lhs, const ECBackend::RecoveryOp &rhs)
+ostream &operator<<(ostream &lhs, const ECBackend::RecoveryBackend::RecoveryOp &rhs)
{
return lhs << "RecoveryOp("
<< "hoid=" << rhs.hoid
<< " recovery_info=" << rhs.recovery_info
<< " recovery_progress=" << rhs.recovery_progress
<< " obc refcount=" << rhs.obc.use_count()
- << " state=" << ECBackend::RecoveryOp::tostr(rhs.state)
+ << " state=" << ECBackend::RecoveryBackend::RecoveryOp::tostr(rhs.state)
<< " waiting_on_pushes=" << rhs.waiting_on_pushes
<< " extent_requested=" << rhs.extent_requested
<< ")";
}
-void ECBackend::RecoveryOp::dump(Formatter *f) const
+void ECBackend::RecoveryBackend::RecoveryOp::dump(Formatter *f) const
{
f->dump_stream("hoid") << hoid;
f->dump_stream("v") << v;
: PGBackend(cct, pg, store, coll, ch),
read_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener()),
rmw_pipeline(cct, ec_impl, this->sinfo, get_parent()->get_eclistener(), *this),
- unstable_hashinfo_registry(cct, ec_impl),
+ recovery_backend(cct, coll, ec_impl, this->sinfo, read_pipeline, unstable_hashinfo_registry, get_parent()->get_eclistener()),
ec_impl(ec_impl),
- sinfo(ec_impl->get_data_chunk_count(), stripe_width) {
+ sinfo(ec_impl->get_data_chunk_count(), stripe_width),
+ unstable_hashinfo_registry(cct, ec_impl) {
ceph_assert((ec_impl->get_data_chunk_count() *
ec_impl->get_chunk_size(stripe_width)) == stripe_width);
}
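+// The PGBackend recovery entry points below mostly just delegate to the
+// extracted recovery_backend.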
PGBackend::RecoveryHandle *ECBackend::open_recovery_op()
+{
+ return recovery_backend.open_recovery_op();
+}
+
+PGBackend::RecoveryHandle *ECBackend::RecoveryBackend::open_recovery_op()
{
return new ECRecoveryHandle;
}
-void ECBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res)
+void ECBackend::RecoveryBackend::_failed_push(const hobject_t &hoid, ECCommon::read_result_t &res)
{
dout(10) << __func__ << ": Read error " << hoid << " r="
<< res.r << " errors=" << res.errors << dendl;
const PushOp &op,
RecoveryMessages *m,
bool is_repair)
+{
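+ // PG-level byte accounting stays here; the push itself is handled by
+ // recovery_backend below.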
+ if (get_parent()->pg_is_remote_backfilling()) {
+ get_parent()->pg_add_local_num_bytes(op.data.length());
+ get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
+ dout(10) << __func__ << " " << op.soid
+ << " add new actual data by " << op.data.length()
+ << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
+ << dendl;
+ }
+
+ recovery_backend.handle_recovery_push(op, m, is_repair);
+
+ if (op.after_progress.data_complete) {
+ if (get_parent()->pgb_is_primary()) {
+ if (get_parent()->pg_is_repair() || is_repair)
+ get_parent()->inc_osd_stat_repaired();
+ } else {
+ // If primary told us this is a repair, bump osd_stat_t::num_objects_repaired
+ if (is_repair)
+ get_parent()->inc_osd_stat_repaired();
+ if (get_parent()->pg_is_remote_backfilling()) {
+ struct stat st;
+ int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
+ get_parent()->whoami_shard().shard), &st);
+ if (r == 0) {
+ get_parent()->pg_sub_local_num_bytes(st.st_size);
+ // XXX: This can be way overestimated for small objects
+ get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
+ dout(10) << __func__ << " " << op.soid
+ << " sub actual data by " << st.st_size
+ << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
+ << dendl;
+ }
+ }
+ }
+ }
+}
+
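+// Builds the transaction for the pushed data/attrs and queues the PushReplyOp
+// back to the primary.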
+void ECBackend::RecoveryBackend::handle_recovery_push(
+ const PushOp &op,
+ RecoveryMessages *m,
+ bool is_repair)
{
if (get_parent()->check_failsafe_full()) {
dout(10) << __func__ << " Out of space (failsafe) processing push request." << dendl;
ceph_assert(op.data.length() == 0);
}
- if (get_parent()->pg_is_remote_backfilling()) {
- get_parent()->pg_add_local_num_bytes(op.data.length());
- get_parent()->pg_add_num_bytes(op.data.length() * get_ec_data_chunk_count());
- dout(10) << __func__ << " " << op.soid
- << " add new actual data by " << op.data.length()
- << " add new num_bytes by " << op.data.length() * get_ec_data_chunk_count()
- << dendl;
- }
-
if (op.before_progress.first) {
ceph_assert(op.attrset.count(string("_")));
m->t.setattrs(
ObjectContextRef(),
false,
&m->t);
- if (get_parent()->pg_is_remote_backfilling()) {
- struct stat st;
- int r = store->stat(ch, ghobject_t(op.soid, ghobject_t::NO_GEN,
- get_parent()->whoami_shard().shard), &st);
- if (r == 0) {
- get_parent()->pg_sub_local_num_bytes(st.st_size);
- // XXX: This can be way overestimated for small objects
- get_parent()->pg_sub_num_bytes(st.st_size * get_ec_data_chunk_count());
- dout(10) << __func__ << " " << op.soid
- << " sub actual data by " << st.st_size
- << " sub num_bytes by " << st.st_size * get_ec_data_chunk_count()
- << dendl;
- }
- }
}
}
m->push_replies[get_parent()->primary_shard()].push_back(PushReplyOp());
m->push_replies[get_parent()->primary_shard()].back().soid = op.soid;
}
-void ECBackend::handle_recovery_push_reply(
+void ECBackend::RecoveryBackend::handle_recovery_push_reply(
const PushReplyOp &op,
pg_shard_t from,
RecoveryMessages *m)
continue_recovery_op(rop, m);
}
-void ECBackend::handle_recovery_read_complete(
+void ECBackend::RecoveryBackend::handle_recovery_read_complete(
const hobject_t &hoid,
boost::tuple<uint64_t, uint64_t, map<pg_shard_t, bufferlist> > &to_read,
std::optional<map<string, bufferlist, less<>> > attrs,
<< ")"
<< dendl;
ceph_assert(recovery_ops.count(hoid));
- RecoveryOp &op = recovery_ops[hoid];
+ RecoveryBackend::RecoveryOp &op = recovery_ops[hoid];
ceph_assert(op.returned_data.empty());
map<int, bufferlist*> target;
for (set<shard_id_t>::iterator i = op.missing_on_shards.begin();
};
struct RecoveryReadCompleter : ECCommon::ReadCompleter {
- RecoveryReadCompleter(ECBackend& backend)
+ RecoveryReadCompleter(ECBackend::RecoveryBackend& backend)
: backend(backend) {}
void finish_single_request(
backend.dispatch_recovery_messages(rm, priority);
}
- ECBackend& backend;
+ ECBackend::RecoveryBackend& backend;
RecoveryMessages rm;
};
-void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
+void ECBackend::RecoveryBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
{
for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
i != m.pushes.end();
m.pushes.erase(i++)) {
MOSDPGPush *msg = new MOSDPGPush();
msg->set_priority(priority);
- msg->map_epoch = get_osdmap_epoch();
+ msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
msg->from = get_parent()->whoami_shard();
msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
msg->pushes.swap(i->second);
msg->compute_cost(cct);
msg->is_repair = get_parent()->pg_is_repair();
- get_parent()->send_message(
- i->first.osd,
- msg);
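+ // wrap the single push message for the vector-based
+ // send_message_osd_cluster() call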
+ std::vector wrapped_msg {
+ std::make_pair(i->first.osd, static_cast<Message*>(msg))
+ };
+ get_parent()->send_message_osd_cluster(wrapped_msg, msg->map_epoch);
}
map<int, MOSDPGPushReply*> replies;
for (map<pg_shard_t, vector<PushReplyOp> >::iterator i =
m.push_replies.erase(i++)) {
MOSDPGPushReply *msg = new MOSDPGPushReply();
msg->set_priority(priority);
- msg->map_epoch = get_osdmap_epoch();
+ msg->map_epoch = get_parent()->pgb_get_osdmap_epoch();
msg->min_epoch = get_parent()->get_last_peering_reset_epoch();
msg->from = get_parent()->whoami_shard();
msg->pgid = spg_t(get_parent()->get_info().pgid.pgid, i->first.shard);
void ECBackend::RecoveryBackend::continue_recovery_op(
RecoveryBackend::RecoveryOp &op,
RecoveryMessages *m)
-{
-}
-
-void ECBackend::continue_recovery_op(
- RecoveryOp &op,
- RecoveryMessages *m)
{
dout(10) << __func__ << ": continuing " << op << dendl;
+ using RecoveryOp = RecoveryBackend::RecoveryOp;
while (1) {
switch (op.state) {
case RecoveryOp::IDLE: {
ceph_assert(recovery_ops.count(op.hoid));
eversion_t v = recovery_ops[op.hoid].v;
recovery_ops.erase(op.hoid);
+ // TODO: on_failed_pull() is not available in crimson yet
get_parent()->on_failed_pull({get_parent()->whoami_shard()},
op.hoid, v);
return;
ceph_assert(!op.recovery_progress.first);
dout(10) << __func__ << ": canceling recovery op for obj " << op.hoid
<< dendl;
+ // cancel_pull() is already available in crimson
get_parent()->cancel_pull(op.hoid);
recovery_ops.erase(op.hoid);
return;
pop.before_progress = op.recovery_progress;
pop.after_progress = after_progress;
if (*mi != get_parent()->primary_shard())
+ // already in crimson -- junction point with PeeringState
get_parent()->begin_peer_recover(
*mi,
op.hoid);
stat.num_bytes_recovered = op.recovery_info.size;
stat.num_keys_recovered = 0; // ??? op ... omap_entries.size(); ?
stat.num_objects_recovered = 1;
+ // TODO: pg_is_repair() / num_objects_repaired accounting is not in crimson yet
if (get_parent()->pg_is_repair())
stat.num_objects_repaired = 1;
+ // crimson already implements this in pg_recovery.cc
get_parent()->on_global_recover(op.hoid, stat, false);
dout(10) << __func__ << ": WRITING return " << op << dendl;
recovery_ops.erase(op.hoid);
RecoveryHandle *_h,
int priority)
{
- ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
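+ // EC push ops are run by recovery_backend; deletes are still issued from
+ // ECBackend via send_recovery_deletes().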
+ ceph_assert(_h);
+ ECRecoveryHandle &h = static_cast<ECRecoveryHandle&>(*_h);
+ recovery_backend.run_recovery_op(h, priority);
+ send_recovery_deletes(priority, h.deletes);
+ delete _h;
+}
+
+void ECBackend::RecoveryBackend::run_recovery_op(
+ ECRecoveryHandle &h,
+ int priority)
+{
RecoveryMessages m;
- for (list<RecoveryOp>::iterator i = h->ops.begin();
- i != h->ops.end();
+ for (list<RecoveryOp>::iterator i = h.ops.begin();
+ i != h.ops.end();
++i) {
dout(10) << __func__ << ": starting " << *i << dendl;
ceph_assert(!recovery_ops.count(i->hoid));
RecoveryOp &op = recovery_ops.insert(make_pair(i->hoid, *i)).first->second;
continue_recovery_op(op, &m);
}
-
dispatch_recovery_messages(m, priority);
- send_recovery_deletes(priority, h->deletes);
- delete _h;
}
int ECBackend::recover_object(
ObjectContextRef head,
ObjectContextRef obc,
RecoveryHandle *_h)
+{
+ return recovery_backend.recover_object(hoid, v, head, obc, _h);
+}
+
+int ECBackend::RecoveryBackend::recover_object(
+ const hobject_t &hoid,
+ eversion_t v,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *_h)
{
ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
h->ops.push_back(RecoveryOp());
++i) {
handle_recovery_push(*i, &rm, op->is_repair);
}
- dispatch_recovery_messages(rm, priority);
+ recovery_backend.dispatch_recovery_messages(rm, priority);
return true;
}
case MSG_OSD_PG_PUSH_REPLY: {
for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
i != op->replies.end();
++i) {
- handle_recovery_push_reply(*i, op->from, &rm);
+ recovery_backend.handle_recovery_push_reply(*i, op->from, &rm);
}
- dispatch_recovery_messages(rm, priority);
+ recovery_backend.dispatch_recovery_messages(rm, priority);
return true;
}
default:
void ECBackend::clear_recovery_state()
{
- recovery_ops.clear();
+ recovery_backend.recovery_ops.clear();
}
void ECBackend::dump_recovery_info(Formatter *f) const
{
f->open_array_section("recovery_ops");
- for (map<hobject_t, RecoveryOp>::const_iterator i = recovery_ops.begin();
- i != recovery_ops.end();
+ for (map<hobject_t, RecoveryBackend::RecoveryOp>::const_iterator i = recovery_backend.recovery_ops.begin();
+ i != recovery_backend.recovery_ops.end();
++i) {
f->open_object_section("op");
i->second.dump(f);
void kick_reads();
- uint64_t get_recovery_chunk_size() const {
- return round_up_to(cct->_conf->osd_recovery_max_chunk,
- sinfo.get_stripe_width());
- }
-
/**
* Recovery
*
* Transaction, and reads in a RecoveryMessages object which is passed
* among the recovery methods.
*/
+public:
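+ // Recovery state and logic pulled out of ECBackend. It reaches the PG only
+ // through the ECListener `parent` and borrows the shared pipeline/registry
+ // objects by reference.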
+ struct RecoveryBackend {
+ CephContext* cct;
+ const coll_t &coll;
+ ceph::ErasureCodeInterfaceRef ec_impl;
+ const ECUtil::stripe_info_t& sinfo;
+ ReadPipeline& read_pipeline;
+ UnstableHashInfoRegistry& unstable_hashinfo_registry;
+ // TODO: lay an interface down here
+ ECListener* parent;
+
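+ // convenience forwarders into the listener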
+ ECListener *get_parent() const { return parent; }
+ const OSDMapRef& get_osdmap() const { return get_parent()->pgb_get_osdmap(); }
+ epoch_t get_osdmap_epoch() const { return get_parent()->pgb_get_osdmap_epoch(); }
+ const pg_info_t &get_info() { return get_parent()->get_info(); }
+ void add_temp_obj(const hobject_t &oid) { get_parent()->add_temp_obj(oid); }
+ void clear_temp_obj(const hobject_t &oid) { get_parent()->clear_temp_obj(oid); }
+
+ RecoveryBackend(CephContext* cct,
+ const coll_t &coll,
+ ceph::ErasureCodeInterfaceRef ec_impl,
+ const ECUtil::stripe_info_t& sinfo,
+ ReadPipeline& read_pipeline,
+ UnstableHashInfoRegistry& unstable_hashinfo_registry,
+ ECListener* parent)
+ : cct(cct),
+ coll(coll),
+ ec_impl(std::move(ec_impl)),
+ sinfo(sinfo),
+ read_pipeline(read_pipeline),
+ unstable_hashinfo_registry(unstable_hashinfo_registry),
+ parent(parent) {
+ }
+ // <<<---- end of the listener plumbing the TODO above refers to
struct RecoveryOp {
hobject_t hoid;
eversion_t v;
static const char* tostr(state_t state) {
switch (state) {
- case ECBackend::RecoveryOp::IDLE:
+ case RecoveryOp::IDLE:
return "IDLE";
- case ECBackend::RecoveryOp::READING:
+ case RecoveryOp::READING:
return "READING";
- case ECBackend::RecoveryOp::WRITING:
+ case RecoveryOp::WRITING:
return "WRITING";
- case ECBackend::RecoveryOp::COMPLETE:
+ case RecoveryOp::COMPLETE:
return "COMPLETE";
default:
ceph_abort();
friend ostream &operator<<(ostream &lhs, const RecoveryOp &rhs);
std::map<hobject_t, RecoveryOp> recovery_ops;
+ uint64_t get_recovery_chunk_size() const {
+ return round_up_to(cct->_conf->osd_recovery_max_chunk,
+ sinfo.get_stripe_width());
+ }
+
+ void dispatch_recovery_messages(RecoveryMessages &m, int priority);
+
+ RecoveryHandle *open_recovery_op();
+ void run_recovery_op(
+ struct ECRecoveryHandle &h,
+ int priority);
+ int recover_object(
+ const hobject_t &hoid,
+ eversion_t v,
+ ObjectContextRef head,
+ ObjectContextRef obc,
+ RecoveryHandle *h);
void continue_recovery_op(
- RecoveryOp &op,
+ RecoveryBackend::RecoveryOp &op,
RecoveryMessages *m);
- friend struct RecoveryMessages;
- void dispatch_recovery_messages(RecoveryMessages &m, int priority);
- friend struct OnRecoveryReadComplete;
- friend struct RecoveryReadCompleter;
void handle_recovery_read_complete(
const hobject_t &hoid,
boost::tuple<uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > &to_read,
const PushReplyOp &op,
pg_shard_t from,
RecoveryMessages *m);
+ friend struct RecoveryMessages;
+ int get_ec_data_chunk_count() const {
+ return ec_impl->get_data_chunk_count();
+ }
+ void _failed_push(const hobject_t &hoid, ECCommon::read_result_t &res);
+ };
+ friend ostream &operator<<(ostream &lhs, const RecoveryBackend::RecoveryOp &rhs);
+ friend struct RecoveryMessages;
+ friend struct OnRecoveryReadComplete;
+ friend struct RecoveryReadCompleter;
+
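+ // stat-accounting wrapper around RecoveryBackend::handle_recovery_push()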
+ void handle_recovery_push(
+ const PushOp &op,
+ RecoveryMessages *m,
+ bool is_repair);
public:
struct ReadPipeline read_pipeline;
struct RMWPipeline rmw_pipeline;
+ struct RecoveryBackend recovery_backend;
ceph::ErasureCodeInterfaceRef ec_impl;
uint64_t be_get_ondisk_size(uint64_t logical_size) const final {
return sinfo.logical_to_next_chunk_offset(logical_size);
}
- void _failed_push(const hobject_t &hoid, ECBackend::read_result_t &res);
};
ostream &operator<<(ostream &lhs, const ECBackend::RMWPipeline::pipeline_state_t &rhs);