From: Adam Emerson Date: Tue, 29 Aug 2023 22:14:26 +0000 (-0400) Subject: test/neorados: Rounding out test coverage, part 3 X-Git-Tag: v19.3.0~349^2~4 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=dce23263d642640bae1f656d0266d53e3146dac0;p=ceph.git test/neorados: Rounding out test coverage, part 3 This includes snapshots and watch_notify Signed-off-by: Adam Emerson --- diff --git a/src/test/neorados/CMakeLists.txt b/src/test/neorados/CMakeLists.txt index ceb64c85bedf..97b99a04e5d7 100644 --- a/src/test/neorados/CMakeLists.txt +++ b/src/test/neorados/CMakeLists.txt @@ -169,3 +169,35 @@ target_link_libraries(ceph_test_neorados_pool install(TARGETS ceph_test_neorados_pool DESTINATION ${CMAKE_INSTALL_BINDIR}) + +add_executable(ceph_test_neorados_snapshots + snapshots.cc + ) +target_link_libraries(ceph_test_neorados_snapshots + libneorados + ${BLKID_LIBRARIES} + ${CMAKE_DL_LIBS} + ${CRYPTO_LIBS} + ${EXTRALIBS} + neoradostest-support + ${UNITTEST_LIBS} + ) +install(TARGETS + ceph_test_neorados_snapshots + DESTINATION ${CMAKE_INSTALL_BINDIR}) + +add_executable(ceph_test_neorados_watch_notify + watch_notify.cc + ) +target_link_libraries(ceph_test_neorados_watch_notify + libneorados + ${BLKID_LIBRARIES} + ${CMAKE_DL_LIBS} + ${CRYPTO_LIBS} + ${EXTRALIBS} + neoradostest-support + ${UNITTEST_LIBS} + ) +install(TARGETS + ceph_test_neorados_watch_notify + DESTINATION ${CMAKE_INSTALL_BINDIR}) diff --git a/src/test/neorados/common_tests.h b/src/test/neorados/common_tests.h index eecebe20f9ef..9610f08f643f 100644 --- a/src/test/neorados/common_tests.h +++ b/src/test/neorados/common_tests.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include #include @@ -312,6 +314,14 @@ protected: co_return bl; } + boost::asio::awaitable + create_obj(std::string_view oid) { + neorados::WriteOp op; + op.create(true); + co_return co_await rados().execute(oid, pool(), std::move(op), + boost::asio::use_awaitable); + } + public: /// \brief Create RADOS handle and pool for the test @@ -510,3 +520,13 @@ inline bool is_crimson_cluster() { std::cerr << "Not supported by crimson yet. Skipped" << std::endl; \ co_return; \ } + +/// \brief Wait for a specified time +/// +/// \param dur Time to wait. +template +boost::asio::awaitable wait_for(std::chrono::duration dur) +{ + boost::asio::steady_timer t(co_await boost::asio::this_coro::executor, dur); + co_return co_await t.async_wait(boost::asio::use_awaitable); +} diff --git a/src/test/neorados/handler_error.cc b/src/test/neorados/handler_error.cc index 26d468bc0246..7f09c4e7d754 100644 --- a/src/test/neorados/handler_error.cc +++ b/src/test/neorados/handler_error.cc @@ -35,7 +35,7 @@ namespace buffer = ceph::buffer; CORO_TEST_F(neocls_handler_error, test_handler_error, NeoRadosTest) { std::string_view oid = "obj"; - co_await create_obj(rados(), oid, pool(), asio::use_awaitable); + co_await create_obj(oid); { neorados::ReadOp op; diff --git a/src/test/neorados/snapshots.cc b/src/test/neorados/snapshots.cc new file mode 100644 index 000000000000..d1e6c96d6e83 --- /dev/null +++ b/src/test/neorados/snapshots.cc @@ -0,0 +1,431 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM + * + * See file COPYING for license information. + * + */ + +#include +#include +#include +#include +#include +#include + +#include +#include + +#include + +#include "include/neorados/RADOS.hpp" + +#include "osd/error_code.h" + +#include "test/neorados/common_tests.h" + +#include "gtest/gtest.h" + +using std::uint64_t; + +namespace asio = boost::asio; +namespace sys = boost::system; + +using namespace std::literals; + +using neorados::ReadOp; +using neorados::WriteOp; + +inline asio::awaitable new_selfmanaged_snap(neorados::RADOS& rados, + std::vector& snaps, + neorados::IOContext& ioc) { + snaps.push_back(co_await rados.allocate_selfmanaged_snap( + ioc.get_pool(), asio::use_awaitable)); + std::reverse(snaps.begin(), snaps.end()); + ioc.set_write_snap_context({{snaps[0], snaps}}); + std::reverse(snaps.begin(), snaps.end()); + co_return; +} + +inline asio::awaitable rm_selfmanaged_snaps(neorados::RADOS& rados, + std::vector& snaps, + neorados::IOContext& ioc) { + std::reverse(snaps.begin(), snaps.end()); + for (auto snapid : snaps) { + co_await rados.delete_selfmanaged_snap(ioc.get_pool(), snapid, + asio::use_awaitable); + } + snaps.clear(); +} + +static constexpr auto oid = "oid"sv; + +CORO_TEST_F(NeoRadosSnapshots, SnapList, NeoRadosTest) { + static const auto snap1 = "snap1"s; + co_await create_obj(oid); + EXPECT_FALSE(rados().get_self_managed_snaps_mode(pool())); + co_await rados().create_pool_snap(pool(), snap1, + asio::use_awaitable); + EXPECT_FALSE(rados().get_self_managed_snaps_mode(pool())); + + auto snaps = rados().list_snaps(pool()); + EXPECT_EQ(1u, snaps.size()); + auto rid = rados().lookup_snap(pool(), snap1); + EXPECT_EQ(rid, snaps[0]); + co_await rados().delete_pool_snap(pool().get_pool(), snap1, asio::use_awaitable); + EXPECT_FALSE(rados().get_self_managed_snaps_mode(pool())); + co_return; +} + +CORO_TEST_F(NeoRadosSnapshots, SnapRemove, NeoRadosTest) { + static const auto snap1 = "snap1"s; + co_await create_obj(oid); + co_await rados().create_pool_snap(pool(), snap1, + asio::use_awaitable); + rados().lookup_snap(pool(), snap1); + co_await rados().delete_pool_snap(pool().get_pool(), snap1, asio::use_awaitable); + EXPECT_THROW(rados().lookup_snap(pool(), snap1);, + sys::system_error); + + co_return; +} + +CORO_TEST_F(NeoRadosSnapshots, Rollback, NeoRadosTest) { + static const auto snap1 = "snap1"s; + const auto bl1 = filled_buffer_list(0xcc, 128); + const auto bl2 = filled_buffer_list(0xdd, 128); + + co_await execute(oid, WriteOp{}.write(0, bl1)); + co_await rados().create_pool_snap(pool(), snap1, asio::use_awaitable); + co_await execute(oid, WriteOp{}.write_full(bl2)); + + auto resbl = co_await read(oid); + EXPECT_EQ(bl2, resbl); + + co_await execute(oid, WriteOp{}.rollback(rados().lookup_snap(pool(), snap1))); + + resbl = co_await read(oid); + EXPECT_EQ(bl1, resbl); + + co_return; +} + +CORO_TEST_F(NeoRadosSnapshots, SnapGetName, NeoRadosTest) { + static const auto snapfoo = "snapfoo"s; + static const auto snapbar = "snapbar"s; + co_await create_obj(oid); + co_await rados().create_pool_snap(pool(), snapfoo, asio::use_awaitable); + auto rid = rados().lookup_snap(pool(), snapfoo); + EXPECT_EQ(snapfoo, rados().get_snap_name(pool(), rid)); + rados().get_snap_timestamp(pool(), rid); + co_await rados().delete_pool_snap(pool().get_pool(), snapfoo, asio::use_awaitable); + co_return; +} + +CORO_TEST_F(NeoRadosSnapshots, SnapCreateRemove, NeoRadosTest) { + // reproduces http://tracker.ceph.com/issues/10262 + static const auto snapfoo = "snapfoo"s; + static const auto snapbar = "snapbar"s; + const auto bl = to_buffer_list("foo"sv); + co_await execute(oid, WriteOp{}.write_full(bl)); + co_await rados().create_pool_snap(pool(), snapfoo, asio::use_awaitable); + co_await execute(oid, WriteOp{}.remove()); + co_await rados().create_pool_snap(pool(), snapbar, asio::use_awaitable); + + WriteOp op; + op.create(false); + op.remove(); + co_await execute(oid, std::move(op)); + co_await rados().delete_pool_snap(pool().get_pool(), snapfoo, + asio::use_awaitable); + co_await rados().delete_pool_snap(pool().get_pool(), snapbar, + asio::use_awaitable); + co_return; +} + +CORO_TEST_F(NeoRadosSelfManagedSnaps, Snap, NeoRadosTest) { + std::vector my_snaps; + EXPECT_FALSE(rados().get_self_managed_snaps_mode(pool())); + auto ioc = pool(); + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + EXPECT_TRUE(rados().get_self_managed_snaps_mode(pool())); + + + const auto bl1 = filled_buffer_list(0xcc, 128); + co_await execute(oid, WriteOp{}.write(0, bl1), ioc); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + const auto bl2 = filled_buffer_list(0xdd, 128); + co_await execute(oid, WriteOp{}.write(0, bl2), ioc); + + ioc.set_read_snap(my_snaps[1]); + auto resbl = co_await read(oid, ioc); + EXPECT_EQ(bl1, resbl); + + co_await rados().delete_selfmanaged_snap(ioc.get_pool(), my_snaps.back(), + asio::use_awaitable); + my_snaps.pop_back(); + ioc.set_read_snap(neorados::snap_head); + EXPECT_TRUE(rados().get_self_managed_snaps_mode(pool())); + co_await execute(oid, WriteOp{}.remove()); + co_return; +} + +CORO_TEST_F(NeoRadosSelfManagedSnaps, Rollback, NeoRadosTest) { + SKIP_IF_CRIMSON(); + static constexpr auto len = 128u; + std::vector my_snaps; + + auto ioc = pool(); + auto readioc = pool(); + readioc.set_read_snap(neorados::snap_dir); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + const auto bl1 = filled_buffer_list(0xcc, len); + co_await execute(oid, WriteOp{}.write(0, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len * 2, bl1), ioc); + + neorados::SnapSet ss; + co_await execute(oid, ReadOp{}.list_snaps(&ss), readioc); + EXPECT_EQ(1u, ss.clones.size()); + EXPECT_EQ(neorados::snap_head, ss.clones[0].cloneid); + EXPECT_EQ(0u, ss.clones[0].snaps.size()); + EXPECT_EQ(0u, ss.clones[0].overlap.size()); + EXPECT_EQ(len * 3, ss.clones[0].size); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + const auto bl2 = filled_buffer_list(0xdd, 128); + // Once in the middle + co_await execute(oid, WriteOp{}.write(len, bl2), ioc); + // Once after the end + co_await execute(oid, WriteOp{}.write(len * 3, bl1), ioc); + + + co_await expect_error_code(execute(oid, ReadOp{}.list_snaps(&ss), ioc), + sys::errc::invalid_argument); + co_await execute(oid, ReadOp{}.list_snaps(&ss), readioc); + EXPECT_EQ(2u, ss.clones.size()); + EXPECT_EQ(my_snaps[1], ss.clones[0].cloneid); + EXPECT_EQ(1u, ss.clones[0].snaps.size()); + EXPECT_EQ(my_snaps[1], ss.clones[0].snaps[0]); + EXPECT_EQ(2u, ss.clones[0].overlap.size()); + EXPECT_EQ(0u, ss.clones[0].overlap[0].first); + EXPECT_EQ(len, ss.clones[0].overlap[0].second); + EXPECT_EQ(len * 2, ss.clones[0].overlap[1].first); + EXPECT_EQ(len, ss.clones[0].overlap[1].second); + EXPECT_EQ(len * 3, ss.clones[0].size); + EXPECT_EQ(neorados::snap_head, ss.clones[1].cloneid); + EXPECT_EQ(0u, ss.clones[1].snaps.size()); + EXPECT_EQ(0u, ss.clones[1].overlap.size()); + EXPECT_EQ(len * 4, ss.clones[1].size); + + co_await execute(oid, WriteOp{}.rollback(my_snaps[1]), ioc); + + auto resbl = co_await read(oid, 0, len); + EXPECT_EQ(len, resbl.length()); + EXPECT_EQ(bl1, resbl); + resbl = co_await read(oid, len, len); + EXPECT_EQ(len, resbl.length()); + EXPECT_EQ(bl1, resbl); + + resbl = co_await read(oid, len * 2, len); + EXPECT_EQ(len, resbl.length()); + EXPECT_EQ(bl1, resbl); + + resbl = co_await read(oid, len * 3, len); + EXPECT_EQ(0u, resbl.length()); + + co_await rm_selfmanaged_snaps(rados(), my_snaps, ioc); + + co_return; +} + +CORO_TEST_F(NeoRadosSelfManagedSnaps, SnapOverlap, NeoRadosTest) { + // WIP https://tracker.ceph.com/issues/58263 + SKIP_IF_CRIMSON(); + static constexpr auto len = 128u; + std::vector my_snaps; + auto ioc = pool(); + auto readioc = pool(); + readioc.set_read_snap(neorados::snap_dir); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + const auto bl1 = filled_buffer_list(0xcc, len); + co_await execute(oid, WriteOp{}.write(0, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len * 2, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len * 4, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len * 6, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len * 8, bl1), ioc); + + neorados::SnapSet ss; + co_await execute(oid, ReadOp{}.list_snaps(&ss), readioc); + EXPECT_EQ(1u, ss.clones.size()); + EXPECT_EQ(neorados::snap_head, ss.clones[0].cloneid); + EXPECT_EQ(0u, ss.clones[0].snaps.size()); + EXPECT_EQ(0u, ss.clones[0].overlap.size()); + EXPECT_EQ(9u * len, ss.clones[0].size); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + const auto bl2 = filled_buffer_list(0xdd, len); + co_await execute(oid, WriteOp{}.write(len * 1, bl2), ioc); + co_await execute(oid, WriteOp{}.write(len * 3, bl2), ioc); + co_await execute(oid, WriteOp{}.write(len * 5, bl2), ioc); + co_await execute(oid, WriteOp{}.write(len * 7, bl2), ioc); + co_await execute(oid, WriteOp{}.write(len * 9, bl2), ioc); + + co_await execute(oid, ReadOp{}.list_snaps(&ss), readioc); + EXPECT_EQ(2u, ss.clones.size()); + EXPECT_EQ(my_snaps[1], ss.clones[0].cloneid); + EXPECT_EQ(1u, ss.clones[0].snaps.size()); + EXPECT_EQ(my_snaps[1], ss.clones[0].snaps[0]); + EXPECT_EQ(5u, ss.clones[0].overlap.size()); + EXPECT_EQ(0u, ss.clones[0].overlap[0].first); + EXPECT_EQ(len, ss.clones[0].overlap[0].second); + EXPECT_EQ(len * 2, ss.clones[0].overlap[1].first); + EXPECT_EQ(len, ss.clones[0].overlap[1].second); + EXPECT_EQ(len * 4, ss.clones[0].overlap[2].first); + EXPECT_EQ(len, ss.clones[0].overlap[2].second); + EXPECT_EQ(len * 6, ss.clones[0].overlap[3].first); + EXPECT_EQ(len, ss.clones[0].overlap[3].second); + EXPECT_EQ(len * 8, ss.clones[0].overlap[4].first); + EXPECT_EQ(len, ss.clones[0].overlap[4].second); + EXPECT_EQ(len * 9, ss.clones[0].size); + EXPECT_EQ(neorados::snap_head, ss.clones[1].cloneid); + EXPECT_EQ(0u, ss.clones[1].snaps.size()); + EXPECT_EQ(0u, ss.clones[1].overlap.size()); + EXPECT_EQ(len * 10, ss.clones[1].size); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + + const auto bl3 = filled_buffer_list(0xee, len); + co_await execute(oid, WriteOp{}.write(len * 1, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len * 4, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len * 5, bl1), ioc); + co_await execute(oid, WriteOp{}.write(len * 8, bl1), ioc); + + co_await execute(oid, ReadOp{}.list_snaps(&ss), readioc); + + EXPECT_EQ(3u, ss.clones.size()); + EXPECT_EQ(my_snaps[1], ss.clones[0].cloneid); + EXPECT_EQ(1u, ss.clones[0].snaps.size()); + EXPECT_EQ(my_snaps[1], ss.clones[0].snaps[0]); + EXPECT_EQ(5u, ss.clones[0].overlap.size()); + EXPECT_EQ(0u, ss.clones[0].overlap[0].first); + EXPECT_EQ(len, ss.clones[0].overlap[0].second); + EXPECT_EQ(len * 2, ss.clones[0].overlap[1].first); + EXPECT_EQ(len, ss.clones[0].overlap[1].second); + EXPECT_EQ(len * 4, ss.clones[0].overlap[2].first); + EXPECT_EQ(len, ss.clones[0].overlap[2].second); + EXPECT_EQ(len * 6, ss.clones[0].overlap[3].first); + EXPECT_EQ(len, ss.clones[0].overlap[3].second); + EXPECT_EQ(len * 8, ss.clones[0].overlap[4].first); + EXPECT_EQ(len, ss.clones[0].overlap[4].second); + EXPECT_EQ(len * 9, ss.clones[0].size); + + EXPECT_EQ(my_snaps[2], ss.clones[1].cloneid); + EXPECT_EQ(1u, ss.clones[1].snaps.size()); + EXPECT_EQ(my_snaps[2], ss.clones[1].snaps[0]); + EXPECT_EQ(4u, ss.clones[1].overlap.size()); + EXPECT_EQ(0u, ss.clones[1].overlap[0].first); + EXPECT_EQ(len, ss.clones[1].overlap[0].second); + EXPECT_EQ(len * 2, ss.clones[1].overlap[1].first); + EXPECT_EQ(len * 2, ss.clones[1].overlap[1].second); + EXPECT_EQ(len * 6, ss.clones[1].overlap[2].first); + EXPECT_EQ(len * 2, ss.clones[1].overlap[2].second); + EXPECT_EQ(len * 9, ss.clones[1].overlap[3].first); + EXPECT_EQ(len, ss.clones[1].overlap[3].second); + EXPECT_EQ(len * 10, ss.clones[1].size); + + EXPECT_EQ(neorados::snap_head, ss.clones[2].cloneid); + EXPECT_EQ(0u, ss.clones[2].snaps.size()); + EXPECT_EQ(0u, ss.clones[2].overlap.size()); + EXPECT_EQ(len * 10, ss.clones[2].size); + + co_await rm_selfmanaged_snaps(rados(), my_snaps, ioc); + + co_return; +} + +CORO_TEST_F(NeoRadosSelfManagedSnaps, Bug11677, NeoRadosTest) { + std::vector my_snaps; + auto ioc = pool(); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + + static constexpr auto len = 1 << 20; // 1 MiB + auto buf = std::make_unique(len); + std::memset(buf.get(), 0xcc, len); + + buffer::list bl1; + bl1.append(buf.get(), len); + co_await execute(oid, WriteOp{}.write(0, bl1), ioc); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + + WriteOp op; + op.assert_exists() + .remove(); + co_await execute(oid, std::move(op), ioc); + + co_await rm_selfmanaged_snaps(rados(), my_snaps, ioc); + + co_return; +} + +CORO_TEST_F(NeoRadosSelfManagedSnaps, OrderSnap, NeoRadosTest) { + static constexpr auto len = 128u; + std::vector my_snaps; + auto ioc = pool(); + const auto bl = filled_buffer_list(0xcc, len); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + co_await execute(oid, WriteOp{}.write(0, bl).ordersnap(), ioc); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + co_await execute(oid, WriteOp{}.write(0, bl).ordersnap(), ioc); + + my_snaps.pop_back(); + std::reverse(my_snaps.begin(), my_snaps.end()); + ioc.set_write_snap_context({{my_snaps[0], my_snaps}}); + std::reverse(my_snaps.begin(), my_snaps.end()); + + co_await expect_error_code(execute(oid, WriteOp() + .write(0, bl).ordersnap(), ioc), + osd_errc::old_snapc); + + co_await execute(oid, WriteOp{}.write(0, bl), ioc); + + co_return; +} + +CORO_TEST_F(NeoRadosSelfManagedSnaps, ReusePurgedSnap, NeoRadosTest) { + static constexpr auto len = 128u; + std::vector my_snaps; + auto ioc = pool(); + const auto bl = filled_buffer_list(0xcc, len); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + EXPECT_TRUE(rados().get_self_managed_snaps_mode(pool())); + co_await execute(oid, WriteOp{}.write(0, bl), ioc); + + co_await new_selfmanaged_snap(rados(), my_snaps, ioc); + std::cout << "Deleting snap " << my_snaps.back() << " in pool " + << pool_name() << "." << std::endl; + co_await rados().delete_selfmanaged_snap(ioc.get_pool(), my_snaps.back(), + asio::use_awaitable); + std::cout << "Waiting for snaps to purge." << std::endl; + co_await wait_for(15s); + std::reverse(my_snaps.begin(), my_snaps.end()); + ioc.set_write_snap_context({{my_snaps[0], my_snaps}}); + std::reverse(my_snaps.begin(), my_snaps.end()); + + co_await execute(oid, WriteOp() + .write(0, filled_buffer_list(0xdd, len))); + + + co_return; +} diff --git a/src/test/neorados/watch_notify.cc b/src/test/neorados/watch_notify.cc new file mode 100644 index 000000000000..284b9436edc7 --- /dev/null +++ b/src/test/neorados/watch_notify.cc @@ -0,0 +1,168 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab +/* + * Ceph - scalable distributed file system + * + * Copyright (C) 2023 IBM + * + * See file COPYING for license information. + * + */ + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include "include/neorados/RADOS.hpp" +#include "include/buffer.h" + +#include "test/neorados/common_tests.h" + +#include "gtest/gtest.h" + +using std::uint64_t; + +namespace asio = boost::asio; +namespace buffer = ceph::buffer; +namespace container = boost::container; +namespace sys = boost::system; + +using namespace std::literals; + +using neorados::ReadOp; +using neorados::WriteOp; + +using std::uint64_t; + +class NeoRadosWatchNotifyTest : public NeoRadosTest { +protected: + buffer::list notify_bl; + container::flat_set notify_cookies; + const std::string notify_oid = "foo"s; + sys::error_code notify_err; + ceph::timespan notify_sleep = 0s; + + asio::awaitable handle_notify(uint64_t notify_id, uint64_t cookie, + uint64_t notifier_gid, buffer::list&& bl) { + std::cout << __func__ << " cookie " << cookie << " notify_id " << notify_id + << " notifier_gid " << notifier_gid << std::endl; + notify_bl = std::move(bl); + notify_cookies.insert(cookie); + if (notify_sleep > 0s) { + std::cout << "Waiting for " << notify_sleep << std::endl; + co_await wait_for(notify_sleep); + } + co_await rados().notify_ack(notify_oid, pool(), notify_id, cookie, + to_buffer_list("reply"sv), asio::use_awaitable); + } + + asio::awaitable handle_error(sys::error_code ec, uint64_t cookie) { + std::cout << __func__ << " cookie " << cookie + << " err " << ec.message() << std::endl; + ceph_assert(cookie > 1000); + co_await rados().unwatch(cookie, pool(), asio::use_awaitable); + notify_cookies.erase(cookie); + notify_err = ec; + try { + auto watchcookie + = co_await rados().watch(notify_oid, pool(), std::nullopt, + std::ref(*this), asio::use_awaitable); + notify_cookies.insert(watchcookie); + } catch (const sys::system_error& e) { + std::cout << "reconnect error: " << e.what() << std::endl; + } + } + +public: + void operator ()(sys::error_code ec, uint64_t notify_id, uint64_t cookie, + uint64_t notifier_id, buffer::list&& bl) { + asio::co_spawn( + asio_context, + [](NeoRadosWatchNotifyTest* t, sys::error_code ec, uint64_t notify_id, + uint64_t cookie, uint64_t notifier_id, buffer::list bl) + -> asio::awaitable { + if (ec) { + co_await t->handle_error(ec, cookie); + } else { + co_await t->handle_notify(notify_id, cookie, notifier_id, + std::move(bl)); + } + co_return; + }(this, ec, notify_id, cookie, notifier_id, std::move(bl)), + [](std::exception_ptr e) { + if (e) std::rethrow_exception(e); + }); + } +}; + +CORO_TEST_F(NeoRadosWatchNotify, WatchNotify, NeoRadosWatchNotifyTest) { + co_await create_obj(notify_oid); + auto handle = co_await rados().watch(notify_oid, pool(), std::nullopt, + std::ref(*this), + asio::use_awaitable); + EXPECT_TRUE(rados().check_watch(handle)); + std::vector watchers; + co_await execute(notify_oid, ReadOp{}.list_watchers(&watchers)); + EXPECT_EQ(1u, watchers.size()); + auto reply = co_await rados().notify(notify_oid, pool(), {}, {}, + asio::use_awaitable); + std::map, buffer::list> reply_map; + std::set> missed_set; + auto p = reply.cbegin(); + decode(reply_map, p); + decode(missed_set, p); + EXPECT_EQ(1u, notify_cookies.size()); + EXPECT_EQ(1u, notify_cookies.count(handle)); + EXPECT_EQ(1u, reply_map.size()); + EXPECT_EQ(5u, reply_map.begin()->second.length()); + EXPECT_EQ(0, strncmp("reply", reply_map.begin()->second.c_str(), 5)); + EXPECT_EQ(0u, missed_set.size()); + EXPECT_TRUE(rados().check_watch(handle)); + co_await rados().unwatch(handle, pool(), asio::use_awaitable); + + co_return; +} + +CORO_TEST_F(NeoRadosWatchNotify, WatchNotifyTimeout, NeoRadosWatchNotifyTest) { + co_await create_obj(notify_oid); + auto handle = co_await rados().watch(notify_oid, pool(), std::nullopt, + std::ref(*this), + asio::use_awaitable); + EXPECT_TRUE(rados().check_watch(handle)); + std::vector watchers; + co_await execute(notify_oid, ReadOp{}.list_watchers(&watchers)); + EXPECT_EQ(1u, watchers.size()); + + notify_sleep = 3s; + + std::cout << "Trying..." << std::endl; + co_await expect_error_code(rados().notify(notify_oid, pool(), {}, 1s, + asio::use_awaitable), + sys::errc::timed_out); + std::cout << "Timed out." << std::endl; + + EXPECT_TRUE(rados().check_watch(handle)); + co_await rados().unwatch(handle, pool(), asio::use_awaitable); + + std::cout << "Flushing..." << std::endl; + co_await rados().flush_watch(asio::use_awaitable); + std::cout << "Flushed..." << std::endl; + + // Give time for notify_ack to fire before pool gets deleted. + co_await wait_for(notify_sleep); + + co_return; +}