From: Adam Emerson Date: Tue, 2 Apr 2024 19:47:29 +0000 (-0400) Subject: rgw/multisite/datalog: Semaphores and Recovery X-Git-Tag: v20.3.0~169^2~12 X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=f56420f6aa7c9d29dfb406c997c9672b639403a0;p=ceph.git rgw/multisite/datalog: Semaphores and Recovery Increment in add_entry, decrement in renew_entry, and recover on startup. Signed-off-by: Adam Emerson --- diff --git a/src/rgw/driver/rados/rgw_datalog.cc b/src/rgw/driver/rados/rgw_datalog.cc index 9a498666a0594..31ef9c350f428 100644 --- a/src/rgw/driver/rados/rgw_datalog.cc +++ b/src/rgw/driver/rados/rgw_datalog.cc @@ -8,7 +8,10 @@ #include #include +#include #include +#include +#include #include #include "include/fs_types.h" @@ -24,6 +27,7 @@ #include "neorados/cls/fifo.h" #include "neorados/cls/log.h" +#include "neorados/cls/sem_set.h" #include "rgw_asio_thread.h" #include "rgw_bucket.h" @@ -351,6 +355,13 @@ RGWDataChangesLog::RGWDataChangesLog(CephContext* cct) prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {} +RGWDataChangesLog::RGWDataChangesLog(CephContext* cct, bool log_data, + neorados::RADOS* rados) + : cct(cct), rados(rados), log_data(log_data), + num_shards(cct->_conf->rgw_data_log_num_shards), + prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {} + + void DataLogBackends::handle_init(entries_t e) { std::unique_lock l(m); for (const auto& [gen_id, gen] : e) { @@ -413,72 +424,270 @@ void DataLogBackends::handle_empty_to(uint64_t new_tail) { } -int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone, +int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, + const RGWZone* zone, const RGWZoneParams& zoneparams, - rgw::sal::RadosStore* _store) + rgw::sal::RadosStore* store) { - zone = _zone; - store = _store; - ceph_assert(zone); - auto defbacking = to_log_type( - cct->_conf.get_val("rgw_default_data_log_backing")); - // Should be guaranteed by `set_enum_allowed` - ceph_assert(defbacking); - auto log_pool = zoneparams.log_pool; + log_data = zone->log_data; + rados = &store->get_neorados(); try { - std::exception_ptr eptr; - std::tie(eptr, loc) = - asio::co_spawn(store->get_io_context(), - rgw::init_iocontext(dpp, store->get_neorados(), - log_pool, rgw::create, - asio::use_awaitable), - async::use_blocked); + // Blocking in startup code, not ideal, but won't hurt anything. 
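+    // Spawn the awaitable start() overload below on the store's io_context
+    // and block this thread until it completes; async::use_blocked hands
+    // any failure back as an exception_ptr instead of throwing across the
+    // spawn boundary.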
+ std::exception_ptr eptr + = asio::co_spawn(store->get_io_context(), + start(dpp, zoneparams.log_pool), + async::use_blocked); if (eptr) { std::rethrow_exception(eptr); } } catch (const sys::system_error& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ - << ": Failed to initialized ioctx: " << e.what() - << ", pool=" << log_pool << dendl; + << ": Failed to start datalog: " << e.what() + << dendl; return ceph::from_error_code(e.code()); + } catch (const std::exception& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": Failed to start datalog: " << e.what() + << dendl; + return -EIO; + } + return 0; +} + +asio::awaitable +RGWDataChangesLog::start(const DoutPrefixProvider *dpp, + const rgw_pool& log_pool, + bool recovery, + bool watch, + bool renew) +{ + if (!log_data) { + co_return; + } + auto defbacking = to_log_type( + cct->_conf.get_val("rgw_default_data_log_backing")); + // Should be guaranteed by `set_enum_allowed` + ceph_assert(defbacking); + try { + loc = co_await rgw::init_iocontext(dpp, *rados, log_pool, + rgw::create, asio::use_awaitable); } catch (const std::exception& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ": Failed to initialized ioctx: " << e.what() << ", pool=" << log_pool << dendl; - return -EIO; + throw; } - // Blocking in startup code, not ideal, but won't hurt anything. try { - std::exception_ptr eptr; - std::tie(eptr, bes) = - asio::co_spawn( - store->get_io_context().get_executor(), - logback_generations::init( - dpp, store->get_neorados(), metadata_log_oid(), loc, - [this](uint64_t gen_id, int shard) { - return get_oid(gen_id, shard); - }, num_shards, *defbacking, *this), - async::use_blocked); - if (eptr) { - std::rethrow_exception(eptr); - } - } catch (const sys::system_error& e) { - lderr(cct) << __PRETTY_FUNCTION__ - << ": Error initializing backends: " - << e.what() << dendl; - return ceph::from_error_code(e.code()); + bes = co_await logback_generations::init( + dpp, *rados, metadata_log_oid(), loc, + [this](uint64_t gen_id, int shard) { + return get_oid(gen_id, shard); + }, num_shards, *defbacking, *this); } catch (const std::exception& e) { ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ << ": Error initializing backends: " << e.what() << dendl; - return -EIO; + throw; } - asio::co_spawn(store->get_io_context().get_executor(), - renew_run(), asio::detached); - return 0; + if (renew) { + asio::co_spawn(co_await asio::this_coro::executor, + renew_run(renew_signal), + asio::bind_cancellation_slot(renew_signal->slot(), + asio::detached)); + } + if (watch) { + // Establish watch here so we won't be 'started up' until we're watching. + const auto oid = get_sem_set_oid(0); + auto established = co_await establish_watch(dpp, oid); + if (!established) { + throw sys::system_error{ENOTCONN, sys::generic_category(), + "Unable to establish recovery watch!"}; + } + asio::co_spawn(co_await asio::this_coro::executor, + watch_loop(watch_signal), + asio::bind_cancellation_slot(watch_signal->slot(), + asio::detached)); + } + if (recovery) { + // Recovery can run concurrent with normal operation, so we don't + // have to block startup while we do all that I/O. 
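+    // Spawn it detached; shutdown() can still stop it early by emitting a
+    // terminal cancellation through recovery_signal's slot.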
+ asio::co_spawn(co_await asio::this_coro::executor, + recover(dpp, recovery_signal), + asio::bind_cancellation_slot(recovery_signal->slot(), + asio::detached)); + } + co_return; +} + +asio::awaitable +RGWDataChangesLog::establish_watch(const DoutPrefixProvider* dpp, + std::string_view oid) { + const auto queue_depth = num_shards * 128; + try { + co_await rados->execute(oid, loc, neorados::WriteOp{}.create(false), + asio::use_awaitable); + watchcookie = co_await rados->watch(oid, loc, asio::use_awaitable, + std::nullopt, queue_depth); + } catch (const std::exception& e) { + ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__ + << ": Unable to start watch! Error: " + << e.what() << dendl; + watchcookie = 0; + } + + if (watchcookie == 0) { + // Dump our current working set. + co_await renew_entries(dpp); + } + + co_return watchcookie != 0; +} + +struct recovery_check { + uint64_t shard = 0; + + recovery_check() = default; + + recovery_check(uint64_t shard) : shard(shard) {} + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(shard, bl); + ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(shard, bl); + DECODE_FINISH(bl); + } + + operator uint64_t() { + return shard; + } +}; +WRITE_CLASS_ENCODER(recovery_check); + + +struct recovery_reply { + std::unordered_set reply_set; + + recovery_reply() = default; + + recovery_reply(std::unordered_set reply_set) + : reply_set(std::move(reply_set)) {} + + void encode(buffer::list& bl) const { + ENCODE_START(1, 1, bl); + encode(reply_set, bl); + ENCODE_FINISH(bl); + } + + void decode(buffer::list::const_iterator& bl) { + DECODE_START(1, bl); + decode(reply_set, bl); + DECODE_FINISH(bl); + } + + operator std::unordered_set&() { + return reply_set; + } +}; +WRITE_CLASS_ENCODER(recovery_reply); + + +asio::awaitable +RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp, + std::string_view oid) { + auto notification = co_await rados->next_notification(watchcookie, + asio::use_awaitable); + int shard = 0; + // Don't send a reply if we get a bogus notification, we don't + // want recovery to delete semaphores improperly. + try { + recovery_check rc; + decode(rc, notification.bl); + shard = rc; + } catch (const std::exception& e) { + ldpp_dout(dpp, 2) << "Got malformed notification!" << dendl; + co_return; + } + if (shard >= num_shards) { + ldpp_dout(dpp, 2) << "Got unknown shard " << shard << dendl; + co_return; + } + recovery_reply reply; + std::unique_lock l(lock); + for (const auto& bg : cur_cycle) { + if (choose_oid(bg.shard) == shard) { + reply.reply_set.insert(bg.get_key()); + } + } + std::copy(semaphores[shard].begin(), + semaphores[shard].end(), + std::inserter(reply.reply_set, reply.reply_set.end())); + l.unlock(); + buffer::list replybl; + encode(reply, replybl); + try { + co_await rados->notify_ack(oid, loc, notification.notify_id, watchcookie, + std::move(replybl), asio::use_awaitable); + } catch (const std::exception& e) { + ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__ + << ": Failed ack. 
Whatever server is in " + << "recovery won't decrement semaphores: " + << e.what() << dendl; + } +} + +asio::awaitable RGWDataChangesLog::watch_loop(decltype(watch_signal)) { + const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: "); + const auto oid = get_sem_set_oid(0); + bool need_rewatch = false; + + while (!going_down()) { + try { + co_await process_notification(&dp, oid); + } catch (const sys::system_error& e) { + if (e.code() == neorados::errc::notification_overflow) { + ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__ + << ": Notification overflow. Whatever server is in " + << "recovery won't decrement semaphores." << dendl; + continue; + } + if (going_down()) { + break; + } else { + need_rewatch = true; + } + } + if (need_rewatch) { + try { + if (watchcookie) { + auto wc = watchcookie; + watchcookie = 0; + co_await rados->unwatch(wc, loc, asio::use_awaitable); + } + } catch (const std::exception& e) { + // Watch may not exist, don't care. + } + bool rewatched = false; + ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__ + << ": Trying to re-establish watch" << dendl; + + rewatched = co_await establish_watch(&dp, oid); + while (!rewatched) { + boost::asio::steady_timer t(co_await asio::this_coro::executor, 500ms); + co_await t.async_wait(asio::use_awaitable); + ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__ + << ": Trying to re-establish watch" << dendl; + rewatched = co_await establish_watch(&dp, oid); + } + } + } } int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { @@ -492,21 +701,27 @@ int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) { asio::awaitable RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp) { - if (!zone->log_data) + if (!log_data) { co_return; + } - /* we can't keep the bucket name as part of the cls::log::entry, and we need - * it later, so we keep two lists under the map */ + /* we can't keep the bucket name as part of the datalog entry, and + * we need it later, so we keep two lists under the map */ bc::flat_map, RGWDataChangesBE::entries>> m; std::unique_lock l(lock); decltype(cur_cycle) entries; entries.swap(cur_cycle); + for (const auto& [bs, gen] : entries) { + unsigned index = choose_oid(bs); + semaphores[index].insert(BucketGen{bs, gen}.get_key()); + } l.unlock(); auto ut = real_clock::now(); auto be = bes->head(); + for (const auto& [bs, gen] : entries) { auto index = choose_oid(bs); @@ -522,17 +737,54 @@ RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp) be->prepare(ut, change.key, std::move(bl), m[index].second); } + auto push_failed = false; for (auto& [index, p] : m) { auto& [buckets, entries] = p; auto now = real_clock::now(); - co_await be->push(dpp, index, std::move(entries)); + // Failure on push isn't fatal. 
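+    // A failed push sets push_failed, which skips the semaphore decrement
+    // below; recovery can then resynthesize these entries from the
+    // still-elevated semaphores.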
+ try { + co_await be->push(dpp, index, std::move(entries)); + } catch (const std::exception& e) { + push_failed = true; + ldpp_dout(dpp, 5) << "RGWDataChangesLog::renew_entries(): Backend push failed " + << "with exception: " << e.what() << dendl; + } + auto expiration = now; expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window); for (auto& [bs, gen] : buckets) { update_renewed(bs, gen, expiration); } } + if (push_failed) { + co_return; + } + + namespace sem_set = neorados::cls::sem_set; + // If we didn't error in pushing, we can now decrement the semaphores + l.lock(); + for (auto index = 0u; index < unsigned(num_shards); ++index) { + using neorados::WriteOp; + auto& keys = semaphores[index]; + while (!keys.empty()) { + bc::flat_set batch; + // Can't use a move iterator here, since the keys have to stay + // until they're safely on the OSD to avoid the risk of + // double-decrement from recovery. + auto to_copy = std::min(sem_max_keys, keys.size()); + std::copy_n(keys.begin(), to_copy, + std::inserter(batch, batch.end())); + auto op = WriteOp{}.exec(sem_set::decrement(std::move(batch))); + l.unlock(); + co_await rados->execute(get_sem_set_oid(index), loc, std::move(op), + asio::use_awaitable); + l.lock(); + auto iter = keys.cbegin(); + std::advance(iter, to_copy); + keys.erase(keys.cbegin(), iter); + } + } co_return; } @@ -542,18 +794,16 @@ auto RGWDataChangesLog::_get_change(const rgw_bucket_shard& bs, { ChangeStatusPtr status; if (!changes.find({bs, gen}, status)) { - status = std::make_shared(store->get_io_context() - .get_executor()); + status = std::make_shared(rados->get_executor()); changes.add({bs, gen}, status); } return status; } -void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs, - const rgw::bucket_log_layout_generation& gen) +bool RGWDataChangesLog::register_renew(BucketGen bg) { std::scoped_lock l{lock}; - cur_cycle.insert({bs, gen.gen}); + return cur_cycle.insert(bg).second; } void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs, @@ -591,7 +841,6 @@ RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp, optional_yield y(yc); return bucket_filter(bucket, y, dpp); }, asio::use_awaitable); - co_return true; } std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const { @@ -600,13 +849,17 @@ std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const { fmt::format("{}.{}", prefix, i)); } +std::string RGWDataChangesLog::get_sem_set_oid(int i) const { + return fmt::format("_sem_set{}.{}", prefix, i); +} + asio::awaitable RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, const RGWBucketInfo& bucket_info, const rgw::bucket_log_layout_generation& gen, int shard_id) { - if (!zone->log_data) { + if (!log_data) { co_return; } @@ -623,6 +876,27 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, rgw_bucket_shard bs(bucket, shard_id); int index = choose_oid(bs); + if (!(watchcookie && rados->check_watch(watchcookie))) { + auto now = real_clock::now(); + ldpp_dout(dpp, 2) << "RGWDataChangesLog::add_entry(): " + << "Bypassing window optimization and pushing directly: " + << "bucket.name=" << bucket.name + << " shard_id=" << shard_id << " now=" + << now << " cur_expiration=" << dendl; + + buffer::list bl; + rgw_data_change change; + change.entity_type = ENTITY_TYPE_BUCKET; + change.key = bs.get_key(); + change.timestamp = now; + change.gen = gen.gen; + encode(change, bl); + + auto be = bes->head(); + // Failure on push is fatal if we're bypassing semaphores. 
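+    // Nothing incremented a semaphore for this entry, so recovery cannot
+    // replay it; let any exception from the push propagate to the caller.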
+ co_await be->push(dpp, index, now, change.key, std::move(bl)); + co_return; + } mark_modified(index, bs, gen.gen); @@ -639,17 +913,26 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl; + if (now < status->cur_expiration) { /* no need to send, recently completed */ sl.unlock(); - register_renew(bs, gen); + auto bg = BucketGen{bs, gen.gen}; + auto key = bg.get_key(); + auto need_sem_set = register_renew(std::move(bg)); + if (need_sem_set) { + using neorados::WriteOp; + using neorados::cls::sem_set::increment; + co_await rados->execute(get_sem_set_oid(index), loc, + WriteOp{}.exec(increment(std::move(key))), + asio::use_awaitable); + } co_return; } if (status->pending) { co_await status->cond.async_wait(sl, asio::use_awaitable); sl.unlock(); - register_renew(bs, gen); co_return; } @@ -675,7 +958,14 @@ RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl; auto be = bes->head(); - co_await be->push(dpp, index, now, change.key, std::move(bl)); + // Failure on push isn't fatal. + try { + co_await be->push(dpp, index, now, change.key, std::move(bl)); + } catch (const std::exception& e) { + ldpp_dout(dpp, 5) << "RGWDataChangesLog::add_entry(): Backend push failed " + << "with exception: " << e.what() << dendl; + } + now = real_clock::now(); @@ -708,7 +998,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp, } } else { maybe_warn_about_blocking(dpp); - eptr = asio::co_spawn(store->get_io_context().get_executor(), + eptr = asio::co_spawn(rados->get_executor(), add_entry(dpp, bucket_info, gen, shard_id), async::use_blocked); } @@ -808,7 +1098,7 @@ int RGWDataChangesLog::list_entries( } } else { maybe_warn_about_blocking(dpp); - std::tie(eptr, out) = asio::co_spawn(store->get_io_context().get_executor(), + std::tie(eptr, out) = asio::co_spawn(rados->get_executor(), bes->list(dpp, shard, entries, std::string{marker}), async::use_blocked); @@ -887,10 +1177,10 @@ int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entrie } else { maybe_warn_about_blocking(dpp); std::tie(eptr, out) = - asio::co_spawn(store->get_io_context().get_executor(), - list_entries(dpp, max_entries, - RGWDataChangesLogMarker{marker}), - async::use_blocked); + asio::co_spawn(rados->get_executor(), + list_entries(dpp, max_entries, + RGWDataChangesLogMarker{marker}), + async::use_blocked); } if (eptr) { return ceph::from_exception(eptr); @@ -923,7 +1213,7 @@ int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id, } } else { maybe_warn_about_blocking(dpp); - std::tie(eptr, *info) = asio::co_spawn(store->get_io_context().get_executor(), + std::tie(eptr, *info) = asio::co_spawn(rados->get_executor(), be->get_info(dpp, shard_id), async::use_blocked); } @@ -977,7 +1267,7 @@ int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, } } else { maybe_warn_about_blocking(dpp); - eptr = asio::co_spawn(store->get_io_context().get_executor(), + eptr = asio::co_spawn(rados->get_executor(), bes->trim_entries(dpp, shard_id, marker), async::use_blocked); } @@ -987,7 +1277,7 @@ int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id, std::string_view marker, librados::AioCompletion* c) { - 
asio::co_spawn(store->get_io_context().get_executor(), + asio::co_spawn(rados->get_executor(), bes->trim_entries(dpp, shard_id, marker), c); return 0; @@ -1028,7 +1318,6 @@ asio::awaitable DataLogBackends::trim_generations( co_return; } - bool RGWDataChangesLog::going_down() const { return down_flag; @@ -1039,11 +1328,21 @@ RGWDataChangesLog::~RGWDataChangesLog() { } void RGWDataChangesLog::shutdown() { + if (down_flag) { + return; + } down_flag = true; renew_stop(); + // Revisit this later + if (renew_signal) + renew_signal->emit(asio::cancellation_type::terminal); + if (recovery_signal) + recovery_signal->emit(asio::cancellation_type::terminal); + if (recovery_signal) + recovery_signal->emit(asio::cancellation_type::terminal); } -asio::awaitable RGWDataChangesLog::renew_run() { +asio::awaitable RGWDataChangesLog::renew_run(decltype(renew_signal)) { static constexpr auto runs_per_prune = 150; auto run = 0; renew_timer.emplace(co_await asio::this_coro::executor); @@ -1128,7 +1427,8 @@ std::string RGWDataChangesLog::max_marker() const { "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~"); } -int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) { +int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, + log_type type,optional_yield y) { std::exception_ptr eptr; if (y) { auto& yield = y.get_yield_context(); @@ -1141,7 +1441,7 @@ int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type typ } } else { maybe_warn_about_blocking(dpp); - eptr = asio::co_spawn(store->get_io_context().get_executor(), + eptr = asio::co_spawn(rados->get_executor(), bes->new_backing(dpp, type), async::use_blocked); } @@ -1164,13 +1464,175 @@ int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, } else { maybe_warn_about_blocking(dpp); - eptr = asio::co_spawn(store->get_io_context().get_executor(), + eptr = asio::co_spawn(rados->get_executor(), bes->trim_generations(dpp, through), async::use_blocked); } return ceph::from_exception(eptr); } +asio::awaitable +RGWDataChangesLog::read_all_sems(int index, + bc::flat_map* out) +{ + namespace sem_set = neorados::cls::sem_set; + std::string cursor; + do { + try { + co_await rados->execute( + get_sem_set_oid(index), loc, + neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, cursor, out, + &cursor)), + nullptr, asio::use_awaitable); + } catch (const sys::system_error& e) { + if (e.code() == sys::errc::no_such_file_or_directory) { + break; + } else { + throw; + } + } + } while (!cursor.empty()); + co_return; +} + +asio::awaitable +RGWDataChangesLog::synthesize_entries( + const DoutPrefixProvider* dpp, + int index, + const bc::flat_map& semcount) +{ + const auto timestamp = real_clock::now(); + auto be = bes->head(); + auto push_failed = false; + + RGWDataChangesBE::entries batch; + for (const auto& [key, sem] : semcount) { + try { + BucketGen bg{key}; + rgw_data_change change; + buffer::list bl; + change.entity_type = ENTITY_TYPE_BUCKET; + change.key = bg.shard.get_key(); + change.timestamp = timestamp; + change.gen = bg.gen; + encode(change, bl); + be->prepare(timestamp, change.key, std::move(bl), batch); + } catch (const sys::error_code& e) { + push_failed = true; + ldpp_dout(dpp, -1) << "RGWDataChangesLog::synthesize_entries(): Unable to " + << "parse Bucketgen key: " << key << "Got exception: " + << e.what() << dendl; + } + } + try { + co_await be->push(dpp, index, std::move(batch)); + } catch (const std::exception& e) { + push_failed = true; + ldpp_dout(dpp, 5) << 
"RGWDataChangesLog::synthesize_entries(): Backend push " + << "failed with exception: " << e.what() << dendl; + } + co_return !push_failed; +} + +asio::awaitable +RGWDataChangesLog::gather_working_sets( + const DoutPrefixProvider* dpp, + int index, + bc::flat_map& semcount) +{ + buffer::list bl; + recovery_check rc = index; + encode(rc, bl); + auto [reply_map, missed_set] = co_await rados->notify( + get_sem_set_oid(0), loc, bl, 60s, asio::use_awaitable); + // If we didn't get an answer from someone, don't decrement anything. + if (!missed_set.empty()) { + ldpp_dout(dpp, 5) << "RGWDataChangesLog::gather_working_sets(): Missed responses: " + << missed_set << dendl; + co_return false; + } + for (const auto& [source, reply] : reply_map) { + recovery_reply keys; + try { + decode(keys, reply); + } catch (const std::exception& e) { + ldpp_dout(dpp, -1) + << "RGWDataChangesLog::gather_working_sets(): Failed decoding reply from: " + << source << dendl; + co_return false; + } + for (const auto& key : keys.reply_set) { + auto iter = semcount.find(key); + if (iter == semcount.end()) { + continue; + } + if (iter->second == 1) { + semcount.erase(iter); + } else { + --(iter->second); + } + } + } + co_return true; +} + +asio::awaitable +RGWDataChangesLog::decrement_sems( + int index, + bc::flat_map&& semcount) +{ + namespace sem_set = neorados::cls::sem_set; + while (!semcount.empty()) { + bc::flat_set batch; + for (auto j = 0u; j < sem_max_keys && !semcount.empty(); ++j) { + auto iter = std::begin(semcount); + batch.insert(iter->first); + semcount.erase(std::move(iter)); + } + co_await rados->execute( + get_sem_set_oid(index), loc, neorados::WriteOp{}.exec( + sem_set::decrement(std::move(batch))), + asio::use_awaitable); + } +} + +asio::awaitable +RGWDataChangesLog::recover(const DoutPrefixProvider* dpp, + decltype(recovery_signal)) { + using neorados::WriteOp; + bc::flat_map> semcounts; + for (int index = 0; index < num_shards; ++index) { + // Gather entries in the shard + auto& semcount = semcounts[index]; + co_await read_all_sems(index, &semcount); + // If we have none, no point doing the rest + if (semcount.empty()) { + continue; + } + // Synthesize entries to push + auto pushed = co_await synthesize_entries(dpp, index, semcount); + if (!pushed) { + // If pushing failed, don't decrement any semaphores + ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard " + << index << " failed, skipping decrement" << dendl; + continue; + } + + // Check with other running RGWs, make sure not to decrement + // anything they have in flight. This doesn't cause an issue for + // partial upgrades, since older versions won't be using the + // semaphores at all. 
+ auto notified = co_await gather_working_sets(dpp, index, semcount); + if (!notified) { + ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering " + << "working sets for shard " << index + << " failed, skipping decrement" << dendl; + continue; + } + co_await decrement_sems(index, std::move(semcount)); + } +} + void RGWDataChangesLogInfo::dump(Formatter *f) const { encode_json("marker", marker, f); diff --git a/src/rgw/driver/rados/rgw_datalog.h b/src/rgw/driver/rados/rgw_datalog.h index 550612844e311..a97e4f381a4d9 100644 --- a/src/rgw/driver/rados/rgw_datalog.h +++ b/src/rgw/driver/rados/rgw_datalog.h @@ -39,12 +39,15 @@ #include "common/lru_map.h" #include "cls/log/cls_log_types.h" +#include "neorados/cls/sem_set.h" #include "rgw_basic_types.h" #include "rgw_log_backing.h" #include "rgw_sync_policy.h" #include "rgw_trim_bilog.h" #include "rgw_zone.h" +#include "common/async/spawn_group.h" + namespace asio = boost::asio; namespace bc = boost::container; @@ -334,14 +337,22 @@ inline bool operator <(const BucketGen& l, const BucketGen& r) { } class RGWDataChangesLog { + friend class DataLogTest; friend DataLogBackends; CephContext *cct; - rgw::sal::RadosStore* store = nullptr; + neorados::RADOS* rados; neorados::IOContext loc; rgw::BucketChangeObserver *observer = nullptr; - const RGWZone* zone; + bool log_data = false; std::unique_ptr bes; + std::shared_ptr renew_signal = + std::make_shared(); + std::shared_ptr watch_signal = + std::make_shared(); + std::shared_ptr recovery_signal = + std::make_shared(); + const int num_shards; std::string get_prefix() { return "data_log"; } std::string metadata_log_oid() { @@ -370,18 +381,19 @@ class RGWDataChangesLog { using ChangeStatusPtr = std::shared_ptr; lru_map changes; + const uint64_t sem_max_keys = neorados::cls::sem_set::max_keys; bc::flat_set cur_cycle; + std::vector> semaphores{unsigned(num_shards)}; ChangeStatusPtr _get_change(const rgw_bucket_shard& bs, uint64_t gen); - void register_renew(const rgw_bucket_shard& bs, - const rgw::bucket_log_layout_generation& gen); + bool register_renew(BucketGen bg); void update_renewed(const rgw_bucket_shard& bs, uint64_t gen, ceph::real_time expiration); std::optional renew_timer; - asio::awaitable renew_run(); + asio::awaitable renew_run(decltype(renew_signal) renew_signal); void renew_stop(); std::function renew_entries(const DoutPrefixProvider *dpp); + uint64_t watchcookie = 0; + public: RGWDataChangesLog(CephContext* cct); + // For testing. 
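+  // Takes the RADOS handle and log_data flag directly so unit tests can
+  // construct the log without a RadosStore.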
+ RGWDataChangesLog(CephContext* cct, bool log_data, + neorados::RADOS* rados); ~RGWDataChangesLog(); - int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, const RGWZoneParams& zoneparams, + asio::awaitable start(const DoutPrefixProvider* dpp, + const rgw_pool& log_pool, + // For testing + bool recovery = true, + bool watch = true, + bool renew = true); + + int start(const DoutPrefixProvider *dpp, const RGWZone* _zone, + const RGWZoneParams& zoneparams, rgw::sal::RadosStore* store); + asio::awaitable establish_watch(const DoutPrefixProvider* dpp, + std::string_view oid); + asio::awaitable process_notification(const DoutPrefixProvider* dpp, + std::string_view oid); + asio::awaitable watch_loop(decltype(watch_signal)); int choose_oid(const rgw_bucket_shard& bs); asio::awaitable add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, @@ -451,13 +481,29 @@ public: // a marker that compares greater than any other std::string max_marker() const; std::string get_oid(uint64_t gen_id, int shard_id) const; + std::string get_sem_set_oid(int shard_id) const; - int change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y); + int change_format(const DoutPrefixProvider *dpp, log_type type, + optional_yield y); int trim_generations(const DoutPrefixProvider *dpp, std::optional& through, optional_yield y); void shutdown(); + asio::awaitable read_all_sems(int index, + bc::flat_map* out); + asio::awaitable + synthesize_entries(const DoutPrefixProvider* dpp, int index, + const bc::flat_map& semcount); + asio::awaitable + gather_working_sets(const DoutPrefixProvider* dpp, + int index, + bc::flat_map& semcount); + asio::awaitable + decrement_sems(int index, + bc::flat_map&& semcount); + asio::awaitable recover(const DoutPrefixProvider* dpp, + decltype(recovery_signal)); }; class RGWDataChangesBE : public boost::intrusive_ref_counter {