#include <chrono>
#include <fmt/format.h>
#include "librados/AioCompletionImpl.h"
+#include "common/async/yield_waiter.h"
#include <unordered_map>
return 0;
}
- using Clock = ceph::coarse_mono_clock;
- using Executor = boost::asio::io_context::executor_type;
- using Timer = boost::asio::basic_waitable_timer<Clock,
- boost::asio::wait_traits<Clock>, Executor>;
-
class tokens_waiter {
- const std::chrono::hours infinite_duration;
- size_t pending_tokens;
- Timer timer;
-
- struct token {
- tokens_waiter& waiter;
- token(const token& other) : waiter(other.waiter) {
- ++waiter.pending_tokens;
+ size_t pending_tokens = 0;
+ DoutPrefixProvider* const dpp;
+ ceph::async::yield_waiter<void> waiter;
+
+ public:
+ class token{
+ tokens_waiter* tw;
+ public:
+ token(const token& other) = delete;
+ token(token&& other) : tw(other.tw) {
+ other.tw = nullptr; // mark as moved
}
- token(tokens_waiter& _waiter) : waiter(_waiter) {
- ++waiter.pending_tokens;
+ token& operator=(const token& other) = delete;
+ token(tokens_waiter* _tw) : tw(_tw) {
+ ++tw->pending_tokens;
}
-
+
~token() {
- --waiter.pending_tokens;
- if (waiter.pending_tokens == 0) {
- waiter.timer.cancel();
- }
- }
+ if (!tw) {
+ return; // already moved
+ }
+ --tw->pending_tokens;
+ if (tw->pending_tokens == 0 && tw->waiter) {
+ tw->waiter.complete(boost::system::error_code{});
+ }
+ }
};
-
- public:
- tokens_waiter(boost::asio::io_context& io_context) :
- infinite_duration(1000),
- pending_tokens(0),
- timer(io_context) {}
-
+ tokens_waiter(DoutPrefixProvider* _dpp) : dpp(_dpp) {}
+ tokens_waiter(const tokens_waiter& other) = delete;
+ tokens_waiter& operator=(const tokens_waiter& other) = delete;
+
void async_wait(boost::asio::yield_context yield) {
if (pending_tokens == 0) {
return;
}
- timer.expires_after(infinite_duration);
- boost::system::error_code ec;
- timer.async_wait(yield[ec]);
- ceph_assert(ec == boost::system::errc::operation_canceled);
- }
-
- token make_token() {
- return token(*this);
- }
+ ldpp_dout(dpp, 20) << "INFO: tokens waiter is waiting on " <<
+ pending_tokens << " tokens" << dendl;
+ boost::system::error_code ec;
+ waiter.async_wait(yield[ec]);
+ ldpp_dout(dpp, 20) << "INFO: tokens waiter finished waiting for all tokens" << dendl;
+ }
};
enum class EntryProcessingResult {
return EntryProcessingResult::Successful;
}
+ using Clock = ceph::coarse_mono_clock;
+ using Executor = boost::asio::io_context::executor_type;
+ using Timer = boost::asio::basic_waitable_timer<Clock,
+ boost::asio::wait_traits<Clock>, Executor>;
+
// clean stale reservation from queue
void cleanup_queue(const std::string& queue_name, boost::asio::yield_context yield) {
while (!shutdown) {
auto stop_processing = false;
auto remove_entries = false;
auto entry_idx = 1U;
- tokens_waiter waiter(io_context);
+ tokens_waiter tw(this);
std::vector<bool> needs_migration_vector(entries.size(), false);
for (auto& entry : entries) {
if (stop_processing) {
}
entries_persistency_tracker& notifs_persistency_tracker = topics_persistency_tracker[queue_name];
+ tokens_waiter::token token(&tw);
boost::asio::spawn(yield, std::allocator_arg, make_stack_allocator(),
[this, &is_idle, ¬ifs_persistency_tracker, &queue_name, entry_idx,
total_entries, &end_marker, &remove_entries, &stop_processing,
- token = waiter.make_token(), &entry, &needs_migration_vector,
+ token = std::move(token), &entry, &needs_migration_vector,
push_endpoint = push_endpoint.get(),
&topic_info](boost::asio::yield_context yield) {
auto& persistency_tracker = notifs_persistency_tracker[entry.marker];
++entry_idx;
}
- // wait for all pending work to finish
- waiter.async_wait(yield);
+ if (!entries.empty()) {
+ // wait for all pending work to finish
+ tw.async_wait(yield);
+ }
// delete all published entries from queue
if (remove_entries) {