asio::use_awaitable);
co_return;
}
- asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
- ceph::real_time now, const std::string& key,
- buffer::list&& bl) override {
- co_await r.execute(
- oids[index], loc,
- neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))),
- asio::use_awaitable);
- co_return;
+ // Append one log entry to shard `index`, stackful-coroutine flavor:
+ // suspends on the caller's yield_context instead of returning an
+ // awaitable as the old form did.
+ void push(const DoutPrefixProvider *dpp, int index,
+ ceph::real_time now, const std::string& key,
+ buffer::list&& bl, asio::yield_context y) override {
+ // cls_log `add` on the shard object; resumes once the RADOS write
+ // completes.
+ r.execute(oids[index], loc,
+ neorados::WriteOp{}.exec(nlog::add(now, {}, key, std::move(bl))),
+ y);
+ return;
}
asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
};
class RGWDataChangesFIFO final : public RGWDataChangesBE {
- using centries = std::vector<buffer::list>;
+ using centries = std::deque<buffer::list>;
tiny_vector<LazyFIFO> fifos;
public:
entries&& items) override {
co_return co_await fifos[index].push(dpp, std::get<centries>(items));
}
- asio::awaitable<void> push(const DoutPrefixProvider* dpp, int index,
- ceph::real_time, const std::string&,
- buffer::list&& bl) override {
- co_return co_await fifos[index].push(dpp, std::move(bl));
+ // Single-entry push for the FIFO backend. The timestamp and key
+ // parameters are intentionally unnamed: the FIFO stores raw entry
+ // blobs and does not use them.
+ void push(const DoutPrefixProvider* dpp, int index,
+ ceph::real_time, const std::string&,
+ buffer::list&& bl, asio::yield_context y) override {
+ fifos[index].push(dpp, std::move(bl), y);
}
asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
std::string>>
return choose_oid(bs);
}
-asio::awaitable<bool>
-RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
- const rgw_bucket& bucket) const
+// Decide whether changes to `bucket` should be logged. Now that the
+// caller already runs inside a stackful coroutine, the optional_yield
+// based bucket_filter can be invoked directly — no need to spawn a
+// nested coroutine just to manufacture a yield_context.
+bool RGWDataChangesLog::filter_bucket(const DoutPrefixProvider *dpp,
+ const rgw_bucket& bucket,
+ asio::yield_context y) const
{
if (!bucket_filter) {
- co_return true;
+ // No filter configured: log every bucket.
+ return true;
}
- co_return co_await asio::spawn(
- co_await asio::this_coro::executor,
- [this, dpp, &bucket](asio::yield_context yc) {
- optional_yield y(yc);
- return bucket_filter(bucket, y, dpp);
- }, asio::use_awaitable);
+ // `y` converts to the optional_yield the filter signature expects.
+ return bucket_filter(bucket, y, dpp);
}
std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& gen,
int shard_id)
+{
+ co_await asio::spawn(
+ co_await asio::this_coro::executor,
+ [this, dpp, &bucket_info, &gen, shard_id](asio::yield_context y) {
+ return add_entry(dpp, bucket_info, gen, shard_id, y);
+ }, asio::use_awaitable);
+ co_return;
+}
+
+
+void RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
+ const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& gen,
+ int shard_id, asio::yield_context y)
{
if (!log_data) {
- co_return;
+ return;
}
auto& bucket = bucket_info.bucket;
- if (!co_await filter_bucket(dpp, bucket)) {
- co_return;
+ if (!filter_bucket(dpp, bucket, y)) {
+ return;
}
if (observer) {
auto be = bes->head();
// Failure on push is fatal if we're bypassing semaphores.
- co_await be->push(dpp, index, now, change.key, std::move(bl));
- co_return;
+ be->push(dpp, index, now, change.key, std::move(bl), y);
+ return;
}
mark_modified(index, bs, gen.gen);
if (need_sem_set) {
using neorados::WriteOp;
using neorados::cls::sem_set::increment;
- co_await rados->execute(get_sem_set_oid(index), loc,
- WriteOp{}.exec(increment(std::move(key))),
- asio::use_awaitable);
+ rados->execute(get_sem_set_oid(index), loc,
+ WriteOp{}.exec(increment(std::move(key))), y);
}
- co_return;
+ return;
}
if (status->pending) {
- co_await status->cond.async_wait(sl, asio::use_awaitable);
+ status->cond.async_wait(sl, y);
sl.unlock();
- co_return;
+ return;
}
status->cond.notify(sl);
auto be = bes->head();
// Failure on push isn't fatal.
try {
- co_await be->push(dpp, index, now, change.key, std::move(bl));
+ be->push(dpp, index, now, change.key, std::move(bl), y);
} catch (const std::exception& e) {
ldpp_dout(dpp, 5) << "RGWDataChangesLog::add_entry(): Backend push failed "
<< "with exception: " << e.what() << dendl;
status->cond.notify(sl);
sl.unlock();
- co_return;
+ return;
}
int RGWDataChangesLog::add_entry(const DoutPrefixProvider* dpp,
{
std::exception_ptr eptr;
if (y) {
- auto& yield = y.get_yield_context();
try {
- asio::co_spawn(yield.get_executor(),
- add_entry(dpp, bucket_info, gen, shard_id),
- yield);
+ add_entry(dpp, bucket_info, gen, shard_id, y.get_yield_context());
} catch (const std::exception&) {
eptr = std::current_exception();
}
} else {
maybe_warn_about_blocking(dpp);
- eptr = asio::co_spawn(rados->get_executor(),
- add_entry(dpp, bucket_info, gen, shard_id),
- async::use_blocked);
+ eptr = asio::spawn(rados->get_executor(),
+ [this, dpp, &bucket_info, &gen,
+ &shard_id](asio::yield_context y) {
+ add_entry(dpp, bucket_info, gen, shard_id, y);
+ },
+ async::use_blocked);
}
return ceph::from_exception(eptr);
}
std::function<bool(const rgw_bucket& bucket, optional_yield y,
const DoutPrefixProvider *dpp)> bucket_filter;
bool going_down() const;
- asio::awaitable<bool> filter_bucket(const DoutPrefixProvider* dpp,
- const rgw_bucket& bucket) const;
+ bool filter_bucket(const DoutPrefixProvider* dpp,
+ const rgw_bucket& bucket,
+ asio::yield_context y) const;
asio::awaitable<void> renew_entries(const DoutPrefixProvider *dpp);
uint64_t watchcookie = 0;
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& gen,
int shard_id);
+ void add_entry(const DoutPrefixProvider *dpp,
+ const RGWBucketInfo& bucket_info,
+ const rgw::bucket_log_layout_generation& gen,
+ int shard_id, asio::yield_context y);
int add_entry(const DoutPrefixProvider *dpp,
const RGWBucketInfo& bucket_info,
const rgw::bucket_log_layout_generation& gen,
}
public:
using entries = std::variant<std::vector<cls::log::entry>,
- std::vector<ceph::buffer::list>>;
+ std::deque<ceph::buffer::list>>;
const uint64_t gen_id;
ceph::buffer::list&& entry, entries& out) = 0;
virtual asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
entries&& items) = 0;
- virtual asio::awaitable<void> push(const DoutPrefixProvider *dpp, int index,
- ceph::real_time now,
- const std::string& key,
- ceph::buffer::list&& bl) = 0;
+ virtual void push(const DoutPrefixProvider *dpp, int index,
+ ceph::real_time now,
+ const std::string& key,
+ ceph::buffer::list&& bl,
+ asio::yield_context y) = 0;
virtual asio::awaitable<std::tuple<std::span<rgw_data_change_log_entry>,
std::string>>
list(const DoutPrefixProvider* dpp, int shard,
#include <string_view>
#include <boost/asio/awaitable.hpp>
+#include <boost/asio/spawn.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/use_awaitable.hpp>
co_return;
}
+ // Ensure the backing FIFO exists, creating it on first use. Blocks
+ // on the caller's yield_context while the create RPC is in flight.
+ void lazy_init(const DoutPrefixProvider *dpp, asio::yield_context y) {
+ std::unique_lock l(m);
+ if (fifo) {
+ return;
+ } else {
+ // Drop the lock across the (potentially slow) create call so
+ // other pushers aren't serialized behind it.
+ l.unlock();
+ // FIFO supports multiple clients by design, so it's safe to
+ // race to create them.
+ auto fifo_tmp = fifo::FIFO::create(dpp, r, oid, loc, y);
+ l.lock();
+ if (!fifo) {
+ // We won the race
+ fifo = std::move(fifo_tmp);
+ }
+ // If we lost, fifo_tmp is simply discarded here.
+ }
+ l.unlock();
+ return;
+ }
+
public:
LazyFIFO(neorados::RADOS& r, std::string oid, neorados::IOContext loc)
: r(r), oid(std::move(oid)), loc(std::move(loc)) {}
- template <typename... Args>
- asio::awaitable<void> push(const DoutPrefixProvider *dpp, Args&& ...args) {
+ // Batch push (awaitable flavor): append a deque of pre-encoded
+ // entries, initializing the FIFO first if necessary. The concrete
+ // deque parameter replaces the old perfect-forwarding template.
+ asio::awaitable<void> push(const DoutPrefixProvider *dpp,
+ std::deque<ceph::buffer::list> entries) {
co_await lazy_init(dpp);
- co_return co_await fifo->push(dpp, std::forward<Args>(args)...,
- asio::use_awaitable);
+ co_return co_await fifo->push(dpp, std::move(entries), asio::use_awaitable);
+ }
+
+ // Single-entry push (stackful-coroutine flavor): lazily create the
+ // FIFO, then block on `y` until the entry has been appended.
+ void push(const DoutPrefixProvider *dpp,
+ ceph::buffer::list entry,
+ asio::yield_context y) {
+ lazy_init(dpp, y);
+ fifo->push(dpp, std::move(entry), y);
+ }
asio::awaitable<std::tuple<std::span<fifo::entry>, std::optional<std::string>>>