]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
crimson/osd/osd_operations: fix the usages of PipelineHandle::complete() and exit()
authorYingxin Cheng <yingxin.cheng@intel.com>
Wed, 20 Sep 2023 06:09:22 +0000 (14:09 +0800)
committerYingxin Cheng <yingxin.cheng@intel.com>
Thu, 2 Nov 2023 07:29:08 +0000 (15:29 +0800)
complete() should be called to leave the last phase in the normal path,
and exit() to be called in finally() to release the resources under all
circumstances.

Signed-off-by: Yingxin Cheng <yingxin.cheng@intel.com>
12 files changed:
src/crimson/osd/osd_operations/background_recovery.cc
src/crimson/osd/osd_operations/client_request.cc
src/crimson/osd/osd_operations/internal_client_request.cc
src/crimson/osd/osd_operations/logmissing_request.cc
src/crimson/osd/osd_operations/logmissing_request_reply.cc
src/crimson/osd/osd_operations/peering_event.cc
src/crimson/osd/osd_operations/pg_advance_map.cc
src/crimson/osd/osd_operations/recovery_subrequest.cc
src/crimson/osd/osd_operations/replicated_request.cc
src/crimson/osd/osd_operations/snaptrim_event.cc
src/crimson/osd/osd_operations/snaptrim_event.h
src/crimson/osd/pg_shard_manager.h

index 953ec9595dae782ed6859e16b4be5b9cc2761e9f..74bd238c987b0fec482c5fa27f6316f99da60cde 100644 (file)
@@ -196,7 +196,11 @@ BackfillRecovery::do_recovery()
     peering_pp(*pg).process
   ).then_interruptible([this] {
     pg->get_recovery_handler()->dispatch_backfill_event(std::move(evt));
+    return handle.complete();
+  }).then_interruptible([] {
     return seastar::make_ready_future<bool>(false);
+  }).finally([this] {
+    handle.exit();
   });
 }
 
index 9374fbde2cc06446f29324e8cb3533eceeec9d77..f01f0c491f1a12627e4083709f9f5857fda98574 100644 (file)
@@ -143,6 +143,9 @@ seastar::future<> ClientRequest::with_pg_int(
        } else {
          return process_op(ihref, pgref);
        }
+      }).then_interruptible([this, this_instance_id, &ihref] {
+        logger().debug("{}.{}: complete", *this, this_instance_id);
+        return ihref.handle.complete();
       }).then_interruptible([this, this_instance_id, pgref] {
        logger().debug("{}.{}: after process*", *this, this_instance_id);
        pgref->client_request_orderer.remove_request(*this);
@@ -151,11 +154,15 @@ seastar::future<> ClientRequest::with_pg_int(
     }, [this, this_instance_id, pgref](std::exception_ptr eptr) {
       // TODO: better debug output
       logger().debug("{}.{}: interrupted {}", *this, this_instance_id, eptr);
-    }, pgref).finally(
-      [opref=std::move(opref), pgref=std::move(pgref),
-       instance_handle=std::move(instance_handle), &ihref] {
-      ihref.handle.exit();
-    });
+    },
+    pgref
+  ).finally(
+    [opref=std::move(opref), pgref,
+     instance_handle=std::move(instance_handle), &ihref,
+     this_instance_id, this] {
+    logger().debug("{}.{}: exit", *this, this_instance_id);
+    ihref.handle.exit();
+  });
 }
 
 seastar::future<> ClientRequest::with_pg(
index 87438d4a1468563936fa9ede9317947e122466dd..790eb3f932d0239c8dfccf3a6f80a8332e0e7e0d 100644 (file)
@@ -110,9 +110,14 @@ seastar::future<> InternalClientRequest::start()
               });
             });
           });
-        }).handle_error_interruptible(PG::load_obc_ertr::all_same_way([] {
-          return seastar::now();
-        })).then_interruptible([] {
+        }).si_then([this] {
+          logger().debug("{}: complete", *this);
+          return handle.complete();
+        }).handle_error_interruptible(
+          PG::load_obc_ertr::all_same_way([] {
+            return seastar::now();
+          })
+        ).then_interruptible([] {
           return seastar::stop_iteration::yes;
         });
       }, [this](std::exception_ptr eptr) {
@@ -124,6 +129,9 @@ seastar::future<> InternalClientRequest::start()
       }, pg);
     }).then([this] {
       track_event<CompletionEvent>();
+    }).finally([this] {
+      logger().debug("{}: exit", *this);
+      handle.exit();
     });
   });
 }
index 739b46406500613057bad748e702333eac5c7a86..ee83977cd8a2e5d1eeecc3ec29222107dd32c76b 100644 (file)
@@ -72,8 +72,16 @@ seastar::future<> LogMissingRequest::with_pg(
       });
     }).then_interruptible([this, pg](auto) {
       return pg->do_update_log_missing(req, conn);
+    }).then_interruptible([this] {
+      logger().debug("{}: complete", *this);
+      return handle.complete();
     });
-  }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+  }, [](std::exception_ptr) {
+    return seastar::now();
+  }, pg).finally([this, ref] {
+    logger().debug("{}: exit", *this);
+    handle.exit();
+  });
 }
 
 }
index b4bf2938e05bca0e1b93255c1d8596df982dd07a..16e61ab4a9858e33fd620549b921c976d8e3a4d8 100644 (file)
@@ -61,8 +61,17 @@ seastar::future<> LogMissingRequestReply::with_pg(
 
   IRef ref = this;
   return interruptor::with_interruption([this, pg] {
-    return pg->do_update_log_missing_reply(std::move(req));
-  }, [ref](std::exception_ptr) { return seastar::now(); }, pg);
+    return pg->do_update_log_missing_reply(std::move(req)
+    ).then_interruptible([this] {
+      logger().debug("{}: complete", *this);
+      return handle.complete();
+    });
+  }, [](std::exception_ptr) {
+    return seastar::now();
+  }, pg).finally([this, ref] {
+    logger().debug("{}: exit", *this);
+    handle.exit();
+  });
 }
 
 }
index ea4662bd01e0b72b9680bd8d1f4eac2584966569..0712147ab2b7b355a91a3ecb9fbee727cacc001a 100644 (file)
@@ -85,8 +85,9 @@ seastar::future<> PeeringEvent<T>::with_pg(
       return this->template enter_stage<interruptor>(peering_pp(*pg).process);
     }).then_interruptible([this, pg, &shard_services] {
       return pg->do_peering_event(evt, ctx
-      ).then_interruptible([this, pg, &shard_services] {
-       that()->get_handle().exit();
+      ).then_interruptible([this] {
+       return that()->get_handle().complete();
+      }).then_interruptible([this, pg, &shard_services] {
        return complete_rctx(shard_services, pg);
       });
     }).then_interruptible([pg, &shard_services]()
@@ -100,7 +101,10 @@ seastar::future<> PeeringEvent<T>::with_pg(
     });
   }, [this](std::exception_ptr ep) {
     logger().debug("{}: interrupted with {}", *this, ep);
-  }, pg);
+  }, pg).finally([this] {
+    logger().debug("{}: exit", *this);
+    that()->get_handle().exit();
+  });
 }
 
 template <class T>
index 3706af810557e327c9f8eeaeb6bd62a433f26619..ba63212fc7fc929cb6b19747432f5879f523c1fd 100644 (file)
@@ -122,8 +122,12 @@ seastar::future<> PGAdvanceMap::start()
          return shard_services.send_pg_temp();
        });
     });
-  }).then([this, ref=std::move(ref)] {
+  }).then([this] {
     logger().debug("{}: complete", *this);
+    return handle.complete();
+  }).finally([this, ref=std::move(ref)] {
+    logger().debug("{}: exit", *this);
+    handle.exit();
   });
 }
 
index 68655b8da517198147916e358e4f07cb077cf2b4..dd310d8d72743df966617f8c035531b5e9bcb8b1 100644 (file)
@@ -30,11 +30,17 @@ seastar::future<> RecoverySubRequest::with_pg(
   track_event<StartEvent>();
   IRef opref = this;
   return interruptor::with_interruption([this, pgref] {
-    return pgref->get_recovery_backend()->handle_recovery_op(m, conn);
+    return pgref->get_recovery_backend()->handle_recovery_op(m, conn
+    ).then_interruptible([this] {
+      logger().debug("{}: complete", *this);
+      return handle.complete();
+    });
   }, [](std::exception_ptr) {
     return seastar::now();
-  }, pgref).finally([this, opref, pgref] {
+  }, pgref).finally([this, opref=std::move(opref), pgref] {
+    logger().debug("{}: exit", *this);
     track_event<CompletionEvent>();
+    handle.exit();
   });
 }
 
index 09217575c8ff31a94dd3277a5752f1ea1db78646..7e16b2ebd06af527a6962e88c0bb7caa93c07b98 100644 (file)
@@ -71,10 +71,16 @@ seastar::future<> RepRequest::with_pg(
       });
     }).then_interruptible([this, pg] (auto) {
       return pg->handle_rep_op(req);
+    }).then_interruptible([this] {
+      logger().debug("{}: complete", *this);
+      return handle.complete();
     });
-  }, [ref](std::exception_ptr) {
+  }, [](std::exception_ptr) {
     return seastar::now();
-  }, pg);
+  }, pg).finally([this, ref=std::move(ref)] {
+    logger().debug("{}: exit", *this);
+    handle.exit();
+  });
 }
 
 }
index e4a1b04df142fa39da427734bc28a319a021c14f..ffd43d736ad393e038af37d03ecaa65440c3e9b7 100644 (file)
@@ -80,27 +80,15 @@ void SnapTrimEvent::dump_detail(Formatter *f) const
   f->close_section();
 }
 
-SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration>
-SnapTrimEvent::start()
-{
-  logger().debug("{}: {}", *this, __func__);
-  return with_pg(
-    pg->get_shard_services(), pg
-  ).finally([ref=IRef{this}, this] {
-    logger().debug("{}: complete", *ref);
-    return handle.complete();
-  });
-}
-
 CommonPGPipeline& SnapTrimEvent::client_pp()
 {
   return pg->request_pg_pipeline;
 }
 
 SnapTrimEvent::snap_trim_ertr::future<seastar::stop_iteration>
-SnapTrimEvent::with_pg(
-  ShardServices &shard_services, Ref<PG> _pg)
+SnapTrimEvent::start()
 {
+  ShardServices &shard_services = pg->get_shard_services();
   return interruptor::with_interruption([&shard_services, this] {
     return enter_stage<interruptor>(
       client_pp().wait_for_active
@@ -176,7 +164,7 @@ SnapTrimEvent::with_pg(
           return subop_blocker.wait_completion();
         }).finally([this] {
          pg->snaptrim_mutex.unlock();
-       }).safe_then_interruptible([this] {
+       }).si_then([this] {
           if (!needs_pause) {
             return interruptor::now();
           }
@@ -193,17 +181,24 @@ SnapTrimEvent::with_pg(
             return seastar::sleep(
               std::chrono::milliseconds(std::lround(time_to_sleep * 1000)));
           });
-        }).safe_then_interruptible([this] {
+        }).si_then([this] {
           logger().debug("{}: all completed", *this);
           return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(
             seastar::stop_iteration::no);
         });
+      }).si_then([this](auto stop) {
+        return handle.complete().then([stop] {
+          return snap_trim_iertr::make_ready_future<seastar::stop_iteration>(stop);
+        });
       });
     });
   }, [this](std::exception_ptr eptr) -> snap_trim_ertr::future<seastar::stop_iteration> {
     logger().debug("{}: interrupted {}", *this, eptr);
     return crimson::ct_error::eagain::make();
-  }, pg);
+  }, pg).finally([this] {
+    logger().debug("{}: exit", *this);
+    handle.exit();
+  });
 }
 
 
@@ -212,18 +207,6 @@ CommonPGPipeline& SnapTrimObjSubEvent::client_pp()
   return pg->request_pg_pipeline;
 }
 
-SnapTrimObjSubEvent::remove_or_update_iertr::future<>
-SnapTrimObjSubEvent::start()
-{
-  logger().debug("{}: start", *this);
-  return with_pg(
-    pg->get_shard_services(), pg
-  ).finally([ref=IRef{this}, this] {
-    logger().debug("{}: complete", *ref);
-    return handle.complete();
-  });
-}
-
 SnapTrimObjSubEvent::remove_or_update_iertr::future<>
 SnapTrimObjSubEvent::remove_clone(
   ObjectContextRef obc,
@@ -466,7 +449,7 @@ SnapTrimObjSubEvent::remove_or_update(
                   *this, coid, old_snaps, new_snaps);
     ret = adjust_snaps(obc, head_obc, new_snaps, txn, log_entries);
   }
-  return std::move(ret).safe_then_interruptible(
+  return std::move(ret).si_then(
     [&txn, obc, num_objects_before_trim, log_entries=std::move(log_entries), head_obc=std::move(head_obc), this]() mutable {
     osd_op_p.at_version = pg->next_version();
 
@@ -484,7 +467,7 @@ SnapTrimObjSubEvent::remove_or_update(
       //  num_objects_before_trim - delta_stats.num_objects;
       //add_objects_trimmed_count(num_objects_trimmed);
     }
-  }).safe_then_interruptible(
+  }).si_then(
     [&txn, log_entries=std::move(log_entries)] () mutable {
     return remove_or_update_iertr::make_ready_future<remove_or_update_ret_t>(
       std::make_pair(std::move(txn), std::move(log_entries)));
@@ -493,8 +476,7 @@ SnapTrimObjSubEvent::remove_or_update(
 }
 
 SnapTrimObjSubEvent::remove_or_update_iertr::future<>
-SnapTrimObjSubEvent::with_pg(
-  ShardServices &shard_services, Ref<PG> _pg)
+SnapTrimObjSubEvent::start()
 {
   return enter_stage<interruptor>(
     client_pp().wait_for_active
@@ -544,10 +526,16 @@ SnapTrimObjSubEvent::with_pg(
           });
         });
       });
+    }).si_then([this] {
+      logger().debug("{}: completed", *this);
+      return handle.complete();
     }).handle_error_interruptible(
       remove_or_update_iertr::pass_further{},
       crimson::ct_error::assert_all{"unexpected error in SnapTrimObjSubEvent"}
     );
+  }).finally([this] {
+    logger().debug("{}: exit", *this);
+    handle.exit();
   });
 }
 
index a3a970a04c7d017a5f65ae5b1743ea96ac29bdf3..afb24952a045cf8c3c97ed538b47bbf876a5a631 100644 (file)
@@ -53,8 +53,6 @@ public:
   void print(std::ostream &) const final;
   void dump_detail(ceph::Formatter* f) const final;
   snap_trim_ertr::future<seastar::stop_iteration> start();
-  snap_trim_ertr::future<seastar::stop_iteration> with_pg(
-    ShardServices &shard_services, Ref<PG> pg);
 
 private:
   CommonPGPipeline& client_pp();
@@ -140,8 +138,6 @@ public:
   void print(std::ostream &) const final;
   void dump_detail(ceph::Formatter* f) const final;
   remove_or_update_iertr::future<> start();
-  remove_or_update_iertr::future<> with_pg(
-    ShardServices &shard_services, Ref<PG> pg);
 
   CommonPGPipeline& client_pp();
 
index 71a9cf1a9c804a562672775cda69c94a6b06aeae..e080dc43e4add3fd0356ca9142e57cbe63e24b95 100644 (file)
@@ -340,7 +340,14 @@ public:
        opref.get_connection_pipeline().get_pg);
     }).then([this, &opref] {
       return get_pg_to_shard_mapping().maybe_create_pg(opref.get_pgid());
-    }).then([this, &logger, op=std::move(op)](auto core) mutable {
+    }).then_wrapped([this, &logger, op=std::move(op)](auto fut) mutable {
+      if (unlikely(fut.failed())) {
+        logger.error("{}: failed before with_pg", *op);
+        op->get_handle().exit();
+        return seastar::make_exception_future<>(fut.get_exception());
+      }
+
+      auto core = fut.get();
       logger.debug("{}: can_create={}, target-core={}",
                    *op, T::can_create(), core);
       return this->template with_remote_shard_state_and_op<T>(