From 63da6ea20fa56586f6003baf587ec7be0937d893 Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Fri, 16 Dec 2022 18:30:18 +0000 Subject: [PATCH] common: add not_before_queue_t Signed-off-by: Samuel Just --- src/common/not_before_queue.h | 259 ++++++++++++++++++++++++++++++ src/test/CMakeLists.txt | 5 + src/test/test_not_before_queue.cc | 137 ++++++++++++++++ 3 files changed, 401 insertions(+) create mode 100644 src/common/not_before_queue.h create mode 100644 src/test/test_not_before_queue.cc diff --git a/src/common/not_before_queue.h b/src/common/not_before_queue.h new file mode 100644 index 00000000000..e0f782f30ca --- /dev/null +++ b/src/common/not_before_queue.h @@ -0,0 +1,259 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2004-2006 Sage Weil + * + * This is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License version 2.1, as published by the Free Software + * Foundation. See file COPYING. + * + */ + +#pragma once + +#include + +#include "include/utime.h" + +/** + * not_before_queue_t + * + * Implements a generic priority queue with two additional properties: + * - Items are not eligble to be dequeued until their not_before value + * is after the current time (see project_not_before and advance_time) + * - Items can be dequeued efficiently by removal_class (see + * project_removal_class and remove_by_class) + * + * User must define the following free functions: + * - bool operator<(const V &lhs, const V &rhs) + * - const T &project_not_before(const V&) + * - const K &project_removal_class(const V&) + * + * operator< above should be defined such that if lhs is more urgent than + * rhs, lhs < rhs evaluates to true. + * + * project_removal_class returns a reference to a type K used in + * remove_by_class. + * + * project_not_before returns a time value comparable to the time type T. + * + * V must also have a copy constructor. + * + * The purpose of this queue implementation is to add a not_before concept + * to allow specifying a point in time before which the item will not be + * eligible for dequeueing orthogonal to the main priority. Once that point + * is passed, ordering is determined by priority as defined by the operator< + * definition. + */ +template +class not_before_queue_t { + /** + * container_t + * + * Each item has a single container_t. Every container_t is linked + * into and owned by removal_registry_t. Additionally, every element + * will be linked into exactly one of ineligible_queue and eligible_queue. + */ + struct container_t : boost::intrusive::set_base_hook<> // see removal_registry + { + // see ineligible_queue and eligible_queue + using queue_hook_t = boost::intrusive::set_member_hook<>; + queue_hook_t queue_hook; + + enum class status_t { + INVALID, // Not queued, only possible during construction and destruction + INELIGIBLE, // Queued in ineligible_queue + ELIGIBLE // Queued in eligible_queue + } status = status_t::INVALID; + + const V v; + + template + container_t(Args&&... args) : v(std::forward(args)...) {} + ~container_t() { + assert(status == status_t::INVALID); + } + }; + + using queue_hook_option_t = boost::intrusive::member_hook< + container_t, + typename container_t::queue_hook_t, + &container_t::queue_hook>; + + /** + * ineligible_queue + * + * - Contained items have project_not_before(v) > current_time. + * - Contained elements have status set to INELIGIBLE. + * - Contained elements are contained and owned by removal_registry_t + * - Uses same hook as and is mututally exclusive with eligible_queue. + */ + struct compare_by_nb_t { + bool operator()(const container_t &lhs, const container_t &rhs) const { + return project_not_before(lhs.v) < project_not_before(rhs.v); + } + }; + using ineligible_queue_t = boost::intrusive::multiset< + container_t, + queue_hook_option_t, + boost::intrusive::compare>; + ineligible_queue_t ineligible_queue; + + /** + * eligible_queue + * + * - Contains items where project_not_before(v) <= current_time. + * - Contained elements have status set to ELIGIBLE. + * - Contained elements are contained and owned by removal_registry_t + * - Uses same hook as and is mututally exclusive with ineligible_queue. + */ + struct compare_by_user_order_t { + bool operator()(const container_t &lhs, const container_t &rhs) const { + return lhs.v < rhs.v; + } + }; + using eligible_queue_t = boost::intrusive::multiset< + container_t, + queue_hook_option_t, + boost::intrusive::compare>; + eligible_queue_t eligible_queue; + + /** + * removal_registry_t + * + * - Used to efficiently remove items by removal_class. + * - Contains an entry for every item in not_before_queue_t + * (ELIGIBLE or INELIGIBLE) + * - Owns every contained item. + */ + struct compare_by_removal_class_t { + bool operator()(const container_t &lhs, const container_t &rhs) const { + return project_removal_class(lhs.v) < project_removal_class(rhs.v); + } + + template + bool operator()(const U &lhs, const container_t &rhs) const { + return lhs < project_removal_class(rhs.v); + } + + template + bool operator()(const container_t &lhs, const U &rhs) const { + return project_removal_class(lhs.v) < rhs; + } + }; + struct removal_registry_disposer_t { + void operator()(container_t *p) { delete p; } + }; + using removal_registry_t = boost::intrusive::multiset< + container_t, + boost::intrusive::compare>; + removal_registry_t removal_registry; + + /// current time, see advance_time + T current_time; +public: + /// Enqueue item constructed constructible from args... + template + void enqueue(Args&&... args) { + auto *item = new container_t(std::forward(args)...); + removal_registry.insert(*item); + + if (project_not_before(item->v) > current_time) { + item->status = container_t::status_t::INELIGIBLE; + ineligible_queue.insert(*item); + } else { + item->status = container_t::status_t::ELIGIBLE; + eligible_queue.insert(*item); + } + } + + /// Dequeue next item, return std::nullopt there are no eligible items + std::optional dequeue() { + if (eligible_queue.empty()) { + return std::nullopt; + } + + auto iter = eligible_queue.begin(); + assert(iter->status == container_t::status_t::ELIGIBLE); + + eligible_queue.erase( + typename eligible_queue_t::const_iterator(iter)); + iter->status = container_t::status_t::INVALID; + + std::optional ret(iter->v); + removal_registry.erase_and_dispose( + removal_registry_t::s_iterator_to(std::as_const(*iter)), + removal_registry_disposer_t{}); + return ret; + } + + /** + * advance_time + * + * Advances the elibility cutoff, argument must be non-decreasing in + * successive calls. + */ + void advance_time(T next_time) { + assert(next_time >= current_time); + current_time = next_time; + while (true) { + if (ineligible_queue.empty()) { + break; + } + + auto iter = ineligible_queue.begin(); + auto &item = *iter; + assert(item.status == container_t::status_t::INELIGIBLE); + + if (project_not_before(item.v) > current_time) { + break; + } + + item.status = container_t::status_t::ELIGIBLE; + ineligible_queue.erase(typename ineligible_queue_t::const_iterator(iter)); + eligible_queue.insert(item); + } + } + + /** + * remove_by_class + * + * Remove all items such that project_removal_class(item) == k + */ + template + void remove_by_class(const K &k) { + for (auto iter = removal_registry.lower_bound( + k, compare_by_removal_class_t{}); + iter != removal_registry.upper_bound( + k, compare_by_removal_class_t{}); ) { + if (iter->status == container_t::status_t::INELIGIBLE) { + ineligible_queue.erase( + ineligible_queue_t::s_iterator_to(std::as_const(*iter))); + } else if (iter->status == container_t::status_t::ELIGIBLE) { + eligible_queue.erase( + eligible_queue_t::s_iterator_to(std::as_const(*iter))); + } else { + assert(0 == "impossible status"); + } + iter->status = container_t::status_t::INVALID; + removal_registry.erase_and_dispose( + typename removal_registry_t::const_iterator(iter++), + removal_registry_disposer_t{}); + } + } + + /** + * for_each + * + * Traverse contents of queue. Invokes passed function with two params: + * f(val, eligible_for_dequeue); + */ + template + void for_each(F &&f) { + for (auto &&i: ineligible_queue) { std::invoke(f, i.v, false); } + for (auto &&i: eligible_queue) { std::invoke(f, i.v, true); } + } +}; diff --git a/src/test/CMakeLists.txt b/src/test/CMakeLists.txt index 6272b3b1ed6..0ea0bb29347 100644 --- a/src/test/CMakeLists.txt +++ b/src/test/CMakeLists.txt @@ -974,6 +974,11 @@ add_executable(unittest_texttable add_ceph_unittest(unittest_texttable) target_link_libraries(unittest_texttable ceph-common) +# unittest_not_before_queue +add_executable(unittest_not_before_queue + test_not_before_queue.cc) +add_ceph_unittest(unittest_not_before_queue) + if(NOT WIN32) # unittest_on_exit add_executable(unittest_on_exit diff --git a/src/test/test_not_before_queue.cc b/src/test/test_not_before_queue.cc new file mode 100644 index 00000000000..57241ecd91c --- /dev/null +++ b/src/test/test_not_before_queue.cc @@ -0,0 +1,137 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab + +#include + +#include "common/not_before_queue.h" +#include "gtest/gtest.h" + +// Just to have a default constructor that sets it to 0 +struct test_time_t { + unsigned time = 0; + + operator unsigned() const { return time; } + test_time_t() = default; + test_time_t(unsigned t) : time(t) {} + test_time_t &operator=(unsigned t) { + time = t; + return *this; + } +}; + +struct tv_t { + unsigned not_before = 0; + unsigned ordering_value = 0; + unsigned removal_class = 0; + + tv_t() = default; + tv_t(const tv_t &) = default; + tv_t(unsigned not_before, unsigned ov, unsigned rc) + : not_before(not_before), ordering_value(ov), removal_class(rc) {} + + auto to_tuple() const { + return std::make_tuple(not_before, ordering_value, removal_class); + } + bool operator==(const tv_t &rhs) const { + return to_tuple() == rhs.to_tuple(); + } +}; + +std::ostream &operator<<(std::ostream &lhs, const tv_t &val) { + return lhs << val.to_tuple(); +} + +const unsigned &project_not_before(const tv_t &v) { + return v.not_before; +} + +const unsigned &project_removal_class(const tv_t &v) { + return v.removal_class; +} + +bool operator<(const tv_t &lhs, const tv_t &rhs) { + return lhs.ordering_value < rhs.ordering_value; +} + +class NotBeforeTest : public testing::Test { +protected: + not_before_queue_t queue; + + void dump() { + std::cout << "Dumping queue: " << std::endl; + queue.for_each([](auto v, bool eligible) { + std::cout << " item: " << v << ", eligible: " << eligible << std::endl; + }); + } +}; + +TEST_F(NotBeforeTest, Basic) { + tv_t e0{0, 0, 0}; + tv_t e1{0, 1, 0}; + + queue.enqueue(e0); + queue.enqueue(e1); + + ASSERT_EQ(queue.dequeue(), std::make_optional(e0)); + ASSERT_EQ(queue.dequeue(), std::make_optional(e1)); + ASSERT_EQ(queue.dequeue(), std::nullopt); +} + +TEST_F(NotBeforeTest, NotBefore) { + tv_t e0{0, 0, 0}; + tv_t e1{1, 1, 0}; + tv_t e2{1, 2, 0}; + tv_t e3{1, 3, 0}; + tv_t e4{0, 4, 0}; + tv_t e5{0, 5, 0}; + + queue.enqueue(e5); + queue.enqueue(e1); + queue.enqueue(e3); + queue.enqueue(e0); + queue.enqueue(e2); + queue.enqueue(e4); + + ASSERT_EQ(queue.dequeue(), std::make_optional(e0)); + ASSERT_EQ(queue.dequeue(), std::make_optional(e4)); + ASSERT_EQ(queue.dequeue(), std::make_optional(e5)); + ASSERT_EQ(queue.dequeue(), std::nullopt); + + queue.advance_time(1); + + EXPECT_EQ(queue.dequeue(), std::make_optional(e1)); + EXPECT_EQ(queue.dequeue(), std::make_optional(e2)); + EXPECT_EQ(queue.dequeue(), std::make_optional(e3)); + ASSERT_EQ(queue.dequeue(), std::nullopt); +} + +TEST_F(NotBeforeTest, RemoveByClass) { + tv_t e0{0, 0, 1}; + tv_t e1{1, 1, 0}; + tv_t e2{1, 2, 1}; + tv_t e3{1, 3, 1}; + tv_t e4{0, 4, 1}; + tv_t e5{0, 5, 0}; + + queue.enqueue(e5); + queue.enqueue(e1); + queue.enqueue(e3); + queue.enqueue(e0); + queue.enqueue(e2); + queue.enqueue(e4); + + ASSERT_EQ(queue.dequeue(), std::make_optional(e0)); + + queue.remove_by_class(1u); + + ASSERT_EQ(queue.dequeue(), std::make_optional(e5)); + ASSERT_EQ(queue.dequeue(), std::nullopt); + + queue.advance_time(1u); + + EXPECT_EQ(queue.dequeue(), std::make_optional(e1)); + ASSERT_EQ(queue.dequeue(), std::nullopt); +} + + + -- 2.39.5