]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Use `run_coro` to call coroutines at use
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 18 Apr 2025 07:31:35 +0000 (03:31 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Wed, 6 Aug 2025 20:21:27 +0000 (16:21 -0400)
This avoids having two entry points with different error checking
preparation, etc. to get out of sync or have a fix get forgotten.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/rgw/driver/rados/rgw_rest_log.cc
src/rgw/driver/rados/rgw_trim_datalog.cc
src/rgw/radosgw-admin/radosgw-admin.cc
src/test/rgw/test_datalog.cc

index b1f6643a5d68955aa3e92fd01d38ec35bd17ae2a..1328f15b8b3ec2bd677cda833ba60ef8f0053c52 100644 (file)
@@ -1083,7 +1083,7 @@ DataLogBackends::list(const DoutPrefixProvider *dpp, int shard,
 }
 
 asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
-                          std::string>>
+                          std::string, bool>>
 RGWDataChangesLog::list_entries(const DoutPrefixProvider* dpp, int shard,
                                int max_entries, std::string marker)
 {
@@ -1095,71 +1095,24 @@ RGWDataChangesLog::list_entries(const DoutPrefixProvider* dpp, int shard,
   }
   if (max_entries <= 0) {
     co_return std::make_tuple(std::vector<rgw_data_change_log_entry>{},
-                             std::string{});
+                             std::string{}, false);
   }
   std::vector<rgw_data_change_log_entry> entries(max_entries);
   entries.resize(max_entries);
   auto [spanentries, outmark] = co_await bes->list(dpp, shard, entries, marker);
   entries.resize(spanentries.size());
-  co_return std::make_tuple(std::move(entries), std::move(outmark));
-}
-
-int RGWDataChangesLog::list_entries(
-  const DoutPrefixProvider *dpp, int shard,
-  int max_entries, std::vector<rgw_data_change_log_entry>& entries,
-  std::string_view marker, std::string* out_marker, bool* truncated,
-  std::string* errstr, optional_yield y)
-{
-  std::tuple<std::span<rgw_data_change_log_entry>,
-            std::string> out;
-  if (shard >= num_shards) [[unlikely]] {
-    if (errstr) {
-    *errstr = fmt::format("{} is not a valid shard. Valid shards are integers in [0, {})",
-                        shard, num_shards);
-    }
-    return -EINVAL;
-  }
-  if (std::ssize(entries) < max_entries) {
-    entries.resize(max_entries);
-  }
-  try {
-    if (y) {
-      auto& yield = y.get_yield_context();
-      out = asio::co_spawn(yield.get_executor(),
-                          bes->list(dpp, shard, entries,
-                                    std::string{marker}),
-                          yield);
-    } else {
-      maybe_warn_about_blocking(dpp);
-      out = asio::co_spawn(rados->get_executor(),
-                          bes->list(dpp, shard, entries,
-                                    std::string{marker}),
-                          async::use_blocked);
-    }
-  } catch (const std::exception&) {
-    return ceph::from_exception(std::current_exception());
-  }
-  auto& [outries, outmark] = out;
-  if (auto size = std::ssize(outries); size < std::ssize(entries)) {
-    entries.resize(size);
-  }
-  if (truncated) {
-    *truncated = !outmark.empty();
-  }
-  if (out_marker) {
-    *out_marker = std::move(outmark);
-  }
-  return 0;
+  bool truncated = !outmark.empty();
+  co_return std::make_tuple(std::move(entries), std::move(outmark), truncated);
 }
 
 asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
-                          RGWDataChangesLogMarker>>
+                          RGWDataChangesLogMarker, bool>>
 RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,
                                int max_entries, RGWDataChangesLogMarker marker)
 {
   if (max_entries <= 0) {
     co_return std::make_tuple(std::vector<rgw_data_change_log_entry>{},
-                             RGWDataChangesLogMarker{});
+                             RGWDataChangesLogMarker{}, false);
   }
 
   std::vector<rgw_data_change_log_entry> entries(max_entries);
@@ -1183,78 +1136,25 @@ RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,
   if (!remaining.empty()) {
     entries.resize(entries.size() - remaining.size());
   }
-  co_return std::make_tuple(std::move(entries), std::move(marker));
+  bool truncated = marker;
+  co_return std::make_tuple(std::move(entries), std::move(marker), truncated);
 }
 
-int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp,int max_entries,
-                                   std::vector<rgw_data_change_log_entry>& entries,
-                                   RGWDataChangesLogMarker& marker, bool *ptruncated,
-                                   optional_yield y)
-{
-  std::tuple<std::vector<rgw_data_change_log_entry>,
-            RGWDataChangesLogMarker> out;
-  if (std::ssize(entries) < max_entries) {
-    entries.resize(max_entries);
-  }
-  try {
-    if (y) {
-      auto& yield = y.get_yield_context();
-      out = asio::co_spawn(yield.get_executor(),
-                          list_entries(dpp, max_entries,
-                                       RGWDataChangesLogMarker{marker}),
-                          yield);
-  } else {
-      maybe_warn_about_blocking(dpp);
-      out = asio::co_spawn(rados->get_executor(),
-                          list_entries(dpp, max_entries,
-                                       RGWDataChangesLogMarker{marker}),
-                          async::use_blocked);
-    }
-  } catch (const std::exception&) {
-    return ceph::from_exception(std::current_exception());
-  }
-  auto& [outries, outmark] = out;
-  if (auto size = std::ssize(outries); size < std::ssize(entries)) {
-    entries.resize(size);
-  }
-  if (ptruncated) {
-    *ptruncated = (outmark.shard > 0 || !outmark.marker.empty());
-  }
-  marker = std::move(outmark);
-  return 0;
-}
-
-int RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id,
-                               RGWDataChangesLogInfo* info,
-                               std::string* errstr, optional_yield y)
+asio::awaitable<RGWDataChangesLogInfo>
+RGWDataChangesLog::get_info(const DoutPrefixProvider* dpp, int shard_id)
 {
   if (shard_id >= num_shards) [[unlikely]] {
-    if (errstr) {
-      *errstr = fmt::format(
+    throw sys::system_error{-EINVAL, sys::generic_category(),
+      fmt::format(
        "{} is not a valid shard. Valid shards are integers in [0, {})",
-       shard_id, num_shards);
-    }
+       shard_id, num_shards)};
   }
   auto be = bes->head();
-  try {
-    if (y) {
-      auto& yield = y.get_yield_context();
-      *info = asio::co_spawn(yield.get_executor(),
-                            be->get_info(dpp, shard_id),
-                            yield);
-    } else {
-      maybe_warn_about_blocking(dpp);
-      *info = asio::co_spawn(rados->get_executor(),
-                            be->get_info(dpp, shard_id),
-                            async::use_blocked);
-    }
-  } catch (const std::exception&) {
-    return ceph::from_exception(std::current_exception());
-  }
-  if (!info->marker.empty()) {
-    info->marker = gencursor(be->gen_id, info->marker);
+  auto info = co_await be->get_info(dpp, shard_id);
+  if (!info.marker.empty()) {
+    info.marker = gencursor(be->gen_id, info.marker);
   }
-  return 0;
+  co_return info;
 }
 
 asio::awaitable<void> DataLogBackends::trim_entries(
@@ -1284,46 +1184,27 @@ asio::awaitable<void> DataLogBackends::trim_entries(
   co_return;
 }
 
-int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
-                                   std::string_view marker, std::string* errstr,
-                                   optional_yield y)
+asio::awaitable<void>
+RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                                   std::string_view marker)
 {
   if (shard_id >= num_shards) [[unlikely]] {
-    if (errstr) {
-      *errstr = fmt::format(
+    throw sys::system_error{-EINVAL, sys::generic_category(),
+      fmt::format(
        "{} is not a valid shard. Valid shards are integers in [0, {})",
-       shard_id, num_shards);
-    }
-  }
-  try {
-    if (y) {
-      auto& yield = y.get_yield_context();
-      asio::co_spawn(yield.get_executor(),
-                    bes->trim_entries(dpp, shard_id, marker),
-                    yield);
-    } else {
-      maybe_warn_about_blocking(dpp);
-      asio::co_spawn(rados->get_executor(),
-                    bes->trim_entries(dpp, shard_id, marker),
-                    async::use_blocked);
-    }
-  } catch (const std::exception& e) {
-    return ceph::from_exception(std::current_exception());
+       shard_id, num_shards)};
   }
-  return 0;
+  auto be = bes->head();
+  co_return co_await bes->trim_entries(dpp, shard_id, marker);
 }
 
-int RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
-                                   std::string_view marker,
-                                   librados::AioCompletion* c)
+void RGWDataChangesLog::trim_entries(const DoutPrefixProvider* dpp, int shard_id,
+                                    std::string_view marker,
+                                    librados::AioCompletion* c)
 {
-  if (shard_id >= num_shards) [[unlikely]] {
-    return -EINVAL;
-  }
   asio::co_spawn(rados->get_executor(),
-                bes->trim_entries(dpp, shard_id, marker),
+                trim_entries(dpp, shard_id, marker),
                 c);
-  return 0;
 }
 
 
@@ -1493,7 +1374,7 @@ asio::awaitable<void> RGWDataChangesLog::renew_run() {
        std::optional<uint64_t> through;
        ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
        operation = "trim_generations"sv;
-       co_await bes->trim_generations(&dp, through);
+       co_await trim_generations(&dp, through);
        operation = {};
        if (through) {
          ldpp_dout(&dp, 2)
@@ -1560,54 +1441,22 @@ std::string RGWDataChangesLog::max_marker() const {
                   "~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~");
 }
 
-int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp,
-                                    log_type type,optional_yield y)
+asio::awaitable<void>
+RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type type)
 {
-  try {
-    if (y) {
-      auto& yield = y.get_yield_context();
-      asio::co_spawn(yield.get_executor(),
-                    bes->new_backing(dpp, type),
-                    yield);
-    } else {
-      maybe_warn_about_blocking(dpp);
-      asio::co_spawn(rados->get_executor(),
-                    bes->new_backing(dpp, type),
-                    async::use_blocked);
-    }
-  } catch (const std::exception&) {
-    return ceph::from_exception(std::current_exception());
-  }
-  return 0;;
+  co_return co_await bes->new_backing(dpp, type);
 }
 
-int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
-                                       std::optional<uint64_t>& through,
-                                       optional_yield y)
+asio::awaitable<void>
+RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
+                                   std::optional<uint64_t>& through)
 {
-  try {
-    if (y) {
-      auto& yield = y.get_yield_context();
-      asio::co_spawn(yield.get_executor(),
-                    bes->trim_generations(dpp, through),
-                    yield);
-    } else {
-      maybe_warn_about_blocking(dpp);
-      asio::co_spawn(rados->get_executor(),
-                    bes->trim_generations(dpp, through),
-                    async::use_blocked);
-    }
-  } catch (const std::exception& e) {
-    return ceph::from_exception(std::current_exception());
-  }
-
-  return 0;
+  co_return co_await bes->trim_generations(dpp, through);
 }
 
 asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
                          std::string>>
 RGWDataChangesLog::read_sems(int index, std::string cursor) {
-  namespace sem_set = neorados::cls::sem_set;
   bc::flat_map<std::string, uint64_t> out;
   try {
     co_await rados->execute(
index da9fc5fd68afd8e37fd18ae249d5c4c76883eae0..51f3b19584d9675818179d0fe5814ec6ae1b4c7e 100644 (file)
@@ -470,30 +470,25 @@ public:
                int shard_id, optional_yield y);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
-                            std::string>>
+                            std::string, bool>>
   list_entries(const DoutPrefixProvider* dpp, int shard, int max_entries,
               std::string marker);
-  int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
-                  std::vector<rgw_data_change_log_entry>& entries,
-                  std::string_view marker, std::string* out_marker,
-                  bool* truncated, std::string* errstr, optional_yield y);
   asio::awaitable<std::tuple<std::vector<rgw_data_change_log_entry>,
-                            RGWDataChangesLogMarker>>
+                            RGWDataChangesLogMarker, bool>>
   list_entries(const DoutPrefixProvider *dpp, int max_entries,
               RGWDataChangesLogMarker marker);
-  int list_entries(const DoutPrefixProvider *dpp, int max_entries,
-                  std::vector<rgw_data_change_log_entry>& entries,
-                  RGWDataChangesLogMarker& marker, bool* ptruncated,
-                  optional_yield y);
-
-  int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
-                  std::string_view marker, std::string* errstr, optional_yield y);
-  int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
-                  std::string_view marker, librados::AioCompletion* c);
-  int get_info(const DoutPrefixProvider *dpp, int shard_id,
-              RGWDataChangesLogInfo *info, std::string* errstr,
-              optional_yield y);
-
+  asio::awaitable<RGWDataChangesLogInfo>
+  get_info(const DoutPrefixProvider* dpp, int shard_id);
+  asio::awaitable<void>
+  trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+              std::string_view marker);
+  void trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                   std::string_view marker, librados::AioCompletion* c);
+  asio::awaitable<void>
+  trim_generations(const DoutPrefixProvider *dpp,
+                  std::optional<uint64_t>& through);
+  asio::awaitable<void>
+  change_format(const DoutPrefixProvider *dpp, log_type type);
   void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen);
   auto read_clear_modified() {
     std::unique_lock wl{modified_lock};
@@ -516,11 +511,6 @@ public:
   std::string get_sem_set_oid(int shard_id) const;
 
 
-  int change_format(const DoutPrefixProvider *dpp, log_type type,
-                   optional_yield y);
-  int trim_generations(const DoutPrefixProvider *dpp,
-                      std::optional<uint64_t>& through,
-                      optional_yield y);
   asio::awaitable<std::pair<bc::flat_map<std::string, uint64_t>,
                            std::string>>
   read_sems(int index, std::string cursor);
index d3d71465d0a18a6b5b43b188fa95beaa5604007f..61a54d9a02446fd189eb5610dc6becc54e981570 100644 (file)
@@ -15,6 +15,7 @@
 
 #include "common/ceph_json.h"
 #include "common/strtol.h"
+#include "rgw/async_utils.h"
 #include "rgw_rest.h"
 #include "rgw_op.h"
 #include "rgw_rest_s3.h"
@@ -688,13 +689,22 @@ void RGWOp_DATALog_List::execute(optional_yield y) {
 
   // Note that last_marker is updated to be the marker of the last
   // entry listed
-  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
-    datalog_rados->list_entries(this, shard_id, max_entries, entries,
-                               marker, &last_marker, &truncated, nullptr, y);
+  auto store = static_cast<rgw::sal::RadosStore*>(driver);
+  op_ret = rgw::run_coro(
+    this,
+    store->get_io_context(),
+    store->svc()->datalog_rados->list_entries(this, shard_id,
+                                             max_entries, marker),
+    std::tie(entries, last_marker, truncated),
+    "RGWDataChangesLog::list_entries", y);
+
 
   RGWDataChangesLogInfo info;
-  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
-    datalog_rados->get_info(this, shard_id, &info, nullptr, y);
+  op_ret = rgw::run_coro(
+    this,
+    store->get_io_context(),
+    store->svc()->datalog_rados->get_info(this, shard_id),
+    info, "RGWDataChangesLog::get_info", y);
 
   last_update = info.last_update;
 }
@@ -756,8 +766,10 @@ void RGWOp_DATALog_ShardInfo::execute(optional_yield y) {
     return;
   }
 
-  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
-    datalog_rados->get_info(this, shard_id, &info, nullptr, y);
+  auto store = static_cast<rgw::sal::RadosStore*>(driver);
+  op_ret = rgw::run_coro(this, store->get_io_context(),
+                        store->svc()->datalog_rados->get_info(this, shard_id),
+                        info, "RGWDataChangesLog::get_info", y);
 }
 
 void RGWOp_DATALog_ShardInfo::send_response() {
@@ -906,8 +918,11 @@ void RGWOp_DATALog_Delete::execute(optional_yield y) {
     return;
   }
 
-  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
-    datalog_rados->trim_entries(this, shard_id, marker, nullptr, y);
+  auto store = static_cast<rgw::sal::RadosStore*>(driver);
+  op_ret = rgw::run_coro(
+    this, store->get_io_context(),
+    store->svc()->datalog_rados->trim_entries(this, shard_id, marker),
+    "RGWDataChangesLog::trim_entries", y);
 }
 
 // not in header to avoid pulling in rgw_sync.h
index 5dcddb659e1e51c69fb90022eaa285c1041744b5..4f1462ce669e92d55cd8b10db29cddb913a1c6ca 100644 (file)
@@ -45,8 +45,10 @@ class DatalogTrimImplCR : public RGWSimpleCoroutine {
   int send_request(const DoutPrefixProvider *dpp) override {
     set_status() << "sending request";
     cn = stack->create_completion_notifier();
-    return store->svc()->datalog_rados->trim_entries(dpp, shard, marker,
-                                                    cn->completion());
+    // Call cannot fail, all errors will be reported through the completion
+    store->svc()->datalog_rados->trim_entries(dpp, shard, marker,
+                                             cn->completion());
+    return 0;
   }
   int request_complete() override {
     int r = cn->completion()->get_return_value();
index 62d07867fdbc9631b1bddbff8add68e7500d4bfc..377665ef5b344849fcce213fa3fd904cbe4d5462 100644 (file)
@@ -2,7 +2,7 @@
 // vim: ts=8 sw=2 smarttab ft=cpp
 
 /*
- * Copyright (C) 2025 IBM 
+ * Copyright (C) 2025 IBM
  */
 
 #include <cerrno>
@@ -48,6 +48,9 @@ extern "C" {
 
 #include "radosgw-admin/orphan.h"
 #include "radosgw-admin/sync_checkpoint.h"
+
+#include "rgw/async_utils.h"
+
 #include "rgw_user.h"
 #include "rgw_otp.h"
 #include "rgw_rados.h"
@@ -123,6 +126,7 @@ static const DoutPrefixProvider* dpp() {
   } while (0)
 
 using namespace std;
+using rgw::run_coro;
 
 inline int posix_errortrans(int r)
 {
@@ -3529,23 +3533,6 @@ void init_realm_param(CephContext *cct, string& var, std::optional<string>& opt_
   }
 }
 
-int run_coro(asio::awaitable<void> coro, std::string_view name) {
-  try {
-    // Blocking in startup code, not ideal, but won't hurt anything.
-    asio::co_spawn(static_cast<rgw::sal::RadosStore*>(driver)->get_io_context(),
-                  std::move(coro),
-                  async::use_blocked);
-  } catch (boost::system::system_error& e) {
-    ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
-    return ceph::from_error_code(e.code());
-  } catch (std::exception& e) {
-    ldpp_dout(dpp(), -1) << name << ": failed: " << e.what() << dendl;
-    return -EIO;
-  }
-  return 0;
-}
-
-
 // This has an uncaught exception. Even if the exception is caught, the program
 // would need to be terminated, so the warning is simply suppressed.
 // coverity[root_function:SUPPRESS]
@@ -10990,10 +10977,13 @@ next:
     if (specified_shard_id) {
       shard = shard_id;
     }
-    ret = run_coro(datalog->admin_sem_list(shard, max_entries, marker,
+    std::string err;
+    ret = run_coro(dpp(), context_pool,
+                  datalog->admin_sem_list(shard, max_entries, marker,
                                           cout, *formatter),
-                  "datalog seamphore list");
+                  &err);
     if (ret < 0) {
+      std::cerr << "datalog semaphore list: " << err << std::endl;
       return ret;
     }
   }
@@ -11003,18 +10993,21 @@ next:
       std::cerr << "Specify the semaphore key with --marker." << std::endl;
       return -EINVAL;
     }
+    std::string errstr;
     auto datalog = static_cast<rgw::sal::RadosStore*>(driver)
       ->svc()->datalog_rados;
-    ret = run_coro(datalog->admin_sem_reset(marker, count.value_or(0)),
-                  "datalog seamphore reset");
+    ret = rgw::run_coro(dpp(), context_pool,
+                       datalog->admin_sem_reset(marker, count.value_or(0)),
+                       &errstr);
     if (ret < 0) {
+      std::cerr << "datalog semaphore reset: " << errstr << std::endl;
       return ret;
     }
   }
 
   if (opt_cmd == OPT::DATALOG_LIST) {
     formatter->open_array_section("entries");
-    bool truncated;
+    bool truncated = false;
     int count = 0;
     if (max_entries < 0)
       max_entries = 1000;
@@ -11046,13 +11039,20 @@ next:
     do {
       std::vector<rgw_data_change_log_entry> entries;
       if (specified_shard_id) {
-        ret = datalog_svc->list_entries(dpp(), shard_id, max_entries - count,
-                                       entries, marker,
-                                       &marker, &truncated,
-                                       &errstr, null_yield);
+       ret = run_coro(
+         dpp(),
+         context_pool,
+         datalog_svc->list_entries(dpp(), shard_id, max_entries - count,
+                                   marker),
+         std::tie(entries, marker, truncated),
+         &errstr);
       } else {
-        ret = datalog_svc->list_entries(dpp(), max_entries - count, entries,
-                                       log_marker, &truncated, null_yield);
+       ret = run_coro(
+         dpp(),
+         context_pool,
+         datalog_svc->list_entries(dpp(), max_entries - count, log_marker),
+         std::tie(entries, log_marker, truncated),
+         &errstr);
       }
       if (ret < 0) {
         cerr << "ERROR: datalog_svc->list_entries(): " << errstr << ": "
@@ -11083,9 +11083,18 @@ next:
     for (; i < g_ceph_context->_conf->rgw_data_log_num_shards; i++) {
       vector<cls::log::entry> entries;
 
+      std::string errstr;
       RGWDataChangesLogInfo info;
-      static_cast<rgw::sal::RadosStore*>(driver)->svc()->
-       datalog_rados->get_info(dpp(), i, &info, nullptr, null_yield);
+
+      int r = run_coro(dpp(), context_pool,
+                      static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+                      datalog_rados->get_info(dpp(), i),
+                      info, &errstr);
+
+      if (r < 0) {
+       std::cerr << "datalog status: " << errstr << std::endl;
+       return -r;
+      }
 
       ::encode_json("info", info, formatter.get());
 
@@ -11147,11 +11156,14 @@ next:
       return EINVAL;
     }
 
+    std::string errstr;
     auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
-    ret = datalog->trim_entries(dpp(), shard_id, marker, nullptr, null_yield);
+    ret = run_coro(dpp(), context_pool,
+                  datalog->trim_entries(dpp(), shard_id, marker),
+                  &errstr);
 
     if (ret < 0 && ret != -ENODATA) {
-      cerr << "ERROR: trim_entries(): " << cpp_strerror(-ret) << std::endl;
+      cerr << "ERROR: trim_entries(): " << errstr << std::endl;
       return -ret;
     }
   }
@@ -11162,9 +11174,12 @@ next:
       return -EINVAL;
     }
     auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
-    ret = datalog->change_format(dpp(), *opt_log_type, null_yield);
+    std::string errstr;
+    ret = run_coro(dpp(), context_pool,
+                  datalog->change_format(dpp(), *opt_log_type),
+                  &errstr);
     if (ret < 0) {
-      cerr << "ERROR: change_format(): " << cpp_strerror(-ret) << std::endl;
+      cerr << "ERROR: change_format(): " << errstr << std::endl;
       return -ret;
     }
   }
@@ -11172,10 +11187,13 @@ next:
   if (opt_cmd == OPT::DATALOG_PRUNE) {
     auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
     std::optional<uint64_t> through;
-    ret = datalog->trim_generations(dpp(), through, null_yield);
+    std::string errstr;
+    ret = run_coro(dpp(), context_pool,
+                  datalog->trim_generations(dpp(), through),
+                  &errstr);
 
     if (ret < 0) {
-      cerr << "ERROR: trim_generations(): " << cpp_strerror(-ret) << std::endl;
+      cerr << "ERROR: trim_generations(): " << errstr << std::endl;
       return -ret;
     }
 
index 0400fed85b4d86ae75caeb7cc5e34738bb43ce08..14b232b8034eaf2098b07036db360e3addedebe2 100644 (file)
@@ -132,7 +132,7 @@ protected:
     RGWDataChangesLogMarker marker;
     do {
       std::vector<rgw_data_change_log_entry> entries;
-      std::tie(entries, marker) =
+      std::tie(entries, marker, std::ignore) =
        co_await datalog->list_entries(dpp, 1'000,
                                       std::move(marker));
       for (const auto& entry : entries) {