struct OnRecoveryReadComplete :
public GenContext<pair<RecoveryMessages*, ECBackend::read_result_t& > &> {
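+ // RecoveryMessages that issued this read (only forward-declared at this
+ // point, hence the elaborated `struct` specifier); the completed read is
+ // handed to it instead of the RecoveryMessages* previously threaded
+ // through handle_sub_read_reply()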
+ struct RecoveryMessages* rm;
ECBackend *backend;
hobject_t hoid;
- OnRecoveryReadComplete(ECBackend *backend, const hobject_t &hoid)
- : backend(backend), hoid(hoid) {}
+
+ OnRecoveryReadComplete(RecoveryMessages* rm, ECBackend *backend, const hobject_t &hoid)
+ : rm(rm), backend(backend), hoid(hoid) {}
void finish(pair<RecoveryMessages *, ECBackend::read_result_t &> &in) override {
ECBackend::read_result_t &res = in.second;
    if (!(res.r == 0 && res.errors.empty())) {
      backend->_failed_push(hoid, in);
      return;
    }
    ceph_assert(res.returned.size() == 1);
    backend->handle_recovery_read_complete(
      hoid,
      res.returned.back(),
      res.attrs,
-     in.first);
+     rm);
}
};
-struct RecoveryMessages {
+struct RecoveryMessages : GenContext<int> {
+ ECBackend *ec;
map<hobject_t,
ECBackend::read_request_t> recovery_reads;
map<hobject_t, set<int>> want_to_read;
+
+ RecoveryMessages(ECBackend* ec) : ec(ec) {}
+
void recovery_read(
ECBackend *ec,
const hobject_t &hoid, uint64_t off, uint64_t len,
need,
attrs,
new OnRecoveryReadComplete(
+ this,
ec,
hoid))));
}
ObjectStore::Transaction t;
- RecoveryMessages() {}
~RecoveryMessages() {}
+
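+ // invoked as the ReadOp's on_complete (see complete_read_op()): dispatches
+ // whatever messages were accumulated while the completed reads were handled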
+ void finish(int priority) override {
+ ec->dispatch_recovery_messages(*this, priority);
+ }
};
void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
  start_read_op(
    priority,
m.want_to_read,
m.recovery_reads,
OpRequestRef(),
- false, true);
+   false, true, new RecoveryMessages{this});
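+   // the RecoveryMessages allocated above serves as the ReadOp's on_complete;
+   // complete_read_op() runs it (GenContext::complete() also deletes it) once
+   // all replies for these reads are in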
}
void ECBackend::run_recovery_op(
  RecoveryHandle *_h,
int priority)
{
ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
- RecoveryMessages m;
+ RecoveryMessages m{this};
for (list<RecoveryOp>::iterator i = h->ops.begin();
i != h->ops.end();
++i) {
// buffers. It does not conflict with ECSubReadReply operator<<.
MOSDECSubOpReadReply *op = static_cast<MOSDECSubOpReadReply*>(
_op->get_nonconst_req());
- RecoveryMessages rm;
- handle_sub_read_reply(op->op.from, op->op, &rm, _op->pg_trace);
- dispatch_recovery_messages(rm, priority);
+ handle_sub_read_reply(op->op.from, op->op, _op->pg_trace);
+   // for recovery reads, dispatch_recovery_messages() is now invoked through
+   // the ReadOp's `on_complete` callback (see complete_read_op())
return true;
}
case MSG_OSD_PG_PUSH: {
auto op = _op->get_req<MOSDPGPush>();
- RecoveryMessages rm;
+ RecoveryMessages rm{this};
for (vector<PushOp>::const_iterator i = op->pushes.begin();
i != op->pushes.end();
++i) {
case MSG_OSD_PG_PUSH_REPLY: {
const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
_op->get_req());
- RecoveryMessages rm;
+ RecoveryMessages rm{this};
for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
i != op->replies.end();
++i) {
void ECBackend::handle_sub_read_reply(
pg_shard_t from,
ECSubReadReply &op,
- RecoveryMessages *m,
const ZTracer::Trace &trace)
{
trace.event("ec sub read reply");
is_complete == rop.complete.size()) {
dout(20) << __func__ << " Complete: " << rop << dendl;
rop.trace.event("ec read complete");
- complete_read_op(rop, m);
+ complete_read_op(rop);
} else {
dout(10) << __func__ << " readop not complete: " << rop << dendl;
}
}
-void ECBackend::complete_read_op(ReadOp &rop, RecoveryMessages *m)
+void ECBackend::complete_read_op(ReadOp &rop)
{
map<hobject_t, read_request_t>::iterator reqiter =
rop.to_read.begin();
}
rop.in_progress.clear();
tid_to_read_map.erase(rop.tid);
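+   // run the completion hook (if any) with the op's priority; complete()
+   // deletes the GenContext, so clear the pointer afterwards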
+ if (rop.on_complete) {
+ rop.on_complete->complete(rop.priority);
+ rop.on_complete = nullptr;
+ }
}
struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
void finish(ThreadPool::TPHandle &handle) override {
auto ropiter = ec->tid_to_read_map.find(tid);
ceph_assert(ropiter != ec->tid_to_read_map.end());
- int priority = ropiter->second.priority;
- RecoveryMessages rm;
- ec->complete_read_op(ropiter->second, &rm);
- ec->dispatch_recovery_messages(rm, priority);
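+     // recovery follow-up is now dispatched from complete_read_op() via the
+     // ReadOp's on_complete, so no local RecoveryMessages is needed here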
+ ec->complete_read_op(ropiter->second);
}
};
void ECBackend::start_read_op(
  int priority,
  map<hobject_t, set<int>> &want_to_read,
  map<hobject_t, read_request_t> &to_read,
OpRequestRef _op,
bool do_redundant_reads,
- bool for_recovery)
+ bool for_recovery,
+ GenContext<int> *on_complete)
{
ceph_tid_t tid = get_parent()->get_tid();
ceph_assert(!tid_to_read_map.count(tid));
tid,
do_redundant_reads,
for_recovery,
+ on_complete,
_op,
std::move(want_to_read),
std::move(to_read))).first->second;
obj_want_to_read,
for_read_op,
OpRequestRef(),
- fast_read, false);
+ fast_read, false, nullptr);
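+     // client-initiated reads have no recovery follow-up, so no on_complete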
return;
}
void handle_sub_read_reply(
pg_shard_t from,
ECSubReadReply &op,
- RecoveryMessages *m,
const ZTracer::Trace &trace
);
void continue_recovery_op(
RecoveryOp &op,
RecoveryMessages *m);
+ friend struct RecoveryMessages;
void dispatch_recovery_messages(RecoveryMessages &m, int priority);
friend struct OnRecoveryReadComplete;
void handle_recovery_read_complete(
  // True if reading for recovery, which could possibly read only a subset
  // of the available shards.
bool for_recovery;
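+   // optional completion hook, run by complete_read_op() with the op's
+   // priority when the read op finishes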
+ GenContext<int> *on_complete;
ZTracer::Trace trace;
ceph_tid_t tid,
bool do_redundant_reads,
bool for_recovery,
+ GenContext<int> *on_complete,
OpRequestRef op,
std::map<hobject_t, std::set<int>> &&_want_to_read,
std::map<hobject_t, read_request_t> &&_to_read)
- : priority(priority), tid(tid), op(op), do_redundant_reads(do_redundant_reads),
- for_recovery(for_recovery), want_to_read(std::move(_want_to_read)),
+ : priority(priority),
+ tid(tid),
+ op(op),
+ do_redundant_reads(do_redundant_reads),
+ for_recovery(for_recovery),
+ on_complete(on_complete),
+ want_to_read(std::move(_want_to_read)),
to_read(std::move(_to_read)) {
for (auto &&hpair: to_read) {
auto &returned = complete[hpair.first].returned;
void filter_read_op(
const OSDMapRef& osdmap,
ReadOp &op);
- void complete_read_op(ReadOp &rop, RecoveryMessages *m);
+ void complete_read_op(ReadOp &rop);
friend ostream &operator<<(ostream &lhs, const ReadOp &rhs);
+
std::map<ceph_tid_t, ReadOp> tid_to_read_map;
std::map<pg_shard_t, std::set<ceph_tid_t> > shard_to_read_map;
void start_read_op(
std::map<hobject_t, std::set<int>> &want_to_read,
std::map<hobject_t, read_request_t> &to_read,
OpRequestRef op,
- bool do_redundant_reads, bool for_recovery);
+ bool do_redundant_reads,
+ bool for_recovery,
+ GenContext<int> *on_complete);
void do_read_op(ReadOp &rop);
int send_all_remaining_reads(