}
};
-template <class F>
-void ECCommon::ReadPipeline::filter_read_op(
- const OSDMapRef& osdmap,
- ReadOp &op,
- F&& on_erase)
+void ECCommon::ReadPipeline::schedule_recovery_work()
{
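+  // Defer further handling to the parent's recovery queue at a cost of 1;
+  // see the rationale in the comment inside filter_read_op().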
- set<hobject_t> to_cancel;
- for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
- i != op.source_to_obj.end();
- ++i) {
- if (osdmap->is_down(i->first.osd)) {
- to_cancel.insert(i->second.begin(), i->second.end());
- op.in_progress.erase(i->first);
- continue;
- }
- }
-
- if (to_cancel.empty())
- return;
-
- for (map<pg_shard_t, set<hobject_t> >::iterator i = op.source_to_obj.begin();
- i != op.source_to_obj.end();
- ) {
- for (set<hobject_t>::iterator j = i->second.begin();
- j != i->second.end();
- ) {
- if (to_cancel.count(*j))
- i->second.erase(j++);
- else
- ++j;
- }
- if (i->second.empty()) {
- op.source_to_obj.erase(i++);
- } else {
- ceph_assert(!osdmap->is_down(i->first.osd));
- ++i;
- }
- }
-
- for (set<hobject_t>::iterator i = to_cancel.begin();
- i != to_cancel.end();
- ++i) {
- get_parent()->cancel_pull(*i);
-
- ceph_assert(op.to_read.count(*i));
- read_request_t &req = op.to_read.find(*i)->second;
- dout(10) << __func__ << ": canceling " << req
- << " for obj " << *i << dendl;
- op.to_read.erase(*i);
- op.complete.erase(*i);
- on_erase(*i);
- }
-
- if (op.in_progress.empty()) {
- /* This case is odd. filter_read_op gets called while processing
- * an OSDMap. Normal, non-recovery reads only happen from acting
- * set osds. For this op to have had a read source go down and
- * there not be an interval change, it must be part of a pull during
- * log-based recovery.
- *
- * This callback delays calling complete_read_op until later to avoid
- * dealing with recovery while handling an OSDMap. We assign a
- * cost here of 1 because:
- * 1) This should be very rare, and the operation itself was already
- * throttled.
- * 2) It shouldn't result in IO, rather it should result in restarting
- * the pull on the affected objects and pushes from in-memory buffers
- * on any now complete unaffected objects.
- */
#ifndef WITH_SEASTAR
get_parent()->schedule_recovery_work(
get_parent()->bless_unlocked_gencontext(
- new FinishReadOp(*this, op.tid)),
+ nullptr), //new struct FinishReadOp(*this, op.tid)),
1);
#else
// TODO
ceph_abort_msg("not yet implemented");
#endif
- }
-}
-
-template <class F>
-void ECCommon::ReadPipeline::check_recovery_sources(
- const OSDMapRef& osdmap,
- F&& on_erase)
-{
- set<ceph_tid_t> tids_to_filter;
- for (map<pg_shard_t, set<ceph_tid_t> >::iterator
- i = shard_to_read_map.begin();
- i != shard_to_read_map.end();
- ) {
- if (osdmap->is_down(i->first.osd)) {
- tids_to_filter.insert(i->second.begin(), i->second.end());
- shard_to_read_map.erase(i++);
- } else {
- ++i;
- }
- }
- for (set<ceph_tid_t>::iterator i = tids_to_filter.begin();
- i != tids_to_filter.end();
- ++i) {
- map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
- ceph_assert(j != tid_to_read_map.end());
- filter_read_op(osdmap, j->second, on_erase);
- }
}
void ECCommon::ReadPipeline::on_change()
bool do_redundant_reads, ///< [in] true if we want to issue redundant reads to reduce latency
std::map<pg_shard_t, std::vector<std::pair<int, int>>> *to_read ///< [out] shards, corresponding subchunks to read
); ///< @return error code, 0 on success
+
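+  /// Defer read-op completion to the parent's recovery machinery;
+  /// called from filter_read_op() once an op has no reads left in progress.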
+ void schedule_recovery_work();
};
/**
template <> struct fmt::formatter<ECCommon::read_result_t> : fmt::ostream_formatter {};
template <> struct fmt::formatter<ECCommon::ReadOp> : fmt::ostream_formatter {};
template <> struct fmt::formatter<ECCommon::RMWPipeline::Op> : fmt::ostream_formatter {};
+
+template <class F>
+void ECCommon::ReadPipeline::check_recovery_sources(
+ const OSDMapRef& osdmap,
+ F&& on_erase)
+{
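+  // Collect the tids of all in-flight reads sourced from shards whose OSD
+  // is now down, dropping those shards from shard_to_read_map.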
+ std::set<ceph_tid_t> tids_to_filter;
+ for (std::map<pg_shard_t, std::set<ceph_tid_t> >::iterator
+ i = shard_to_read_map.begin();
+ i != shard_to_read_map.end();
+ ) {
+ if (osdmap->is_down(i->first.osd)) {
+ tids_to_filter.insert(i->second.begin(), i->second.end());
+ shard_to_read_map.erase(i++);
+ } else {
+ ++i;
+ }
+ }
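+  // Filter each affected ReadOp; on_erase is invoked for every object whose
+  // read ends up cancelled.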
+ for (std::set<ceph_tid_t>::iterator i = tids_to_filter.begin();
+ i != tids_to_filter.end();
+ ++i) {
+ std::map<ceph_tid_t, ReadOp>::iterator j = tid_to_read_map.find(*i);
+ ceph_assert(j != tid_to_read_map.end());
+ filter_read_op(osdmap, j->second, on_erase);
+ }
+}
+
+template <class F>
+void ECCommon::ReadPipeline::filter_read_op(
+ const OSDMapRef& osdmap,
+ ReadOp &op,
+ F&& on_erase)
+{
+ std::set<hobject_t> to_cancel;
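+  // Pass 1: note every object whose read source OSD went down and drop
+  // those shards from in_progress.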
+ for (std::map<pg_shard_t, std::set<hobject_t> >::iterator i = op.source_to_obj.begin();
+ i != op.source_to_obj.end();
+ ++i) {
+ if (osdmap->is_down(i->first.osd)) {
+ to_cancel.insert(i->second.begin(), i->second.end());
+ op.in_progress.erase(i->first);
+ continue;
+ }
+ }
+
+ if (to_cancel.empty())
+ return;
+
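+  // Pass 2: strip the cancelled objects out of source_to_obj, erasing shard
+  // entries that become empty.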
+ for (std::map<pg_shard_t, std::set<hobject_t> >::iterator i = op.source_to_obj.begin();
+ i != op.source_to_obj.end();
+ ) {
+ for (std::set<hobject_t>::iterator j = i->second.begin();
+ j != i->second.end();
+ ) {
+ if (to_cancel.count(*j))
+ i->second.erase(j++);
+ else
+ ++j;
+ }
+ if (i->second.empty()) {
+ op.source_to_obj.erase(i++);
+ } else {
+ ceph_assert(!osdmap->is_down(i->first.osd));
+ ++i;
+ }
+ }
+
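+  // Pass 3: cancel the pulls and purge the per-object read state, notifying
+  // the caller through on_erase.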
+ for (std::set<hobject_t>::iterator i = to_cancel.begin();
+ i != to_cancel.end();
+ ++i) {
+ get_parent()->cancel_pull(*i);
+
+ ceph_assert(op.to_read.count(*i));
+ op.to_read.erase(*i);
+ op.complete.erase(*i);
+ on_erase(*i);
+ }
+
+ if (op.in_progress.empty()) {
+ /* This case is odd. filter_read_op gets called while processing
+ * an OSDMap. Normal, non-recovery reads only happen from acting
+ * set osds. For this op to have had a read source go down and
+ * there not be an interval change, it must be part of a pull during
+ * log-based recovery.
+ *
+ * This callback delays calling complete_read_op until later to avoid
+ * dealing with recovery while handling an OSDMap. We assign a
+ * cost here of 1 because:
+ * 1) This should be very rare, and the operation itself was already
+ * throttled.
+ * 2) It shouldn't result in IO, rather it should result in restarting
+ * the pull on the affected objects and pushes from in-memory buffers
+ * on any now complete unaffected objects.
+ */
+ schedule_recovery_work();
+ }
+}