]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: Remove (most) uses of null_yield in datalog
authorAdam C. Emerson <aemerson@redhat.com>
Sat, 14 Jan 2023 00:24:33 +0000 (19:24 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 23 Jan 2023 17:24:31 +0000 (12:24 -0500)
The only ones remaining are in calls from top-level threads.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/driver/rados/rgw_datalog.cc
src/rgw/driver/rados/rgw_datalog.h
src/rgw/driver/rados/rgw_rados.cc
src/rgw/driver/rados/rgw_rados.h
src/rgw/driver/rados/rgw_reshard.cc
src/rgw/driver/rados/rgw_rest_log.cc
src/rgw/rgw_admin.cc
src/rgw/services/svc_bi.h
src/rgw/services/svc_bi_rados.cc
src/rgw/services/svc_bi_rados.h
src/rgw/services/svc_bucket_sobj.cc

index 3eeb820e2eba955f8390c61f0ed95449de35736d..7ca37abf684846318b8920a671c1f6a535b86db6 100644 (file)
@@ -3,6 +3,7 @@
 
 #include <vector>
 
+#include "common/async/yield_context.h"
 #include "common/debug.h"
 #include "common/containers.h"
 #include "common/errno.h"
@@ -115,10 +116,10 @@ public:
     cls_log_add_prepare_entry(e, utime_t(ut), {}, key, entry);
     std::get<centries>(out).push_back(std::move(e));
   }
-  int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
+  int push(const DoutPrefixProvider *dpp, int index, entries&& items, optional_yield y) override {
     lr::ObjectWriteOperation op;
     cls_log_add(op, std::get<centries>(items), true);
-    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                 << ": failed to push to " << oids[index] << cpp_strerror(-r)
@@ -127,11 +128,11 @@ public:
     return r;
   }
   int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
-          const std::string& key,
-          ceph::buffer::list&& bl) override {
+          const std::string& key, ceph::buffer::list&& bl,
+          optional_yield y) override {
     lr::ObjectWriteOperation op;
     cls_log_add(op, utime_t(now), {}, key, bl);
-    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                 << ": failed to push to " << oids[index]
@@ -142,12 +143,13 @@ public:
   int list(const DoutPrefixProvider *dpp, int index, int max_entries,
           std::vector<rgw_data_change_log_entry>& entries,
           std::optional<std::string_view> marker,
-          std::string* out_marker, bool* truncated) override {
+          std::string* out_marker, bool* truncated,
+          optional_yield y) override {
     std::list<cls_log_entry> log_entries;
     lr::ObjectReadOperation op;
     cls_log_list(op, {}, {}, std::string(marker.value_or("")),
                 max_entries, log_entries, out_marker, truncated);
-    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, y);
     if (r == -ENOENT) {
       *truncated = false;
       return 0;
@@ -176,11 +178,12 @@ public:
     }
     return 0;
   }
-  int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
+  int get_info(const DoutPrefixProvider *dpp, int index,
+              RGWDataChangesLogInfo *info, optional_yield y) override {
     cls_log_header header;
     lr::ObjectReadOperation op;
     cls_log_info(op, &header);
-    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, null_yield);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, nullptr, y);
     if (r == -ENOENT) r = 0;
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
@@ -192,10 +195,11 @@ public:
     }
     return r;
   }
-  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
+  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
+          optional_yield y) override {
     lr::ObjectWriteOperation op;
     cls_log_trim(op, {}, {}, {}, std::string(marker));
-    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, null_yield);
+    auto r = rgw_rados_operate(dpp, ioctx, oids[index], &op, y);
     if (r == -ENOENT) r = -ENODATA;
     if (r < 0 && r != -ENODATA) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
@@ -220,14 +224,14 @@ public:
   std::string_view max_marker() const override {
     return "99999999";
   }
-  int is_empty(const DoutPrefixProvider *dpp) override {
+  int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override {
     for (auto shard = 0u; shard < oids.size(); ++shard) {
       std::list<cls_log_entry> log_entries;
       lr::ObjectReadOperation op;
       std::string out_marker;
       bool truncated;
       cls_log_list(op, {}, {}, {}, 1, log_entries, &out_marker, &truncated);
-      auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, null_yield);
+      auto r = rgw_rados_operate(dpp, ioctx, oids[shard], &op, nullptr, y);
       if (r == -ENOENT) {
        continue;
       }
@@ -266,8 +270,9 @@ public:
     }
     std::get<centries>(out).push_back(std::move(entry));
   }
-  int push(const DoutPrefixProvider *dpp, int index, entries&& items) override {
-    auto r = fifos[index].push(dpp, std::get<centries>(items), null_yield);
+  int push(const DoutPrefixProvider *dpp, int index, entries&& items,
+          optional_yield y) override {
+    auto r = fifos[index].push(dpp, std::get<centries>(items), y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                 << ": unable to push to FIFO: " << get_oid(index)
@@ -276,9 +281,9 @@ public:
     return r;
   }
   int push(const DoutPrefixProvider *dpp, int index, ceph::real_time,
-          const std::string&,
-          ceph::buffer::list&& bl) override {
-    auto r = fifos[index].push(dpp, std::move(bl), null_yield);
+          const std::string&, ceph::buffer::list&& bl,
+          optional_yield y) override {
+    auto r = fifos[index].push(dpp, std::move(bl), y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                 << ": unable to push to FIFO: " << get_oid(index)
@@ -288,12 +293,12 @@ public:
   }
   int list(const DoutPrefixProvider *dpp, int index, int max_entries,
           std::vector<rgw_data_change_log_entry>& entries,
-          std::optional<std::string_view> marker,
-          std::string* out_marker, bool* truncated) override {
+          std::optional<std::string_view> marker, std::string* out_marker,
+          bool* truncated, optional_yield y) override {
     std::vector<rgw::cls::fifo::list_entry> log_entries;
     bool more = false;
     auto r = fifos[index].list(dpp, max_entries, marker, &log_entries, &more,
-                              null_yield);
+                              y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                 << ": unable to list FIFO: " << get_oid(index)
@@ -322,9 +327,10 @@ public:
     }
     return 0;
   }
-  int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) override {
+  int get_info(const DoutPrefixProvider *dpp, int index,
+              RGWDataChangesLogInfo *info, optional_yield y) override {
     auto& fifo = fifos[index];
-    auto r = fifo.read_meta(dpp, null_yield);
+    auto r = fifo.read_meta(dpp, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                 << ": unable to get FIFO metadata: " << get_oid(index)
@@ -332,7 +338,7 @@ public:
       return r;
     }
     rados::cls::fifo::info m;
-    fifo.meta(dpp, m, null_yield);
+    fifo.meta(dpp, m, y);
     auto p = m.head_part_num;
     if (p < 0) {
       info->marker = "";
@@ -340,7 +346,7 @@ public:
       return 0;
     }
     rgw::cls::fifo::part_info h;
-    r = fifo.get_part_info(dpp, p, &h, null_yield);
+    r = fifo.get_part_info(dpp, p, &h, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                 << ": unable to get part info: " << get_oid(index) << "/" << p
@@ -351,8 +357,9 @@ public:
     info->last_update = h.max_time;
     return 0;
   }
-  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) override {
-    auto r = fifos[index].trim(dpp, marker, false, null_yield);
+  int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
+          optional_yield y) override {
+    auto r = fifos[index].trim(dpp, marker, false, y);
     if (r < 0) {
       ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                 << ": unable to trim FIFO: " << get_oid(index)
@@ -366,6 +373,13 @@ public:
     if (marker == rgw::cls::fifo::marker(0, 0).to_string()) {
       rgw_complete_aio_completion(c, -ENODATA);
     } else {
+      // This null_yield is used for lazily opening FIFOs.
+      //
+      // shouldn't exist, but it can't be eliminated
+      // since your caller is an RGWCoroutine in the data sync code.
+      //
+      // It can be eliminated after Reef when we can get rid of
+      // AioCompletion entirely.
       fifos[index].trim(dpp, marker, false, c, null_yield);
     }
     return r;
@@ -375,12 +389,11 @@ public:
       rgw::cls::fifo::marker::max().to_string();
     return std::string_view(mm);
   }
-  int is_empty(const DoutPrefixProvider *dpp) override {
+  int is_empty(const DoutPrefixProvider *dpp, optional_yield y) override {
     std::vector<rgw::cls::fifo::list_entry> log_entries;
     bool more = false;
     for (auto shard = 0u; shard < fifos.size(); ++shard) {
-      auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more,
-                                null_yield);
+      auto r = fifos[shard].list(dpp, 1, {}, &log_entries, &more, y);
       if (r < 0) {
        ldpp_dout(dpp, -1) << __PRETTY_FUNCTION__
                   << ": unable to list FIFO: " << get_oid(shard)
@@ -479,6 +492,7 @@ int RGWDataChangesLog::start(const DoutPrefixProvider *dpp, const RGWZone* _zone
     return -r;
   }
 
+  // This null_yield is in startup code, so it doesn't matter that much.
   auto besr = logback_generations::init<DataLogBackends>(
     dpp, ioctx, metadata_log_oid(), [this](uint64_t gen_id, int shard) {
       return get_oid(gen_id, shard);
@@ -544,7 +558,8 @@ int RGWDataChangesLog::renew_entries(const DoutPrefixProvider *dpp)
 
     auto now = real_clock::now();
 
-    auto ret = be->push(dpp, index, std::move(entries));
+    // This null_yield can stay (for now) as we're in our own thread.
+    auto ret = be->push(dpp, index, std::move(entries), null_yield);
     if (ret < 0) {
       /* we don't really need to have a special handling for failed cases here,
        * as this is just an optimization. */
@@ -623,11 +638,11 @@ std::string RGWDataChangesLog::get_oid(uint64_t gen_id, int i) const {
 int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
                                 const RGWBucketInfo& bucket_info,
                                 const rgw::bucket_log_layout_generation& gen,
-                                int shard_id)
+                                int shard_id, optional_yield y)
 {
   auto& bucket = bucket_info.bucket;
 
-  if (!filter_bucket(dpp, bucket, null_yield)) {
+  if (!filter_bucket(dpp, bucket, y)) {
     return 0;
   }
 
@@ -705,7 +720,7 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
     ldpp_dout(dpp, 20) << "RGWDataChangesLog::add_entry() sending update with now=" << now << " cur_expiration=" << expiration << dendl;
 
     auto be = bes->head();
-    ret = be->push(dpp, index, now, change.key, std::move(bl));
+    ret = be->push(dpp, index, now, change.key, std::move(bl), y);
 
     now = real_clock::now();
 
@@ -730,9 +745,8 @@ int RGWDataChangesLog::add_entry(const DoutPrefixProvider *dpp,
 
 int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entries,
                          std::vector<rgw_data_change_log_entry>& entries,
-                         std::string_view marker,
-                         std::string* out_marker,
-                         bool* truncated)
+                         std::string_view marker, std::string* out_marker,
+                         bool* truncated, optional_yield y)
 {
   const auto [start_id, start_cursor] = cursorgen(marker);
   auto gen_id = start_id;
@@ -747,7 +761,7 @@ int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entr
     gen_id = be->gen_id;
     auto r = be->list(dpp, shard, max_entries, gentries,
                      gen_id == start_id ? start_cursor : std::string{},
-                     &out_cursor, truncated);
+                     &out_cursor, truncated, y);
     if (r < 0)
       return r;
 
@@ -772,22 +786,25 @@ int DataLogBackends::list(const DoutPrefixProvider *dpp, int shard, int max_entr
 int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
                                    std::vector<rgw_data_change_log_entry>& entries,
                                    std::string_view marker,
-                                   std::string* out_marker, bool* truncated)
+                                   std::string* out_marker, bool* truncated,
+                                   optional_yield y)
 {
   assert(shard < num_shards);
-  return bes->list(dpp, shard, max_entries, entries, marker, out_marker, truncated);
+  return bes->list(dpp, shard, max_entries, entries, marker, out_marker,
+                  truncated, y);
 }
 
 int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entries,
                                    std::vector<rgw_data_change_log_entry>& entries,
-                                   LogMarker& marker, bool *ptruncated)
+                                   LogMarker& marker, bool *ptruncated,
+                                   optional_yield y)
 {
   bool truncated;
   entries.clear();
   for (; marker.shard < num_shards && int(entries.size()) < max_entries;
        marker.shard++, marker.marker.clear()) {
     int ret = list_entries(dpp, marker.shard, max_entries - entries.size(),
-                          entries, marker.marker, NULL, &truncated);
+                          entries, marker.marker, NULL, &truncated, y);
     if (ret == -ENOENT) {
       continue;
     }
@@ -803,18 +820,20 @@ int RGWDataChangesLog::list_entries(const DoutPrefixProvider *dpp, int max_entri
   return 0;
 }
 
-int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info)
+int RGWDataChangesLog::get_info(const DoutPrefixProvider *dpp, int shard_id,
+                               RGWDataChangesLogInfo *info, optional_yield y)
 {
   assert(shard_id < num_shards);
   auto be = bes->head();
-  auto r = be->get_info(dpp, shard_id, info);
+  auto r = be->get_info(dpp, shard_id, info, y);
   if (!info->marker.empty()) {
     info->marker = gencursor(be->gen_id, info->marker);
   }
   return r;
 }
 
-int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
+int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                                 std::string_view marker, optional_yield y)
 {
   auto [target_gen, cursor] = cursorgen(marker);
   std::unique_lock l(m);
@@ -827,7 +846,7 @@ int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, s
        be = upper_bound(be->gen_id)->second) {
     l.unlock();
     auto c = be->gen_id == target_gen ? cursor : be->max_marker();
-    r = be->trim(dpp, shard_id, c);
+    r = be->trim(dpp, shard_id, c, y);
     if (r == -ENOENT)
       r = -ENODATA;
     if (r == -ENODATA && be->gen_id < target_gen)
@@ -839,10 +858,11 @@ int DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id, s
   return r;
 }
 
-int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker)
+int RGWDataChangesLog::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                                   std::string_view marker, optional_yield y)
 {
   assert(shard_id < num_shards);
-  return bes->trim_entries(dpp, shard_id, marker);
+  return bes->trim_entries(dpp, shard_id, marker, y);
 }
 
 class GenTrim : public rgw::cls::fifo::Completion<GenTrim> {
@@ -912,7 +932,9 @@ void DataLogBackends::trim_entries(const DoutPrefixProvider *dpp, int shard_id,
   be->trim(dpp, shard_id, cc,  GenTrim::call(std::move(gt)));
 }
 
-int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
+int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp,
+                                     std::optional<uint64_t>& through,
+                                     optional_yield y) {
   if (size() != 1) {
     std::vector<mapped_type> candidates;
     {
@@ -925,7 +947,7 @@ int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::option
 
     std::optional<uint64_t> highest;
     for (auto& be : candidates) {
-      auto r = be->is_empty(dpp);
+      auto r = be->is_empty(dpp, y);
       if (r < 0) {
        return r;
       } else if (r == 1) {
@@ -939,13 +961,13 @@ int DataLogBackends::trim_generations(const DoutPrefixProvider *dpp, std::option
     if (!highest) {
       return 0;
     }
-    auto ec = empty_to(dpp, *highest, null_yield);
+    auto ec = empty_to(dpp, *highest, y);
     if (ec) {
       return ceph::from_error_code(ec);
     }
   }
 
-  return ceph::from_error_code(remove_empty(dpp, null_yield));
+  return ceph::from_error_code(remove_empty(dpp, y));
 }
 
 
@@ -987,7 +1009,8 @@ void RGWDataChangesLog::renew_run() noexcept {
     if (run == runs_per_prune) {
       std::optional<uint64_t> through;
       ldpp_dout(&dp, 2) << "RGWDataChangesLog::ChangesRenewThread: pruning old generations" << dendl;
-      trim_generations(&dp, through);
+      // This null_yield can stay, for now, as it's in its own thread.
+      trim_generations(&dp, through, null_yield);
       if (r < 0) {
        derr << "RGWDataChangesLog::ChangesRenewThread: failed pruning r="
             << r << dendl;
@@ -1043,8 +1066,10 @@ int RGWDataChangesLog::change_format(const DoutPrefixProvider *dpp, log_type typ
   return ceph::from_error_code(bes->new_backing(dpp, type, y));
 }
 
-int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through) {
-  return bes->trim_generations(dpp, through);
+int RGWDataChangesLog::trim_generations(const DoutPrefixProvider *dpp,
+                                       std::optional<uint64_t>& through,
+                                       optional_yield y) {
+  return bes->trim_generations(dpp, through, y);
 }
 
 void RGWDataChangesLogInfo::dump(Formatter *f) const
index 1c9a00c1fffb956fe58d60874d4c2ffbb97f5be6..174cf86ded13b15f2109222081a96110dcc75fce 100644 (file)
@@ -18,6 +18,7 @@
 
 #include <fmt/format.h>
 
+#include "common/async/yield_context.h"
 #include "include/buffer.h"
 #include "include/encoding.h"
 #include "include/function2.hpp"
@@ -179,9 +180,10 @@ public:
   }
   int list(const DoutPrefixProvider *dpp, int shard, int max_entries,
           std::vector<rgw_data_change_log_entry>& entries,
-          std::string_view marker,
-          std::string* out_marker, bool* truncated);
-  int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker);
+          std::string_view marker, std::string* out_marker, bool* truncated,
+          optional_yield y);
+  int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                  std::string_view marker, optional_yield y);
   void trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
                    librados::AioCompletion* c);
   void set_zero(RGWDataChangesBE* be) {
@@ -192,7 +194,9 @@ public:
   bs::error_code handle_new_gens(entries_t e) noexcept override;
   bs::error_code handle_empty_to(uint64_t new_tail) noexcept override;
 
-  int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through);
+  int trim_generations(const DoutPrefixProvider *dpp,
+                      std::optional<uint64_t>& through,
+                      optional_yield y);
 };
 
 struct BucketGen {
@@ -294,22 +298,26 @@ public:
            librados::Rados* lr);
   int choose_oid(const rgw_bucket_shard& bs);
   int add_entry(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
-               const rgw::bucket_log_layout_generation& gen, int shard_id);
+               const rgw::bucket_log_layout_generation& gen, int shard_id,
+               optional_yield y);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   int list_entries(const DoutPrefixProvider *dpp, int shard, int max_entries,
                   std::vector<rgw_data_change_log_entry>& entries,
-                  std::string_view marker,
-                  std::string* out_marker, bool* truncated);
-  int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker);
+                  std::string_view marker, std::string* out_marker,
+                  bool* truncated, optional_yield y);
+  int trim_entries(const DoutPrefixProvider *dpp, int shard_id,
+                  std::string_view marker, optional_yield y);
   int trim_entries(const DoutPrefixProvider *dpp, int shard_id, std::string_view marker,
                   librados::AioCompletion* c); // :(
-  int get_info(const DoutPrefixProvider *dpp, int shard_id, RGWDataChangesLogInfo *info);
+  int get_info(const DoutPrefixProvider *dpp, int shard_id,
+              RGWDataChangesLogInfo *info, optional_yield y);
 
   using LogMarker = RGWDataChangesLogMarker;
 
   int list_entries(const DoutPrefixProvider *dpp, int max_entries,
                   std::vector<rgw_data_change_log_entry>& entries,
-                  LogMarker& marker, bool* ptruncated);
+                  LogMarker& marker, bool* ptruncated,
+                  optional_yield y);
 
   void mark_modified(int shard_id, const rgw_bucket_shard& bs, uint64_t gen);
   auto read_clear_modified() {
@@ -333,7 +341,9 @@ public:
 
 
   int change_format(const DoutPrefixProvider *dpp, log_type type, optional_yield y);
-  int trim_generations(const DoutPrefixProvider *dpp, std::optional<uint64_t>& through);
+  int trim_generations(const DoutPrefixProvider *dpp,
+                      std::optional<uint64_t>& through,
+                      optional_yield y);
 };
 
 class RGWDataChangesBE : public boost::intrusive_ref_counter<RGWDataChangesBE> {
@@ -362,19 +372,23 @@ public:
                       const std::string& key,
                       ceph::buffer::list&& entry,
                       entries& out) = 0;
-  virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items) = 0;
+  virtual int push(const DoutPrefixProvider *dpp, int index, entries&& items,
+                  optional_yield y) = 0;
   virtual int push(const DoutPrefixProvider *dpp, int index, ceph::real_time now,
-                  const std::string& key,
-                  ceph::buffer::list&& bl) = 0;
+                  const std::string& key, ceph::buffer::list&& bl,
+                  optional_yield y) = 0;
   virtual int list(const DoutPrefixProvider *dpp, int shard, int max_entries,
                   std::vector<rgw_data_change_log_entry>& entries,
                   std::optional<std::string_view> marker,
-                  std::string* out_marker, bool* truncated) = 0;
-  virtual int get_info(const DoutPrefixProvider *dpp, int index, RGWDataChangesLogInfo *info) = 0;
-  virtual int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker) = 0;
-  virtual int trim(const DoutPrefixProvider *dpp, int index, std::string_view marker,
-                  librados::AioCompletion* c) = 0;
+                  std::string* out_marker, bool* truncated,
+                  optional_yield y) = 0;
+  virtual int get_info(const DoutPrefixProvider *dpp, int index,
+                      RGWDataChangesLogInfo *info, optional_yield y) = 0;
+  virtual int trim(const DoutPrefixProvider *dpp, int index,
+                  std::string_view marker, optional_yield y) = 0;
+  virtual int trim(const DoutPrefixProvider *dpp, int index,
+                  std::string_view marker, librados::AioCompletion* c) = 0;
   virtual std::string_view max_marker() const = 0;
   // 1 on empty, 0 on non-empty, negative on error.
-  virtual int is_empty(const DoutPrefixProvider *dpp) = 0;
+  virtual int is_empty(const DoutPrefixProvider *dpp, optional_yield y) = 0;
 };
index 6779e519c466db41c6d30903be997f6272c45de1..86177fb91eb5ef1fbdb076e95608db97c8a5b08e 100644 (file)
@@ -713,13 +713,13 @@ int RGWRados::get_max_chunk_size(const rgw_placement_rule& placement_rule, const
 void add_datalog_entry(const DoutPrefixProvider* dpp,
                        RGWDataChangesLog* datalog,
                        const RGWBucketInfo& bucket_info,
-                       uint32_t shard_id)
+                       uint32_t shard_id, optional_yield y)
 {
   const auto& logs = bucket_info.layout.logs;
   if (logs.empty()) {
     return;
   }
-  int r = datalog->add_entry(dpp, bucket_info, logs.back(), shard_id);
+  int r = datalog->add_entry(dpp, bucket_info, logs.back(), shard_id, y);
   if (r < 0) {
     ldpp_dout(dpp, -1) << "ERROR: failed writing data log" << dendl;
   } // datalog error is not fatal
@@ -899,7 +899,9 @@ void RGWIndexCompletionManager::process()
         continue;
       }
 
-      add_datalog_entry(&dpp, store->svc.datalog_rados, bucket_info, bs.shard_id);
+      // This null_yield can stay, for now, since we're in our own thread
+      add_datalog_entry(&dpp, store->svc.datalog_rados, bucket_info,
+                       bs.shard_id, null_yield);
     }
   }
 }
@@ -3209,7 +3211,8 @@ int RGWRados::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
   r = index_op->complete(dpp, poolid, epoch, size, accounted_size,
                         meta.set_mtime, etag, content_type,
                         storage_class, &acl_bl,
-                        meta.category, meta.remove_objs, meta.user_data, meta.appendable);
+                        meta.category, meta.remove_objs, y,
+                        meta.user_data, meta.appendable);
   tracepoint(rgw_rados, complete_exit, req_id.c_str());
   if (r < 0)
     goto done_cancel;
@@ -3249,12 +3252,12 @@ int RGWRados::Object::Write::_do_write_meta(const DoutPrefixProvider *dpp,
   }
   else {
     store->quota_handler->update_stats(meta.owner, obj.bucket, (orig_exists ? 0 : 1),
-                                     accounted_size, orig_size);  
+                                     accounted_size, orig_size);
   }
   return 0;
 
 done_cancel:
-  int ret = index_op->cancel(dpp, meta.remove_objs);
+  int ret = index_op->cancel(dpp, meta.remove_objs, y);
   if (ret < 0) {
     ldpp_dout(dpp, 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
   }
@@ -5181,7 +5184,7 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
     }
 
     add_datalog_entry(dpp, store->svc.datalog_rados,
-                      target->get_bucket_info(), bs->shard_id);
+                      target->get_bucket_info(), bs->shard_id, y);
 
     return 0;
   }
@@ -5256,7 +5259,7 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
 
   RGWRados::Bucket bop(store, bucket_info);
   RGWRados::Bucket::UpdateIndex index_op(&bop, obj);
-  
+
   index_op.set_zones_trace(params.zones_trace);
   index_op.set_bilog_flags(params.bilog_flags);
 
@@ -5279,15 +5282,15 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvi
       tombstone_entry entry{*state};
       obj_tombstone_cache->add(obj, entry);
     }
-    r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs);
-    
+    r = index_op.complete_del(dpp, poolid, ioctx.get_last_version(), state->mtime, params.remove_objs, y);
+
     int ret = target->complete_atomic_modification(dpp);
     if (ret < 0) {
       ldpp_dout(dpp, 0) << "ERROR: complete_atomic_modification returned ret=" << ret << dendl;
     }
     /* other than that, no need to propagate error */
   } else {
-    int ret = index_op.cancel(dpp, params.remove_objs);
+    int ret = index_op.cancel(dpp, params.remove_objs, y);
     if (ret < 0) {
       ldpp_dout(dpp, 0) << "ERROR: index_op.cancel() returned ret=" << ret << dendl;
     }
@@ -5360,7 +5363,8 @@ int RGWRados::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& o
   return 0;
 }
 
-int RGWRados::delete_obj_index(const rgw_obj& obj, ceph::real_time mtime, const DoutPrefixProvider *dpp)
+int RGWRados::delete_obj_index(const rgw_obj& obj, ceph::real_time mtime,
+                              const DoutPrefixProvider *dpp, optional_yield y)
 {
   std::string oid, key;
   get_obj_bucket_and_oid_loc(obj, oid, key);
@@ -5375,7 +5379,7 @@ int RGWRados::delete_obj_index(const rgw_obj& obj, ceph::real_time mtime, const
   RGWRados::Bucket bop(this, bucket_info);
   RGWRados::Bucket::UpdateIndex index_op(&bop, obj);
 
-  return index_op.complete_del(dpp, -1 /* pool */, 0, mtime, NULL);
+  return index_op.complete_del(dpp, -1 /* pool */, 0, mtime, nullptr, y);
 }
 
 static void generate_fake_tag(const DoutPrefixProvider *dpp, rgw::sal::Driver* store, map<string, bufferlist>& attrset, RGWObjManifest& manifest, bufferlist& manifest_bl, bufferlist& tag_bl)
@@ -6013,9 +6017,9 @@ int RGWRados::set_attrs(const DoutPrefixProvider *dpp, void *ctx, RGWBucketInfo&
       int64_t poolid = ioctx.get_id();
       r = index_op.complete(dpp, poolid, epoch, state->size, state->accounted_size,
                             mtime, etag, content_type, storage_class, &acl_bl,
-                            RGWObjCategory::Main, NULL);
+                            RGWObjCategory::Main, nullptr, y);
     } else {
-      int ret = index_op.cancel(dpp, nullptr);
+      int ret = index_op.cancel(dpp, nullptr, y);
       if (ret < 0) {
         ldpp_dout(dpp, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl;
       }
@@ -6253,7 +6257,9 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64
                                             const string& content_type, const string& storage_class,
                                             bufferlist *acl_bl,
                                             RGWObjCategory category,
-                                            list<rgw_obj_index_key> *remove_objs, const string *user_data,
+                                            list<rgw_obj_index_key> *remove_objs,
+                                           optional_yield y,
+                                           const string *user_data,
                                             bool appendable)
 {
   if (blind) {
@@ -6293,7 +6299,7 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64
   ret = store->cls_obj_complete_add(*bs, obj, optag, poolid, epoch, ent, category, remove_objs, bilog_flags, zones_trace);
 
   add_datalog_entry(dpp, store->svc.datalog_rados,
-                    target->bucket_info, bs->shard_id);
+                    target->bucket_info, bs->shard_id, y);
 
   return ret;
 }
@@ -6301,7 +6307,8 @@ int RGWRados::Bucket::UpdateIndex::complete(const DoutPrefixProvider *dpp, int64
 int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
                                                 int64_t poolid, uint64_t epoch,
                                                 real_time& removed_mtime,
-                                                list<rgw_obj_index_key> *remove_objs)
+                                                list<rgw_obj_index_key> *remove_objs,
+                                               optional_yield y)
 {
   if (blind) {
     return 0;
@@ -6318,16 +6325,17 @@ int RGWRados::Bucket::UpdateIndex::complete_del(const DoutPrefixProvider *dpp,
   ret = store->cls_obj_complete_del(*bs, optag, poolid, epoch, obj, removed_mtime, remove_objs, bilog_flags, zones_trace);
 
   add_datalog_entry(dpp, store->svc.datalog_rados,
-                    target->bucket_info, bs->shard_id);
+                    target->bucket_info, bs->shard_id, y);
 
   return ret;
 }
 
 
 int RGWRados::Bucket::UpdateIndex::cancel(const DoutPrefixProvider *dpp,
-                                          list<rgw_obj_index_key> *remove_objs)
+                                          list<rgw_obj_index_key> *remove_objs,
+                                         optional_yield y)
 {
-  if (blind) {
+    if (blind) {
     return 0;
   }
   RGWRados *store = target->get_store();
@@ -6343,7 +6351,7 @@ int RGWRados::Bucket::UpdateIndex::cancel(const DoutPrefixProvider *dpp,
    * have no way to tell that they're all caught up
    */
   add_datalog_entry(dpp, store->svc.datalog_rados,
-                    target->bucket_info, bs->shard_id);
+                    target->bucket_info, bs->shard_id, y);
 
   return ret;
 }
@@ -6998,6 +7006,7 @@ int RGWRados::bucket_index_link_olh(const DoutPrefixProvider *dpp, RGWBucketInfo
                                     struct rgw_bucket_dir_entry_meta *meta,
                                     uint64_t olh_epoch,
                                     real_time unmod_since, bool high_precision_time,
+                                   optional_yield y,
                                     rgw_zone_set *_zones_trace, bool log_data_change)
 {
   rgw_rados_ref ref;
@@ -7032,7 +7041,7 @@ int RGWRados::bucket_index_link_olh(const DoutPrefixProvider *dpp, RGWBucketInfo
     return r;
   }
 
-  add_datalog_entry(dpp, svc.datalog_rados, bucket_info, bs.shard_id);
+  add_datalog_entry(dpp, svc.datalog_rados, bucket_info, bs.shard_id, y);
 
   return 0;
 }
@@ -7496,7 +7505,7 @@ int RGWRados::set_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx,
     }
     ret = bucket_index_link_olh(dpp, bucket_info, *state, target_obj->get_obj(),
                                delete_marker, op_tag, meta, olh_epoch, unmod_since,
-                               high_precision_time, zones_trace, log_data_change);
+                               high_precision_time, y, zones_trace, log_data_change);
     if (ret < 0) {
       ldpp_dout(dpp, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
       if (ret == -ECANCELED) {
@@ -9374,7 +9383,7 @@ int RGWRados::check_disk_state(const DoutPrefixProvider *dpp,
 
       if (loc.key.ns == RGW_OBJ_NS_MULTIPART) {
        ldout_bitx(bitx, dpp, 10) << "INFO: " << __func__ << " removing manifest part from index loc=" << loc << dendl_bitx;
-       r = delete_obj_index(loc, astate->mtime, dpp);
+       r = delete_obj_index(loc, astate->mtime, dpp, y);
        if (r < 0) {
          ldout_bitx(bitx, dpp, 0) <<
            "WARNING: " << __func__ << ": delete_obj_index returned r=" << r << dendl_bitx;
@@ -9662,7 +9671,7 @@ int RGWRados::delete_obj_aio(const DoutPrefixProvider *dpp, const rgw_obj& obj,
   handles.push_back(c);
 
   if (keep_index_consistent) {
-    ret = delete_obj_index(obj, astate->mtime, dpp);
+    ret = delete_obj_index(obj, astate->mtime, dpp, y);
     if (ret < 0) {
       ldpp_dout(dpp, -1) << "ERROR: failed to delete obj index with ret=" << ret << dendl;
       return ret;
index a3258ac8b72392a3ba32299f9380a51c3e244319..9588d5794ee7a1b5c8677344b97a15ae86d19b13 100644 (file)
@@ -959,13 +959,18 @@ public:
                    const std::string& etag, const std::string& content_type,
                    const std::string& storage_class,
                    bufferlist *acl_bl, RGWObjCategory category,
-                  std::list<rgw_obj_index_key> *remove_objs, const std::string *user_data = nullptr, bool appendable = false);
+                  std::list<rgw_obj_index_key> *remove_objs,
+                  optional_yield y,
+                  const std::string *user_data = nullptr,
+                  bool appendable = false);
       int complete_del(const DoutPrefixProvider *dpp,
                        int64_t poolid, uint64_t epoch,
                        ceph::real_time& removed_mtime, /* mtime of removed object */
-                       std::list<rgw_obj_index_key> *remove_objs);
+                       std::list<rgw_obj_index_key> *remove_objs,
+                      optional_yield y);
       int cancel(const DoutPrefixProvider *dpp,
-                 std::list<rgw_obj_index_key> *remove_objs);
+                 std::list<rgw_obj_index_key> *remove_objs,
+                optional_yield y);
 
       const std::string *get_optag() { return &optag; }
 
@@ -1232,7 +1237,8 @@ public:
   int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj);
 
   /** Remove an object from the bucket index */
-  int delete_obj_index(const rgw_obj& obj, ceph::real_time mtime, const DoutPrefixProvider *dpp);
+  int delete_obj_index(const rgw_obj& obj, ceph::real_time mtime,
+                      const DoutPrefixProvider *dpp, optional_yield y);
 
   /**
    * Set an attr on an object.
@@ -1302,6 +1308,7 @@ public:
                             const std::string& op_tag, struct rgw_bucket_dir_entry_meta *meta,
                             uint64_t olh_epoch,
                             ceph::real_time unmod_since, bool high_precision_time,
+                           optional_yield y,
                             rgw_zone_set *zones_trace = nullptr,
                             bool log_data_change = false);
   int bucket_index_unlink_instance(const DoutPrefixProvider *dpp,
index b2dec7af1c86e1645e8092b8d3df837f6cb4a94d..3b3ae3f975b8cb2927ccc0d3a918c2a722f58679 100644 (file)
@@ -648,7 +648,9 @@ static int commit_reshard(rgw::sal::RadosStore* store,
     // generation, and eventually transition to the next
     // TODO: use a log layout to support types other than BucketLogType::InIndex
     for (uint32_t shard_id = 0; shard_id < prev.current_index.layout.normal.num_shards; ++shard_id) {
-      ret = store->svc()->datalog_rados->add_entry(dpp, bucket_info, prev.logs.back(), shard_id);
+      // This null_yield can stay, for now, since we're in our own thread
+      ret = store->svc()->datalog_rados->add_entry(dpp, bucket_info, prev.logs.back(), shard_id,
+                                                  null_yield);
       if (ret < 0) {
         ldpp_dout(dpp, 1) << "WARNING: failed writing data log (bucket_info.bucket="
         << bucket_info.bucket << ", shard_id=" << shard_id << "of generation="
index 3563cf051bd7b83ba6672b928b32e44481056b1d..e0afce0b2ff0e22821d62e34f30b4c209d1563af 100644 (file)
@@ -688,10 +688,9 @@ void RGWOp_DATALog_List::execute(optional_yield y) {
 
   // Note that last_marker is updated to be the marker of the last
   // entry listed
-  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados->list_entries(this, shard_id,
-                                                    max_entries, entries,
-                                                    marker, &last_marker,
-                                                    &truncated);
+  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+    datalog_rados->list_entries(this, shard_id, max_entries, entries,
+                               marker, &last_marker, &truncated, y);
 }
 
 void RGWOp_DATALog_List::send_response() {
@@ -749,7 +748,8 @@ void RGWOp_DATALog_ShardInfo::execute(optional_yield y) {
     return;
   }
 
-  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados->get_info(this, shard_id, &info);
+  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+    datalog_rados->get_info(this, shard_id, &info, y);
 }
 
 void RGWOp_DATALog_ShardInfo::send_response() {
@@ -898,7 +898,8 @@ void RGWOp_DATALog_Delete::execute(optional_yield y) {
     return;
   }
 
-  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados->trim_entries(this, shard_id, marker);
+  op_ret = static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+    datalog_rados->trim_entries(this, shard_id, marker, y);
 }
 
 // not in header to avoid pulling in rgw_sync.h
index 7aad430d54cf5a748b907db7b150dbb274bda5fa..f8227a29ec43598ecbe747c9cfa626e226421352 100644 (file)
@@ -9975,10 +9975,11 @@ next:
       if (specified_shard_id) {
         ret = datalog_svc->list_entries(dpp(), shard_id, max_entries - count,
                                        entries, marker,
-                                       &marker, &truncated);
+                                       &marker, &truncated,
+                                       null_yield);
       } else {
         ret = datalog_svc->list_entries(dpp(), max_entries - count, entries,
-                                       log_marker, &truncated);
+                                       log_marker, &truncated, null_yield);
       }
       if (ret < 0) {
         cerr << "ERROR: datalog_svc->list_entries(): " << cpp_strerror(-ret) << std::endl;
@@ -10009,7 +10010,8 @@ next:
       list<cls_log_entry> entries;
 
       RGWDataChangesLogInfo info;
-      static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados->get_info(dpp(), i, &info);
+      static_cast<rgw::sal::RadosStore*>(driver)->svc()->
+       datalog_rados->get_info(dpp(), i, &info, null_yield);
 
       ::encode_json("info", info, formatter.get());
 
@@ -10072,7 +10074,7 @@ next:
     }
 
     auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
-    ret = datalog->trim_entries(dpp(), shard_id, marker);
+    ret = datalog->trim_entries(dpp(), shard_id, marker, null_yield);
 
     if (ret < 0 && ret != -ENODATA) {
       cerr << "ERROR: trim_entries(): " << cpp_strerror(-ret) << std::endl;
@@ -10096,7 +10098,7 @@ next:
   if (opt_cmd == OPT::DATALOG_PRUNE) {
     auto datalog = static_cast<rgw::sal::RadosStore*>(driver)->svc()->datalog_rados;
     std::optional<uint64_t> through;
-    ret = datalog->trim_generations(dpp(), through);
+    ret = datalog->trim_generations(dpp(), through, null_yield);
 
     if (ret < 0) {
       cerr << "ERROR: trim_generations(): " << cpp_strerror(-ret) << std::endl;
index 690825d530e7ba6d6708bd12a395c9c55c4cf521..bd811e1623aa94128919f6ea23c382d7a4517f77 100644 (file)
@@ -39,6 +39,6 @@ public:
 
   virtual int handle_overwrite(const DoutPrefixProvider *dpp,
                                const RGWBucketInfo& info,
-                               const RGWBucketInfo& orig_info) = 0;
+                               const RGWBucketInfo& orig_info,
+                               optional_yield y) = 0;
 };
-
index 8e7cc5ae260dbaa90f2e93bc4132cbe2ab003d74..c1725f0e369e3f25f46c25145fa8971d8fa19074 100644 (file)
@@ -465,9 +465,10 @@ int RGWSI_BucketIndex_RADOS::get_reshard_status(const DoutPrefixProvider *dpp, c
   return 0;
 }
 
-int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp, 
+int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
                                               const RGWBucketInfo& info,
-                                              const RGWBucketInfo& orig_info)
+                                              const RGWBucketInfo& orig_info,
+                                             optional_yield y)
 {
   bool new_sync_enabled = info.datasync_flag_enabled();
   bool old_sync_enabled = orig_info.datasync_flag_enabled();
@@ -496,7 +497,7 @@ int RGWSI_BucketIndex_RADOS::handle_overwrite(const DoutPrefixProvider *dpp,
   }
 
   for (int i = 0; i < shards_num; ++i) {
-    ret = svc.datalog_rados->add_entry(dpp, info, bilog, i);
+    ret = svc.datalog_rados->add_entry(dpp, info, bilog, i, y);
     if (ret < 0) {
       ldpp_dout(dpp, -1) << "ERROR: failed writing data log (info.bucket=" << info.bucket << ", shard_id=" << i << ")" << dendl;
     } // datalog error is not fatal
index b1fc97d459f8ae669f4d061ec95a859e03fdf144..b541449963a90709e0935897bc07eb42c55e0c3d 100644 (file)
@@ -134,7 +134,8 @@ public:
                          std::list<cls_rgw_bucket_instance_entry> *status);
 
   int handle_overwrite(const DoutPrefixProvider *dpp, const RGWBucketInfo& info,
-                       const RGWBucketInfo& orig_info) override;
+                       const RGWBucketInfo& orig_info,
+                      optional_yield y) override;
 
   int open_bucket_index_shard(const DoutPrefixProvider *dpp,
                               const RGWBucketInfo& bucket_info,
index 01db1c36eb0c009bfbcb4c586b4788c367535e64..08a5280150354007d27f3cd2197d8149d8a4916d 100644 (file)
@@ -527,7 +527,7 @@ int RGWSI_Bucket_SObj::store_bucket_instance_info(RGWSI_Bucket_BI_Ctx& ctx,
   }
 
   if (orig_info && *orig_info && !exclusive) {
-    int r = svc.bi->handle_overwrite(dpp, info, *(orig_info.value()));
+    int r = svc.bi->handle_overwrite(dpp, info, *(orig_info.value()), y);
     if (r < 0) {
       ldpp_dout(dpp, 0) << "ERROR: " << __func__ << "(): svc.bi->handle_overwrite() of key=" << key << " returned r=" << r << dendl;
       return r;