get_parent()->on_failed_pull(fl, hoid, v);
}
-struct OnRecoveryReadComplete :
- public GenContext<ECBackend::read_result_t&> {
- struct RecoveryMessages* rm;
- ECBackend *backend;
- hobject_t hoid;
-
- OnRecoveryReadComplete() = delete;
- OnRecoveryReadComplete(RecoveryMessages* rm, ECBackend *backend, const hobject_t &hoid)
- : rm(rm), backend(backend), hoid(hoid) {}
- void finish(ECBackend::read_result_t &res) override {
- if (!(res.r == 0 && res.errors.empty())) {
- backend->_failed_push(hoid, res);
- return;
- }
- ceph_assert(res.returned.size() == 1);
- backend->handle_recovery_read_complete(
- hoid,
- res.returned.back(),
- res.attrs,
- rm);
- }
-};
-
-struct RecoveryMessages : GenContext<int> {
- ECBackend *ec;
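+// RecoveryMessages is now a plain value type that batches one round of
+// recovery work: reads to issue, pushes/replies to send, and the local
+// transaction to queue. It no longer carries an ECBackend pointer or a
+// GenContext completion; whoever owns the instance dispatches it explicitly.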
+struct RecoveryMessages {
map<hobject_t,
ECBackend::read_request_t> recovery_reads;
- RecoveryMessages* next_recovery_messages = nullptr;
map<hobject_t, set<int>> want_to_read;
- RecoveryMessages(ECBackend* ec) : ec(ec) {}
-
void recovery_read(
- ECBackend *ec,
const hobject_t &hoid, uint64_t off, uint64_t len,
set<int> &&_want_to_read,
const map<pg_shard_t, vector<pair<int, int>>> &need,
bool attrs)
{
- if (!next_recovery_messages) {
- next_recovery_messages = new RecoveryMessages{ec};
- }
list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
to_read.push_back(boost::make_tuple(off, len, 0));
ceph_assert(!recovery_reads.count(hoid));
    want_to_read.insert(make_pair(hoid, std::move(_want_to_read)));
    recovery_reads.insert(
      make_pair(
        hoid,
        ECBackend::read_request_t(
to_read,
need,
- attrs,
- new OnRecoveryReadComplete(
- next_recovery_messages,
- ec,
- hoid))));
+ attrs)));
}
map<pg_shard_t, vector<PushOp> > pushes;
map<pg_shard_t, vector<PushReplyOp> > push_replies;
ObjectStore::Transaction t;
- RecoveryMessages() = delete;
- ~RecoveryMessages() {}
-
- void finish(int priority) override {
- ec->dispatch_recovery_messages(*this, priority);
- }
};
void ECBackend::handle_recovery_push(
}
};
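+// Collects the results of recovery reads. Each completed object is folded
+// into the embedded RecoveryMessages; once the whole ReadOp has finished,
+// the accumulated batch is handed to ECBackend::dispatch_recovery_messages().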
+struct RecoveryReadCompleter : ECBackend::ReadCompleter {
+ RecoveryReadCompleter(ECBackend& backend)
+ : backend(backend) {}
+
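+  // Per-object hook: on success the recovered data is folded into rm,
+  // on error the object is reported through _failed_push().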
+ void finish_single_request(
+ const hobject_t &hoid,
+ ECBackend::read_result_t &res,
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> >) override
+ {
+ if (!(res.r == 0 && res.errors.empty())) {
+ backend._failed_push(hoid, res);
+ return;
+ }
+ ceph_assert(res.returned.size() == 1);
+ backend.handle_recovery_read_complete(
+ hoid,
+ res.returned.back(),
+ res.attrs,
+ &rm);
+ }
+
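+  // Invoked once, after every object in the ReadOp has been processed:
+  // send out everything accumulated in rm.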
+ void finish(int priority) && override
+ {
+ backend.dispatch_recovery_messages(rm, priority);
+ }
+
+ ECBackend& backend;
+ RecoveryMessages rm;
+};
+
void ECBackend::dispatch_recovery_messages(RecoveryMessages &m, int priority)
{
for (map<pg_shard_t, vector<PushOp> >::iterator i = m.pushes.begin();
m.want_to_read,
m.recovery_reads,
OpRequestRef(),
- false, true, m.next_recovery_messages);
- m.next_recovery_messages = nullptr;
+ false,
+ true,
+ std::make_unique<RecoveryReadCompleter>(*this));
}
void ECBackend::continue_recovery_op(
return;
}
m->recovery_read(
- this,
op.hoid,
op.recovery_progress.data_recovered_to,
amount,
int priority)
{
ECRecoveryHandle *h = static_cast<ECRecoveryHandle*>(_h);
- RecoveryMessages m{this};
+ RecoveryMessages m;
for (list<RecoveryOp>::iterator i = h->ops.begin();
i != h->ops.end();
++i) {
}
case MSG_OSD_PG_PUSH: {
auto op = _op->get_req<MOSDPGPush>();
- RecoveryMessages rm{this};
+ RecoveryMessages rm;
for (vector<PushOp>::const_iterator i = op->pushes.begin();
i != op->pushes.end();
++i) {
case MSG_OSD_PG_PUSH_REPLY: {
const MOSDPGPushReply *op = static_cast<const MOSDPGPushReply *>(
_op->get_req());
- RecoveryMessages rm{this};
+ RecoveryMessages rm;
for (vector<PushReplyOp>::const_iterator i = op->replies.begin();
i != op->replies.end();
++i) {
rop.complete.begin();
ceph_assert(rop.to_read.size() == rop.complete.size());
for (; reqiter != rop.to_read.end(); ++reqiter, ++resiter) {
- if (reqiter->second.cb) {
- reqiter->second.cb->complete(resiter->second);
- reqiter->second.cb = nullptr;
- }
- }
+ rop.on_complete->finish_single_request(
+ reqiter->first,
+ resiter->second,
+ reqiter->second.to_read);
+ }
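+  // All per-object results have been delivered; now consume the completer
+  // exactly once for the whole ReadOp.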
+ ceph_assert(rop.on_complete);
+ std::move(*rop.on_complete).finish(rop.priority);
+ rop.on_complete = nullptr;
  // The read op is over; clean up all the data for this tid.
for (set<pg_shard_t>::iterator iter = rop.in_progress.begin();
iter != rop.in_progress.end();
}
rop.in_progress.clear();
tid_to_read_map.erase(rop.tid);
- if (rop.on_complete) {
- rop.on_complete->complete(rop.priority);
- rop.on_complete = nullptr;
- }
}
struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
read_request_t &req = op.to_read.find(*i)->second;
dout(10) << __func__ << ": canceling " << req
<< " for obj " << *i << dendl;
- ceph_assert(req.cb);
- delete req.cb;
- req.cb = nullptr;
-
op.to_read.erase(*i);
op.complete.erase(*i);
on_erase(*i);
i != tid_to_read_map.end();
++i) {
dout(10) << __func__ << ": cancelling " << i->second << dendl;
- for (map<hobject_t, read_request_t>::iterator j =
- i->second.to_read.begin();
- j != i->second.to_read.end();
- ++j) {
- delete j->second.cb;
- j->second.cb = nullptr;
- }
}
tid_to_read_map.clear();
shard_to_read_map.clear();
OpRequestRef _op,
bool do_redundant_reads,
bool for_recovery,
- GenContext<int> *on_complete)
+ std::unique_ptr<ECBackend::ReadCompleter> on_complete)
{
ceph_tid_t tid = get_parent()->get_tid();
ceph_assert(!tid_to_read_map.count(tid));
tid,
do_redundant_reads,
for_recovery,
- on_complete,
+ std::move(on_complete),
_op,
std::move(want_to_read),
std::move(to_read))).first->second;
on_complete)));
}
-struct CallClientContexts :
- public GenContext<ECBackend::read_result_t&> {
- hobject_t hoid;
- ECBackend::ReadPipeline &read_pipeline;
- ECBackend::ClientAsyncReadStatus *status;
- list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
- CallClientContexts(
- hobject_t hoid,
- ECBackend::ReadPipeline &read_pipeline,
- ECBackend::ClientAsyncReadStatus *status,
- const list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read)
- : hoid(hoid), read_pipeline(read_pipeline), status(status), to_read(to_read) {}
- void finish(ECBackend::read_result_t &res) override {
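+// Completes client-initiated reads: each finished object's result is
+// reconstructed and handed to the ClientAsyncReadStatus tracking the
+// overall request.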
+struct ClientReadCompleter : ECBackend::ReadCompleter {
+ ClientReadCompleter(ECBackend::ReadPipeline &read_pipeline,
+ ECBackend::ClientAsyncReadStatus *status)
+ : read_pipeline(read_pipeline),
+ status(status) {}
+
+ void finish_single_request(
+ const hobject_t &hoid,
+ ECBackend::read_result_t &res,
+ list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) override
+ {
extent_map result;
if (res.r != 0)
goto out;
status->complete_object(hoid, res.r, std::move(result));
read_pipeline.kick_reads();
}
+
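+  // Nothing left to do at ReadOp granularity: every object has already
+  // been handed to the ClientAsyncReadStatus above.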
+ void finish(int priority) && override
+ {
+ // NOP
+ }
+
+ ECBackend::ReadPipeline &read_pipeline;
+ ECBackend::ClientAsyncReadStatus *status;
};
+
void ECBackend::objects_read_and_reconstruct(
const map<hobject_t,
std::list<boost::tuple<uint64_t, uint64_t, uint32_t> >
&shards);
ceph_assert(r == 0);
- CallClientContexts *c = new CallClientContexts(
- to_read.first,
- *this,
- &(in_progress_client_reads.back()),
- to_read.second);
for_read_op.insert(
make_pair(
to_read.first,
read_request_t(
to_read.second,
shards,
- false,
- c)));
+ false)));
obj_want_to_read.insert(make_pair(to_read.first, want_to_read));
}
obj_want_to_read,
for_read_op,
OpRequestRef(),
- fast_read, false, nullptr);
+ fast_read,
+ false,
+ std::make_unique<ClientReadCompleter>(*this, &(in_progress_client_reads.back())));
}();
return;
}
list<boost::tuple<uint64_t, uint64_t, uint32_t> > offsets =
rop.to_read.find(hoid)->second.to_read;
- GenContext<read_result_t&> *c = rop.to_read.find(hoid)->second.cb;
  // (Note cuixf) If we need to read attrs and the read failed, try to read again.
bool want_attrs =
read_request_t(
offsets,
shards,
- want_attrs,
- c)));
+ want_attrs)));
return 0;
}
friend struct RecoveryMessages;
void dispatch_recovery_messages(RecoveryMessages &m, int priority);
-  friend struct OnRecoveryReadComplete;
+ friend struct RecoveryReadCompleter;
void handle_recovery_read_complete(
const hobject_t &hoid,
boost::tuple<uint64_t, uint64_t, std::map<pg_shard_t, ceph::buffer::list> > &to_read,
const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read;
std::map<pg_shard_t, std::vector<std::pair<int, int>>> need;
bool want_attrs;
- GenContext<read_result_t&> *cb;
read_request_t(
const std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > &to_read,
const std::map<pg_shard_t, std::vector<std::pair<int, int>>> &need,
- bool want_attrs,
- GenContext<read_result_t&> *cb)
- : to_read(to_read), need(need), want_attrs(want_attrs),
- cb(cb) {}
+ bool want_attrs)
+ : to_read(to_read), need(need), want_attrs(want_attrs) {}
};
friend ostream &operator<<(ostream &lhs, const read_request_t &rhs);
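+  // Replaces the per-request GenContext callbacks: finish_single_request()
+  // runs once per object as its read_result_t becomes available, and
+  // finish() runs once when the whole ReadOp is done.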
+ struct ReadCompleter {
+ virtual void finish_single_request(
+ const hobject_t &hoid,
+ ECBackend::read_result_t &res,
+ std::list<boost::tuple<uint64_t, uint64_t, uint32_t> > to_read) = 0;
+
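+    // Rvalue-qualified: the completer is consumed via
+    // std::move(*on_complete).finish(priority) and must not be reused.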
+ virtual void finish(int priority) && = 0;
+
+ virtual ~ReadCompleter() = default;
+ };
+
struct ReadOp {
int priority;
ceph_tid_t tid;
    // True if reading for recovery, which could possibly read only a subset
    // of the available shards.
bool for_recovery;
- GenContext<int> *on_complete;
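+    // Owned by the ReadOp; consumed with std::move(...).finish() and reset
+    // once all per-object results have been delivered.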
+ std::unique_ptr<ReadCompleter> on_complete;
ZTracer::Trace trace;
ceph_tid_t tid,
bool do_redundant_reads,
bool for_recovery,
- GenContext<int> *on_complete,
+ std::unique_ptr<ReadCompleter> _on_complete,
OpRequestRef op,
std::map<hobject_t, std::set<int>> &&_want_to_read,
std::map<hobject_t, read_request_t> &&_to_read)
op(op),
do_redundant_reads(do_redundant_reads),
for_recovery(for_recovery),
- on_complete(on_complete),
+ on_complete(std::move(_on_complete)),
want_to_read(std::move(_want_to_read)),
to_read(std::move(_to_read)) {
for (auto &&hpair: to_read) {
}
}
ReadOp() = delete;
- ReadOp(const ReadOp &) = default;
+ ReadOp(const ReadOp &) = delete; // due to on_complete being unique_ptr
ReadOp(ReadOp &&) = default;
};
OpRequestRef op,
bool do_redundant_reads,
bool for_recovery,
- GenContext<int> *on_complete);
+ std::unique_ptr<ReadCompleter> on_complete);
void do_read_op(ReadOp &rop);