For the sake of crimson.
Signed-off-by: Radosław Zarzyński <rzarzyns@redhat.com>
void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
{
-#if 0
- read_pipeline.check_recovery_sources(osdmap, [this] (const hobject_t& obj) {
- recovery_ops.erase(obj);
- });
-#endif
+ struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
+ ECCommon::ReadPipeline& read_pipeline;
+ ceph_tid_t tid;
+ FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid)
+ : read_pipeline(read_pipeline), tid(tid) {}
+ void finish(ThreadPool::TPHandle&) override {
+ auto ropiter = read_pipeline.tid_to_read_map.find(tid);
+ ceph_assert(ropiter != read_pipeline.tid_to_read_map.end());
+ read_pipeline.complete_read_op(ropiter->second);
+ }
+ };
+ read_pipeline.check_recovery_sources(
+ osdmap,
+ [this] (const hobject_t& obj) {
+ recovery_backend.recovery_ops.erase(obj);
+ },
+ [this] (const ReadOp& op) {
+ get_parent()->schedule_recovery_work(
+ get_parent()->bless_unlocked_gencontext(
+ new FinishReadOp(read_pipeline, op.tid)),
+ 1);
+ });
}
void ECBackend::on_change()
tid_to_read_map.erase(rop.tid);
}
-struct FinishReadOp : public GenContext<ThreadPool::TPHandle&> {
- ECCommon::ReadPipeline& read_pipeline;
- ceph_tid_t tid;
- FinishReadOp(ECCommon::ReadPipeline& read_pipeline, ceph_tid_t tid)
- : read_pipeline(read_pipeline), tid(tid) {}
- void finish(ThreadPool::TPHandle&) override {
- auto ropiter = read_pipeline.tid_to_read_map.find(tid);
- ceph_assert(ropiter != read_pipeline.tid_to_read_map.end());
- read_pipeline.complete_read_op(ropiter->second);
- }
-};
-
-void ECCommon::ReadPipeline::schedule_recovery_work()
-{
-#ifndef WITH_SEASTAR
- get_parent()->schedule_recovery_work(
- get_parent()->bless_unlocked_gencontext(
- nullptr), //new struct FinishReadOp(*this, op.tid)),
- 1);
-#else
- // TODO
- ceph_abort_msg("not yet implemented");
-#endif
-}
-
void ECCommon::ReadPipeline::on_change()
{
for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();
bool fast_read,
GenContextURef<std::map<hobject_t,std::pair<int, extent_map> > &&> &&func);
- template <class F>
+ template <class F, class G>
void filter_read_op(
const OSDMapRef& osdmap,
ReadOp &op,
- F&& on_erase);
+ F&& on_erase,
+ G&& on_schedule_recovery);
- template <class F>
- void check_recovery_sources(const OSDMapRef& osdmap, F&& on_erase);
+ template <class F, class G>
+ void check_recovery_sources(
+ const OSDMapRef& osdmap,
+ F&& on_erase,
+ G&& on_schedule_recovery);
void complete_read_op(ReadOp &rop);
template <> struct fmt::formatter<ECCommon::ReadOp> : fmt::ostream_formatter {};
template <> struct fmt::formatter<ECCommon::RMWPipeline::Op> : fmt::ostream_formatter {};
-template <class F>
+template <class F, class G>
void ECCommon::ReadPipeline::check_recovery_sources(
const OSDMapRef& osdmap,
- F&& on_erase)
+ F&& on_erase,
+ G&& on_schedule_recovery)
{
std::set<ceph_tid_t> tids_to_filter;
for (std::map<pg_shard_t, std::set<ceph_tid_t> >::iterator
++i) {
std::map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
ceph_assert(j != tid_to_read_map.end());
- filter_read_op(osdmap, j->second, on_erase);
+ filter_read_op(osdmap, j->second, on_erase, on_schedule_recovery);
}
}
-template <class F>
+template <class F, class G>
void ECCommon::ReadPipeline::filter_read_op(
const OSDMapRef& osdmap,
ReadOp &op,
- F&& on_erase)
+ F&& on_erase,
+ G&& on_schedule_recovery)
{
std::set<hobject_t> to_cancel;
for (std::map<pg_shard_t, std::set<hobject_t> >::iterator i = op.source_to_obj.begin();
* the pull on the affected objects and pushes from in-memory buffers
* on any now complete unaffected objects.
*/
- schedule_recovery_work();
+ on_schedule_recovery(op);
}
}