From 75394147abd564919bc7dda7dbcdad59289febc1 Mon Sep 17 00:00:00 2001 From: Ronen Friedman Date: Tue, 30 Jul 2024 05:59:00 -0500 Subject: [PATCH] common/not_before_queue: extending the container's API Signed-off-by: Ronen Friedman --- src/common/not_before_queue.h | 134 ++++++++++++++++++++++++++++++---- 1 file changed, 120 insertions(+), 14 deletions(-) diff --git a/src/common/not_before_queue.h b/src/common/not_before_queue.h index a92d6c981ec6d..2bae3fe026c2e 100644 --- a/src/common/not_before_queue.h +++ b/src/common/not_before_queue.h @@ -1,16 +1,5 @@ // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- // vim: ts=8 sw=2 smarttab -/* - * Ceph - scalable distributed file system - * - * Copyright (C) 2004-2006 Sage Weil - * - * This is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License version 2.1, as published by the Free Software - * Foundation. See file COPYING. - * - */ #pragma once @@ -193,6 +182,28 @@ public: return ret; } + /// Dequeue 1st eligible item that satisfies pred, std::nullopt if none + template + std::optional dequeue_by_pred(const PRED& pred) { + auto iter = std::find_if( + eligible_queue.begin(), eligible_queue.end(), + [&pred](const auto &i) { return pred(i.v); }); + + if (iter == eligible_queue.end()) { + return std::nullopt; + } + + assert(iter->status == status_t::ELIGIBLE); + eligible_queue.erase(typename eligible_queue_t::const_iterator(iter)); + iter->status = status_t::INVALID; + + std::optional ret(iter->v); + removal_registry.erase_and_dispose( + removal_registry_t::s_iterator_to(std::as_const(*iter)), + removal_registry_disposer_t{}); + return ret; + } + /** * advance_time * @@ -248,6 +259,75 @@ public: } } + /** + * remove_if_by_class + * + * Remove up to 'max_removed' items for which project_removal_class(item) == k + * AND PRED(item) == true + * + * Returns the number of items removed + */ + template + int remove_if_by_class( + const K& k, + PRED&& pred, + std::optional max_removed = std::nullopt) { + int removed = 0; + for (auto iter = + removal_registry.lower_bound(k, compare_by_removal_class_t{}); + iter != + removal_registry.upper_bound(k, compare_by_removal_class_t{});) { + + if (!pred(iter->v)) { + ++iter; + continue; + } + + if (iter->status == not_before_queue_t::status_t::INELIGIBLE) { + ineligible_queue.erase( + ineligible_queue_t::s_iterator_to(std::as_const(*iter))); + } else if (iter->status == not_before_queue_t::status_t::ELIGIBLE) { + eligible_queue.erase( + eligible_queue_t::s_iterator_to(std::as_const(*iter))); + } else { + assert(0 == "impossible status"); + } + iter->status = not_before_queue_t::status_t::INVALID; + removal_registry.erase_and_dispose( + typename removal_registry_t::const_iterator(iter++), + removal_registry_disposer_t{}); + removed++; + if (max_removed && removed >= *max_removed) { + break; + } + } + return removed; + } + + /** + * accumulate + * + * (mimics std::accumulate() for a binary operator) + * Accumulate (performing a 'left fold') over all entries. Invokes passed + * function with three params: + * f(acc, v, eligible_for_dequeue); + */ + template + ACC accumulate(BOP&& op) const { + ACC acc; + acc = std::accumulate( + eligible_queue.begin(), eligible_queue.end(), std::move(acc), + [op](ACC&& acc, const auto& i) { + return op(std::move(acc), i.v, true); + }); + acc = std::accumulate( + ineligible_queue.begin(), ineligible_queue.end(), std::move(acc), + [op](ACC&& acc, const auto& i) { + return op(std::move(acc), i.v, false); + }); + return acc; + } + /** * for_each * @@ -255,8 +335,34 @@ public: * f(val, eligible_for_dequeue); */ template - void for_each(F &&f) { - for (auto &&i: ineligible_queue) { std::invoke(f, i.v, false); } - for (auto &&i: eligible_queue) { std::invoke(f, i.v, true); } + void for_each(F&& f) const { + for (auto&& i : eligible_queue) { + std::invoke(f, i.v, true); + } + for (auto&& i : ineligible_queue) { + std::invoke(f, i.v, false); + } } + + template + void for_each_n(F&& f, int up_to) const { + for (auto&& i : eligible_queue) { + if (up_to-- <= 0) { + return; + } + std::invoke(f, i.v, true); + } + for (auto&& i : ineligible_queue) { + if (up_to-- <= 0) { + return; + } + std::invoke(f, i.v, false); + } + } + + int total_count() const { + return ineligible_queue.size() + eligible_queue.size(); + } + + int eligible_count() const { return eligible_queue.size(); } }; -- 2.39.5