}
};
+template <class F>
void ECBackend::ReadPipeline::filter_read_op(
const OSDMapRef& osdmap,
- ReadOp &op)
+ ReadOp &op,
+ F&& on_erase)
{
set<hobject_t> to_cancel;
for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
op.to_read.erase(*i);
op.complete.erase(*i);
- // TODO: meh, this doesn't look like a part of the read pipeline
- //recovery_ops.erase(*i);
+ on_erase(*i);
}
if (op.in_progress.empty()) {
}
}
-void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
+template <class F>
+void ECBackend::ReadPipeline::check_recovery_sources(
+ const OSDMapRef& osdmap,
+ F&& on_erase)
{
- // TODO: dissect into ReadPipeline
set<ceph_tid_t> tids_to_filter;
for (map<pg_shard_t, set<ceph_tid_t> >::iterator
- i = read_pipeline.shard_to_read_map.begin();
- i != read_pipeline.shard_to_read_map.end();
+ i = shard_to_read_map.begin();
+ i != shard_to_read_map.end();
) {
if (osdmap->is_down(i->first.osd)) {
tids_to_filter.insert(i->second.begin(), i->second.end());
- read_pipeline.shard_to_read_map.erase(i++);
+ shard_to_read_map.erase(i++);
} else {
++i;
}
for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
i != tids_to_filter.end();
++i) {
- map<ceph_tid_t, ReadOp>::iterator j = read_pipeline.tid_to_read_map.find(*i);
- ceph_assert(j != read_pipeline.tid_to_read_map.end());
- read_pipeline.filter_read_op(osdmap, j->second);
+ map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
+ ceph_assert(j != tid_to_read_map.end());
+ filter_read_op(osdmap, j->second, on_erase);
}
}
+void ECBackend::check_recovery_sources(const OSDMapRef& osdmap)
+{
+ read_pipeline.check_recovery_sources(osdmap, [this] (const hobject_t& obj) {
+ recovery_ops.erase(obj);
+ });
+}
+
void ECBackend::ReadPipeline::on_change()
{
for (map<ceph_tid_t, ReadOp>::iterator i = tid_to_read_map.begin();