#include <vector>
#include <boost/asio/awaitable.hpp>
+#include <boost/asio/bind_cancellation_slot.hpp>
#include <boost/asio/co_spawn.hpp>
+#include <boost/container/flat_set.hpp>
+#include <boost/container/flat_map.hpp>
#include <boost/system/system_error.hpp>
#include "include/fs_types.h"
#include "neorados/cls/fifo.h"
#include "neorados/cls/log.h"
+#include "neorados/cls/sem_set.h"
#include "rgw_asio_thread.h"
#include "rgw_bucket.h"
prefix(get_prefix()),
changes(cct->_conf->rgw_data_log_changes_size) {}
+RGWDataChangesLog::RGWDataChangesLog(CephContext* cct, bool log_data,
+ neorados::RADOS* rados)
+ : cct(cct), rados(rados), log_data(log_data),
+ num_shards(cct->_conf->rgw_data_log_num_shards),
+ prefix(get_prefix()), changes(cct->_conf->rgw_data_log_changes_size) {}
+
+
void DataLogBackends::handle_init(entries_t e) {
std::unique_lock l(m);
for (const auto& [gen_id, gen] : e) {
}
-int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone,
+int RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
+ const RGWZone* zone,
const RGWZoneParams& zoneparams,
- rgw::sal::RadosStore* _store)
+ rgw::sal::RadosStore* store)
{
- zone = _zone;
- store = _store;
- ceph_assert(zone);
- auto defbacking = to_log_type(
- cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
- // Should be guaranteed by `set_enum_allowed`
- ceph_assert(defbacking);
- auto log_pool = zoneparams.log_pool;
+ log_data = zone->log_data;
+ rados = &store->get_neorados();
try {
- std::exception_ptr eptr;
- std::tie(eptr, loc) =
- asio::co_spawn(store->get_io_context(),
- rgw::init_iocontext(dpp, store->get_neorados(),
- log_pool, rgw::create,
- asio::use_awaitable),
- async::use_blocked);
+ // Blocking in startup code, not ideal, but won't hurt anything.
+ std::exception_ptr eptr
+ = asio::co_spawn(store->get_io_context(),
+ start(dpp, zoneparams.log_pool),
+ async::use_blocked);
if (eptr) {
std::rethrow_exception(eptr);
}
} catch (const sys::system_error& e) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
- << ": Failed to initialized ioctx: " << e.what()
- << ", pool=" << log_pool << dendl;
+ << ": Failed to start datalog: " << e.what()
+ << dendl;
return ceph::from_error_code(e.code());
+ } catch (const std::exception& e) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": Failed to start datalog: " << e.what()
+ << dendl;
+ return -EIO;
+ }
+ return 0;
+}
+
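+// Coroutine doing the actual startup: no-op when log_data is false,
+// otherwise create/open the log pool IOContext, initialize the backend
+// generations, then optionally spawn the renewal loop, the recovery
+// watch loop, and semaphore recovery, each with its own cancellation
+// signal. Throws on failure; the blocking wrapper above turns
+// exceptions into error codes.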
+asio::awaitable<void>
+RGWDataChangesLog::start(const DoutPrefixProvider *dpp,
+ const rgw_pool& log_pool,
+ bool recovery,
+ bool watch,
+ bool renew)
+{
+ if (!log_data) {
+ co_return;
+ }
+ auto defbacking = to_log_type(
+ cct->_conf.get_val<std::string>("rgw_default_data_log_backing"));
+ // Should be guaranteed by `set_enum_allowed`
+ ceph_assert(defbacking);
+ try {
+ loc = co_await rgw::init_iocontext(dpp, *rados, log_pool,
+ rgw::create, asio::use_awaitable);
} catch (const std::exception& e) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": Failed to initialized ioctx: " << e.what()
<< ", pool=" << log_pool << dendl;
- return -EIO;
+ throw;
}
- // Blocking in startup code, not ideal, but won't hurt anything.
try {
- std::exception_ptr eptr;
- std::tie(eptr, bes) =
- asio::co_spawn(
- store->get_io_context().get_executor(),
- logback_generations::init<DataLogBackends>(
- dpp, store->get_neorados(), metadata_log_oid(), loc,
- [this](uint64_t gen_id, int shard) {
- return get_oid(gen_id, shard);
- }, num_shards, *defbacking, *this),
- async::use_blocked);
- if (eptr) {
- std::rethrow_exception(eptr);
- }
- } catch (const sys::system_error& e) {
- lderr(cct) << __PRETTY_FUNCTION__
- << ": Error initializing backends: "
- << e.what() << dendl;
- return ceph::from_error_code(e.code());
+ bes = co_await logback_generations::init<DataLogBackends>(
+ dpp, *rados, metadata_log_oid(), loc,
+ [this](uint64_t gen_id, int shard) {
+ return get_oid(gen_id, shard);
+ }, num_shards, *defbacking, *this);
} catch (const std::exception& e) {
ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
<< ": Error initializing backends: " << e.what()
<< dendl;
- return -EIO;
+ throw;
}
- asio::co_spawn(store->get_io_context().get_executor(),
- renew_run(), asio::detached);
- return 0;
+ if (renew) {
+ asio::co_spawn(co_await asio::this_coro::executor,
+ renew_run(renew_signal),
+ asio::bind_cancellation_slot(renew_signal->slot(),
+ asio::detached));
+ }
+ if (watch) {
+ // Establish watch here so we won't be 'started up' until we're watching.
+ const auto oid = get_sem_set_oid(0);
+ auto established = co_await establish_watch(dpp, oid);
+ if (!established) {
+ throw sys::system_error{ENOTCONN, sys::generic_category(),
+ "Unable to establish recovery watch!"};
+ }
+ asio::co_spawn(co_await asio::this_coro::executor,
+ watch_loop(watch_signal),
+ asio::bind_cancellation_slot(watch_signal->slot(),
+ asio::detached));
+ }
+ if (recovery) {
+    // Recovery can run concurrently with normal operation, so we don't
+ // have to block startup while we do all that I/O.
+ asio::co_spawn(co_await asio::this_coro::executor,
+ recover(dpp, recovery_signal),
+ asio::bind_cancellation_slot(recovery_signal->slot(),
+ asio::detached));
+ }
+ co_return;
+}
+
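+// Create the given semaphore-set object if needed and establish a watch
+// on it (queue depth num_shards * 128) so we can answer recovery
+// notifications. On failure, flush the current working set via
+// renew_entries() and return false.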
+asio::awaitable<bool>
+RGWDataChangesLog::establish_watch(const DoutPrefixProvider* dpp,
+ std::string_view oid) {
+ const auto queue_depth = num_shards * 128;
+ try {
+ co_await rados->execute(oid, loc, neorados::WriteOp{}.create(false),
+ asio::use_awaitable);
+ watchcookie = co_await rados->watch(oid, loc, asio::use_awaitable,
+ std::nullopt, queue_depth);
+ } catch (const std::exception& e) {
+ ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
+ << ": Unable to start watch! Error: "
+ << e.what() << dendl;
+ watchcookie = 0;
+ }
+
+ if (watchcookie == 0) {
+ // Dump our current working set.
+ co_await renew_entries(dpp);
+ }
+
+ co_return watchcookie != 0;
+}
+
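+// Payload of a recovery notification: the shard whose semaphores are
+// being recovered.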
+struct recovery_check {
+ uint64_t shard = 0;
+
+ recovery_check() = default;
+
+ recovery_check(uint64_t shard) : shard(shard) {}
+
+ void encode(buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(shard, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(shard, bl);
+ DECODE_FINISH(bl);
+ }
+
+ operator uint64_t() {
+ return shard;
+ }
+};
+WRITE_CLASS_ENCODER(recovery_check);
+
+
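+// Reply to a recovery notification: the keys this RGW currently has in
+// flight for the requested shard, which recovery must not decrement.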
+struct recovery_reply {
+ std::unordered_set<std::string> reply_set;
+
+ recovery_reply() = default;
+
+ recovery_reply(std::unordered_set<std::string> reply_set)
+ : reply_set(std::move(reply_set)) {}
+
+ void encode(buffer::list& bl) const {
+ ENCODE_START(1, 1, bl);
+ encode(reply_set, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(buffer::list::const_iterator& bl) {
+ DECODE_START(1, bl);
+ decode(reply_set, bl);
+ DECODE_FINISH(bl);
+ }
+
+ operator std::unordered_set<std::string>&() {
+ return reply_set;
+ }
+};
+WRITE_CLASS_ENCODER(recovery_reply);
+
+
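+// Wait for the next notification on the recovery watch and reply with
+// our working set for the requested shard: current-cycle entries that
+// map to that shard plus semaphores we have yet to decrement.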
+asio::awaitable<void>
+RGWDataChangesLog::process_notification(const DoutPrefixProvider* dpp,
+ std::string_view oid) {
+ auto notification = co_await rados->next_notification(watchcookie,
+ asio::use_awaitable);
+ int shard = 0;
+  // Don't send a reply if we get a bogus notification; we don't
+ // want recovery to delete semaphores improperly.
+ try {
+ recovery_check rc;
+ decode(rc, notification.bl);
+ shard = rc;
+ } catch (const std::exception& e) {
+ ldpp_dout(dpp, 2) << "Got malformed notification!" << dendl;
+ co_return;
+ }
+ if (shard >= num_shards) {
+ ldpp_dout(dpp, 2) << "Got unknown shard " << shard << dendl;
+ co_return;
+ }
+ recovery_reply reply;
+ std::unique_lock l(lock);
+ for (const auto& bg : cur_cycle) {
+ if (choose_oid(bg.shard) == shard) {
+ reply.reply_set.insert(bg.get_key());
+ }
+ }
+ std::copy(semaphores[shard].begin(),
+ semaphores[shard].end(),
+ std::inserter(reply.reply_set, reply.reply_set.end()));
+ l.unlock();
+ buffer::list replybl;
+ encode(reply, replybl);
+ try {
+ co_await rados->notify_ack(oid, loc, notification.notify_id, watchcookie,
+ std::move(replybl), asio::use_awaitable);
+ } catch (const std::exception& e) {
+ ldpp_dout(dpp, 10) << __PRETTY_FUNCTION__
+ << ": Failed ack. Whatever server is in "
+ << "recovery won't decrement semaphores: "
+ << e.what() << dendl;
+ }
+}
+
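+// Service recovery notifications until shutdown, re-establishing the
+// watch (retrying every 500ms) if it is lost.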
+asio::awaitable<void> RGWDataChangesLog::watch_loop(decltype(watch_signal)) {
+ const DoutPrefix dp(cct, dout_subsys, "rgw data changes log: ");
+ const auto oid = get_sem_set_oid(0);
+ bool need_rewatch = false;
+
+ while (!going_down()) {
+ try {
+ co_await process_notification(&dp, oid);
+ } catch (const sys::system_error& e) {
+ if (e.code() == neorados::errc::notification_overflow) {
+ ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__
+ << ": Notification overflow. Whatever server is in "
+ << "recovery won't decrement semaphores." << dendl;
+ continue;
+ }
+ if (going_down()) {
+ break;
+ } else {
+ need_rewatch = true;
+ }
+ }
+ if (need_rewatch) {
+ try {
+ if (watchcookie) {
+ auto wc = watchcookie;
+ watchcookie = 0;
+ co_await rados->unwatch(wc, loc, asio::use_awaitable);
+ }
+ } catch (const std::exception& e) {
+ // Watch may not exist, don't care.
+ }
+ bool rewatched = false;
+ ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__
+ << ": Trying to re-establish watch" << dendl;
+
+ rewatched = co_await establish_watch(&dp, oid);
+ while (!rewatched) {
+	asio::steady_timer t(co_await asio::this_coro::executor, 500ms);
+ co_await t.async_wait(asio::use_awaitable);
+ ldpp_dout(&dp, 10) << __PRETTY_FUNCTION__
+ << ": Trying to re-establish watch" << dendl;
+ rewatched = co_await establish_watch(&dp, oid);
+ }
+ }
+ }
}
int RGWDataChangesLog::choose_oid(const rgw_bucket_shard& bs) {
asio::awaitable<void>
RGWDataChangesLog::renew_entries(const DoutPrefixProvider* dpp)
{
- if (!zone->log_data)
+ if (!log_data) {
co_return;
+ }
- /* we can't keep the bucket name as part of the cls::log::entry, and we need
- * it later, so we keep two lists under the map */
+ /* we can't keep the bucket name as part of the datalog entry, and
+ * we need it later, so we keep two lists under the map */
bc::flat_map<int, std::pair<std::vector<BucketGen>,
RGWDataChangesBE::entries>> m;
std::unique_lock l(lock);
decltype(cur_cycle) entries;
entries.swap(cur_cycle);
+ for (const auto& [bs, gen] : entries) {
+ unsigned index = choose_oid(bs);
+ semaphores[index].insert(BucketGen{bs, gen}.get_key());
+ }
l.unlock();
auto ut = real_clock::now();
auto be = bes->head();
+
for (const auto& [bs, gen] : entries) {
auto index = choose_oid(bs);
be->prepare(ut, change.key, std::move(bl), m[index].second);
}
+ auto push_failed = false;
for (auto& [index, p] : m) {
auto& [buckets, entries] = p;
auto now = real_clock::now();
- co_await be->push(dpp, index, std::move(entries));
+ // Failure on push isn't fatal.
+ try {
+ co_await be->push(dpp, index, std::move(entries));
+ } catch (const std::exception& e) {
+ push_failed = true;
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::renew_entries(): Backend push failed "
+ << "with exception: " << e.what() << dendl;
+ }
+
auto expiration = now;
expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
for (auto& [bs, gen] : buckets) {
update_renewed(bs, gen, expiration);
}
}
+ if (push_failed) {
+ co_return;
+ }
+
+ namespace sem_set = neorados::cls::sem_set;
+ // If we didn't error in pushing, we can now decrement the semaphores
+ l.lock();
+ for (auto index = 0u; index < unsigned(num_shards); ++index) {
+ using neorados::WriteOp;
+ auto& keys = semaphores[index];
+ while (!keys.empty()) {
+ bc::flat_set<std::string> batch;
+ // Can't use a move iterator here, since the keys have to stay
+ // until they're safely on the OSD to avoid the risk of
+ // double-decrement from recovery.
+ auto to_copy = std::min(sem_max_keys, keys.size());
+ std::copy_n(keys.begin(), to_copy,
+ std::inserter(batch, batch.end()));
+ auto op = WriteOp{}.exec(sem_set::decrement(std::move(batch)));
+ l.unlock();
+ co_await rados->execute(get_sem_set_oid(index), loc, std::move(op),
+ asio::use_awaitable);
+ l.lock();
+ auto iter = keys.cbegin();
+ std::advance(iter, to_copy);
+ keys.erase(keys.cbegin(), iter);
+ }
+ }
co_return;
}
{
ChangeStatusPtr status;
if (!changes.find({bs, gen}, status)) {
- status = std::make_shared<ChangeStatus>(store->get_io_context()
- .get_executor());
+ status = std::make_shared<ChangeStatus>(rados->get_executor());
changes.add({bs, gen}, status);
}
return status;
}
-void RGWDataChangesLog::register_renew(const rgw_bucket_shard& bs,
- const rgw::bucket_log_layout_generation& gen)
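+// Record bg in the current cycle's working set. Returns true if it was
+// newly inserted, i.e. its semaphore still needs to be incremented.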
+bool RGWDataChangesLog::register_renew(BucketGen bg)
{
std::scoped_lock l{lock};
- cur_cycle.insert({bs, gen.gen});
+ return cur_cycle.insert(bg).second;
}
void RGWDataChangesLog::update_renewed(const rgw_bucket_shard& bs,
optional_yield y(yc);
return bucket_filter(bucket, y, dpp);
}, asio::use_awaitable);
- co_return true;
}
std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
fmt::format("{}.{}", prefix, i));
}
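+// Name of the RADOS object holding the semaphore set for shard i.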
+std::string RGWDataChangesLog::get_sem_set_oid(int i) const {
+ return fmt::format("_sem_set{}.{}", prefix, i);
+}
+
asio::awaitable<void>
RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& gen,
int shard_id)
{
- if (!zone->log_data) {
+ if (!log_data) {
co_return;
}
rgw_bucket_shard bs(bucket, shard_id);
int index = choose_oid(bs);
+ if (!(watchcookie && rados->check_watch(watchcookie))) {
+ auto now = real_clock::now();
+ ldpp_dout(dpp, 2) << "RGWDataChangesLog::add_entry(): "
+ << "Bypassing window optimization and pushing directly: "
+ << "bucket.name=" << bucket.name
+ << " shard_id=" << shard_id << " now="
+ << now << " cur_expiration=" << dendl;
+
+ buffer::list bl;
+ rgw_data_change change;
+ change.entity_type = ENTITY_TYPE_BUCKET;
+ change.key = bs.get_key();
+ change.timestamp = now;
+ change.gen = gen.gen;
+ encode(change, bl);
+
+ auto be = bes->head();
+ // Failure on push is fatal if we're bypassing semaphores.
+ co_await be->push(dpp, index, now, change.key, std::move(bl));
+ co_return;
+ }
mark_modified(index, bs, gen.gen);
<< " shard_id=" << shard_id << " now=" << now
<< " cur_expiration=" << status->cur_expiration << dendl;
+
if (now < status->cur_expiration) {
/* no need to send, recently completed */
sl.unlock();
- register_renew(bs, gen);
+ auto bg = BucketGen{bs, gen.gen};
+ auto key = bg.get_key();
+ auto need_sem_set = register_renew(std::move(bg));
+ if (need_sem_set) {
+ using neorados::WriteOp;
+ using neorados::cls::sem_set::increment;
+ co_await rados->execute(get_sem_set_oid(index), loc,
+ WriteOp{}.exec(increment(std::move(key))),
+ asio::use_awaitable);
+ }
co_return;
}
if (status->pending) {
co_await status->cond.async_wait(sl, asio::use_awaitable);
sl.unlock();
- register_renew(bs, gen);
co_return;
}
ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
auto be = bes->head();
- co_await be->push(dpp, index, now, change.key, std::move(bl));
+ // Failure on push isn't fatal.
+ try {
+ co_await be->push(dpp, index, now, change.key, std::move(bl));
+ } catch (const std::exception& e) {
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::add_entry(): Backend push failed "
+ << "with exception: " << e.what() << dendl;
+ }
+
now = real_clock::now();
}
} else {
maybe_warn_about_blocking(dpp);
- eptr = asio::co_spawn(store->get_io_context().get_executor(),
+ eptr = asio::co_spawn(rados->get_executor(),
add_entry(dpp, bucket_info, gen, shard_id),
async::use_blocked);
}
}
} else {
maybe_warn_about_blocking(dpp);
- std::tie(eptr, out) = asio::co_spawn(store->get_io_context().get_executor(),
+ std::tie(eptr, out) = asio::co_spawn(rados->get_executor(),
bes->list(dpp, shard, entries,
std::string{marker}),
async::use_blocked);
} else {
maybe_warn_about_blocking(dpp);
std::tie(eptr, out) =
- asio::co_spawn(store->get_io_context().get_executor(),
- list_entries(dpp, max_entries,
- RGWDataChangesLogMarker{marker}),
- async::use_blocked);
+ asio::co_spawn(rados->get_executor(),
+ list_entries(dpp, max_entries,
+ RGWDataChangesLogMarker{marker}),
+ async::use_blocked);
}
if (eptr) {
return ceph::from_exception(eptr);
}
} else {
maybe_warn_about_blocking(dpp);
- std::tie(eptr, *info) = asio::co_spawn(store->get_io_context().get_executor(),
+ std::tie(eptr, *info) = asio::co_spawn(rados->get_executor(),
be->get_info(dpp, shard_id),
async::use_blocked);
}
}
} else {
maybe_warn_about_blocking(dpp);
- eptr = asio::co_spawn(store->get_io_context().get_executor(),
+ eptr = asio::co_spawn(rados->get_executor(),
bes->trim_entries(dpp, shard_id, marker),
async::use_blocked);
}
int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
std::string_view marker,
librados::AioCompletion* c) {
- asio::co_spawn(store->get_io_context().get_executor(),
+ asio::co_spawn(rados->get_executor(),
bes->trim_entries(dpp, shard_id, marker),
c);
return 0;
co_return;
}
-
bool RGWDataChangesLog::going_down() const
{
return down_flag;
}
void RGWDataChangesLog::shutdown() {
+ if (down_flag) {
+ return;
+ }
down_flag = true;
renew_stop();
+ // Revisit this later
+ if (renew_signal)
+ renew_signal->emit(asio::cancellation_type::terminal);
+  if (watch_signal)
+    watch_signal->emit(asio::cancellation_type::terminal);
+ if (recovery_signal)
+ recovery_signal->emit(asio::cancellation_type::terminal);
}
-asio::awaitable<void> RGWDataChangesLog::renew_run() {
+asio::awaitable<void> RGWDataChangesLog::renew_run(decltype(renew_signal)) {
static constexpr auto runs_per_prune = 150;
auto run = 0;
renew_timer.emplace(co_await asio::this_coro::executor);
"~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
}
-int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y) {
+int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp,
+				      log_type type, optional_yield y) {
std::exception_ptr eptr;
if (y) {
auto& yield = y.get_yield_context();
}
} else {
maybe_warn_about_blocking(dpp);
- eptr = asio::co_spawn(store->get_io_context().get_executor(),
+ eptr = asio::co_spawn(rados->get_executor(),
bes->new_backing(dpp, type),
async::use_blocked);
}
} else {
maybe_warn_about_blocking(dpp);
- eptr = asio::co_spawn(store->get_io_context().get_executor(),
+ eptr = asio::co_spawn(rados->get_executor(),
bes->trim_generations(dpp, through),
async::use_blocked);
}
return ceph::from_exception(eptr);
}
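+// Read all semaphores (key and count) from the given shard's
+// semaphore-set object, paging sem_max_keys at a time; a missing
+// object is treated as empty.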
+asio::awaitable<void>
+RGWDataChangesLog::read_all_sems(int index,
+ bc::flat_map<std::string, uint64_t>* out)
+{
+ namespace sem_set = neorados::cls::sem_set;
+ std::string cursor;
+ do {
+ try {
+ co_await rados->execute(
+ get_sem_set_oid(index), loc,
+ neorados::ReadOp{}.exec(sem_set::list(sem_max_keys, cursor, out,
+ &cursor)),
+ nullptr, asio::use_awaitable);
+ } catch (const sys::system_error& e) {
+ if (e.code() == sys::errc::no_such_file_or_directory) {
+ break;
+ } else {
+ throw;
+ }
+ }
+ } while (!cursor.empty());
+ co_return;
+}
+
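+// Build a datalog entry for every outstanding semaphore key and push
+// the batch to the head backend. Returns false if any key failed to
+// parse or the push failed, in which case the semaphores must not be
+// decremented.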
+asio::awaitable<bool>
+RGWDataChangesLog::synthesize_entries(
+ const DoutPrefixProvider* dpp,
+ int index,
+ const bc::flat_map<std::string, uint64_t>& semcount)
+{
+ const auto timestamp = real_clock::now();
+ auto be = bes->head();
+ auto push_failed = false;
+
+ RGWDataChangesBE::entries batch;
+ for (const auto& [key, sem] : semcount) {
+ try {
+ BucketGen bg{key};
+ rgw_data_change change;
+ buffer::list bl;
+ change.entity_type = ENTITY_TYPE_BUCKET;
+ change.key = bg.shard.get_key();
+ change.timestamp = timestamp;
+ change.gen = bg.gen;
+ encode(change, bl);
+ be->prepare(timestamp, change.key, std::move(bl), batch);
+    } catch (const std::exception& e) {
+ push_failed = true;
+ ldpp_dout(dpp, -1) << "RGWDataChangesLog::synthesize_entries(): Unable to "
+ << "parse Bucketgen key: " << key << "Got exception: "
+ << e.what() << dendl;
+ }
+ }
+ try {
+ co_await be->push(dpp, index, std::move(batch));
+ } catch (const std::exception& e) {
+ push_failed = true;
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::synthesize_entries(): Backend push "
+ << "failed with exception: " << e.what() << dendl;
+ }
+ co_return !push_failed;
+}
+
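+// Notify every watcher of the semaphore-set object and subtract the
+// keys they report as in flight from semcount, so recovery doesn't
+// decrement semaphores other running RGWs still hold. Returns false if
+// any watcher missed the notification or sent an undecodable reply.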
+asio::awaitable<bool>
+RGWDataChangesLog::gather_working_sets(
+ const DoutPrefixProvider* dpp,
+ int index,
+ bc::flat_map<std::string, uint64_t>& semcount)
+{
+ buffer::list bl;
+ recovery_check rc = index;
+ encode(rc, bl);
+ auto [reply_map, missed_set] = co_await rados->notify(
+ get_sem_set_oid(0), loc, bl, 60s, asio::use_awaitable);
+ // If we didn't get an answer from someone, don't decrement anything.
+ if (!missed_set.empty()) {
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::gather_working_sets(): Missed responses: "
+ << missed_set << dendl;
+ co_return false;
+ }
+ for (const auto& [source, reply] : reply_map) {
+ recovery_reply keys;
+ try {
+ decode(keys, reply);
+ } catch (const std::exception& e) {
+ ldpp_dout(dpp, -1)
+ << "RGWDataChangesLog::gather_working_sets(): Failed decoding reply from: "
+ << source << dendl;
+ co_return false;
+ }
+ for (const auto& key : keys.reply_set) {
+ auto iter = semcount.find(key);
+ if (iter == semcount.end()) {
+ continue;
+ }
+ if (iter->second == 1) {
+ semcount.erase(iter);
+ } else {
+ --(iter->second);
+ }
+ }
+ }
+ co_return true;
+}
+
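+// Decrement the recovered semaphores on the shard's semaphore-set
+// object, at most sem_max_keys per write.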
+asio::awaitable<void>
+RGWDataChangesLog::decrement_sems(
+ int index,
+ bc::flat_map<std::string, uint64_t>&& semcount)
+{
+ namespace sem_set = neorados::cls::sem_set;
+ while (!semcount.empty()) {
+ bc::flat_set<std::string> batch;
+ for (auto j = 0u; j < sem_max_keys && !semcount.empty(); ++j) {
+ auto iter = std::begin(semcount);
+ batch.insert(iter->first);
+ semcount.erase(std::move(iter));
+ }
+ co_await rados->execute(
+ get_sem_set_oid(index), loc, neorados::WriteOp{}.exec(
+ sem_set::decrement(std::move(batch))),
+ asio::use_awaitable);
+ }
+}
+
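+// Crash recovery: for each shard, read the leftover semaphores,
+// republish the corresponding datalog entries, exclude keys still in
+// flight on other RGWs, then decrement whatever remains.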
+asio::awaitable<void>
+RGWDataChangesLog::recover(const DoutPrefixProvider* dpp,
+ decltype(recovery_signal)) {
+ using neorados::WriteOp;
+ bc::flat_map<uint64_t, bc::flat_map<std::string, uint64_t>> semcounts;
+ for (int index = 0; index < num_shards; ++index) {
+ // Gather entries in the shard
+ auto& semcount = semcounts[index];
+ co_await read_all_sems(index, &semcount);
+ // If we have none, no point doing the rest
+ if (semcount.empty()) {
+ continue;
+ }
+ // Synthesize entries to push
+ auto pushed = co_await synthesize_entries(dpp, index, semcount);
+ if (!pushed) {
+ // If pushing failed, don't decrement any semaphores
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Pushing shard "
+ << index << " failed, skipping decrement" << dendl;
+ continue;
+ }
+
+ // Check with other running RGWs, make sure not to decrement
+ // anything they have in flight. This doesn't cause an issue for
+ // partial upgrades, since older versions won't be using the
+ // semaphores at all.
+ auto notified = co_await gather_working_sets(dpp, index, semcount);
+ if (!notified) {
+ ldpp_dout(dpp, 5) << "RGWDataChangesLog::recover(): Gathering "
+ << "working sets for shard " << index
+ << " failed, skipping decrement" << dendl;
+ continue;
+ }
+ co_await decrement_sems(index, std::move(semcount));
+ }
+}
+
void RGWDataChangesLogInfo::dump(Formatter *f) const
{
encode_json("marker", marker, f);