]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Normalize DataLog function signatures
authorAdam C. Emerson <aemerson@redhat.com>
Tue, 12 May 2020 22:50:14 +0000 (18:50 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Wed, 9 Sep 2020 02:09:40 +0000 (22:09 -0400)
Pull out the stuff we aren't using so FIFO and Timelog Datalog can use
the same interface.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_rest_log.cc
src/rgw/rgw_rest_log.h
src/rgw/services/svc_datalog_rados.cc
src/rgw/services/svc_datalog_rados.h

index bf28b6f6133a7b60b4833b374225a8e60004465e..c16899f4426b2908f31d4f496141168a3db73218 100644 (file)
@@ -8637,12 +8637,16 @@ next:
     RGWDataChangesLog::LogMarker log_marker;
 
     do {
-      list<rgw_data_change_log_entry> entries;
+      std::vector<rgw_data_change_log_entry> entries;
       if (specified_shard_id) {
-        ret = datalog_svc->list_entries(shard_id, {}, {}, max_entries - count,
-                                       entries, marker, &marker, &truncated);
+        ret = datalog_svc->list_entries(shard_id, max_entries - count,
+                                       entries,
+                                       marker.empty() ?
+                                       std::nullopt :
+                                       std::make_optional(marker),
+                                       &marker, &truncated);
       } else {
-        ret = datalog_svc->list_entries({}, {}, max_entries - count, entries,
+        ret = datalog_svc->list_entries(max_entries - count, entries,
                                        log_marker, &truncated);
       }
       if (ret < 0) {
@@ -8652,8 +8656,7 @@ next:
 
       count += entries.size();
 
-      for (list<rgw_data_change_log_entry>::iterator iter = entries.begin(); iter != entries.end(); ++iter) {
-        rgw_data_change_log_entry& entry = *iter;
+      for (const auto& entry : entries) {
         if (!extra_info) {
           encode_json("entry", entry.entry, formatter.get());
         } else {
@@ -8735,7 +8738,7 @@ next:
     // loop until -ENODATA
     do {
       auto datalog = store->svc()->datalog_rados;
-      ret = datalog->trim_entries(shard_id, {}, {}, {}, marker);
+      ret = datalog->trim_entries(shard_id, marker);
     } while (ret == 0);
 
     if (ret < 0 && ret != -ENODATA) {
index 0e6765b48c2baa7fd180a945b78dfcadcb3bc855..e1120a291eba32d100b1803c16662f5f1786d384 100644 (file)
 
 #include <boost/format.hpp>
 
+#undef FMT_HEADER_ONLY
+#define FMT_HEADER_ONLY 1
+#include "fmt/format.h"
+
 #include "common/errno.h"
 #include "common/ceph_json.h"
 #include "include/scope_guard.h"
@@ -2037,9 +2041,7 @@ RGWDataChangesLog::RGWDataChangesLog(RGWSI_Zone *zone_svc, RGWSI_Cls *cls_svc)
   }
 
   for (int i = 0; i < num_shards; i++) {
-    char buf[16];
-    snprintf(buf, sizeof(buf), "%s.%d", prefix.c_str(), i);
-    oids[i] = buf;
+    oids[i] = get_oid(i);
   }
 
   renew_thread = new ChangesRenewThread(cct, this);
@@ -2158,6 +2160,15 @@ bool RGWDataChangesLog::filter_bucket(const rgw_bucket& bucket, optional_yield y
   return bucket_filter(bucket, y);
 }
 
+std::string RGWDataChangesLog::get_oid(int i) const {
+  std::string_view prefix = cct->_conf->rgw_data_log_obj_prefix;
+  if (prefix.empty()) {
+    prefix = "data_log"sv;
+  }
+  return fmt::format("{}.{}", prefix, i);
+}
+
+
 int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id) {
   auto& bucket = bucket_info.bucket;
 
@@ -2185,7 +2196,9 @@ int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id)
 
   status->lock.lock();
 
-  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name << " shard_id=" << shard_id << " now=" << now << " cur_expiration=" << status->cur_expiration << dendl;
+  ldout(cct, 20) << "RGWDataChangesLog::add_entry() bucket.name=" << bucket.name
+                << " shard_id=" << shard_id << " now=" << now
+                << " cur_expiration=" << status->cur_expiration << dendl;
 
   if (now < status->cur_expiration) {
     /* no need to send, recently completed */
@@ -2228,7 +2241,7 @@ int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id)
     expiration += ceph::make_timespan(cct->_conf->rgw_data_log_window);
 
     status->lock.unlock();
-  
+
     bufferlist bl;
     rgw_data_change change;
     change.entity_type = ENTITY_TYPE_BUCKET;
@@ -2261,24 +2274,22 @@ int RGWDataChangesLog::add_entry(const RGWBucketInfo& bucket_info, int shard_id)
   return ret;
 }
 
-int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
-                                   list<rgw_data_change_log_entry>& entries,
-                                   const string& marker,
-                                   string *out_marker,
-                                   bool *truncated) {
-  if (shard >= num_shards)
-    return -EINVAL;
-
-  list<cls_log_entry> log_entries;
+int RGWDataChangesLog::list_entries(int shard, int max_entries,
+                                   std::vector<rgw_data_change_log_entry>& entries,
+                                   std::optional<std::string_view> marker,
+                                   std::string* out_marker, bool* truncated)
+{
+  assert(shard < num_shards);
+  std::list<cls_log_entry> log_entries;
 
-  int ret = svc.cls->timelog.list(oids[shard], start_time, end_time,
-                                max_entries, log_entries, marker,
-                                out_marker, truncated, null_yield);
+  int ret = svc.cls->timelog.list(oids[shard], {}, {},
+                                 max_entries, log_entries,
+                                 std::string(marker.value_or("")),
+                                 out_marker, truncated, null_yield);
   if (ret < 0)
     return ret;
 
-  list<cls_log_entry>::iterator iter;
-  for (iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
+  for (auto iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
     rgw_data_change_log_entry log_entry;
     log_entry.log_id = iter->id;
     real_time rt = iter->timestamp.to_real_time();
@@ -2296,15 +2307,17 @@ int RGWDataChangesLog::list_entries(int shard, const real_time& start_time, cons
   return 0;
 }
 
-int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
-             list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated) {
+int RGWDataChangesLog::list_entries(int max_entries,
+                                   std::vector<rgw_data_change_log_entry>& entries,
+                                   LogMarker& marker, bool *ptruncated)
+{
   bool truncated;
   entries.clear();
 
   for (; marker.shard < num_shards && (int)entries.size() < max_entries;
-       marker.shard++, marker.marker.clear()) {
-    int ret = list_entries(marker.shard, start_time, end_time, max_entries - entries.size(), entries,
-                          marker.marker, NULL, &truncated);
+       marker.shard++, marker.marker.reset()) {
+    int ret = list_entries(marker.shard, max_entries - entries.size(),
+                          entries, marker.marker, NULL, &truncated);
     if (ret == -ENOENT) {
       continue;
     }
@@ -2324,9 +2337,7 @@ int RGWDataChangesLog::list_entries(const real_time& start_time, const real_time
 
 int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
 {
-  if (shard_id >= num_shards)
-    return -EINVAL;
-
+  assert(shard_id < num_shards);
   string oid = oids[shard_id];
 
   cls_log_header header;
@@ -2341,14 +2352,19 @@ int RGWDataChangesLog::get_info(int shard_id, RGWDataChangesLogInfo *info)
   return 0;
 }
 
-int RGWDataChangesLog::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
-                                    const string& start_marker, const string& end_marker)
+int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker)
 {
-  if (shard_id > num_shards)
-    return -EINVAL;
+  assert(shard_id < num_shards);
+  return svc.cls->timelog.trim(oids[shard_id], {}, {},
+                               {}, std::string(marker), nullptr, null_yield);
+}
 
-  return svc.cls->timelog.trim(oids[shard_id], start_time, end_time,
-                               start_marker, end_marker, nullptr, null_yield);
+int RGWDataChangesLog::trim_entries(int shard_id, std::string_view marker,
+                                   librados::AioCompletion* c)
+{
+  assert(shard_id < num_shards);
+  return svc.cls->timelog.trim(oids[shard_id], {}, {},
+                              {}, std::string(marker), c, null_yield);
 }
 
 bool RGWDataChangesLog::going_down()
@@ -2844,8 +2860,8 @@ public:
   RGWBucketInstanceMetadataHandler() {}
 
   void init(RGWSI_Zone *zone_svc,
-           RGWSI_Bucket *bucket_svc,
-           RGWSI_BucketIndex *bi_svc) {
+           RGWSI_Bucket *bucket_svc,
+           RGWSI_BucketIndex *bi_svc) override {
     base_init(bucket_svc->ctx(),
               bucket_svc->get_bi_be_handler().get());
     svc.zone = zone_svc;
index cd0ed2641a2920b2b95fb196fa440cd8f3f7810c..5e862bcf46bcd4a4a09ae124fea4275e81f77af0 100644 (file)
@@ -484,7 +484,7 @@ struct BucketChangeObserver;
 
 struct RGWDataChangesLogMarker {
   int shard = 0;
-  std::string marker;
+  std::optional<std::string> marker;
 
   RGWDataChangesLogMarker() = default;
 };
@@ -550,23 +550,24 @@ public:
   ~RGWDataChangesLog();
 
   int choose_oid(const rgw_bucket_shard& bs);
-  const std::string& get_oid(int shard_id) const { return oids[shard_id]; }
+  std::string get_oid(int shard_id) const;
   int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
   int renew_entries();
-  int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
-                  list<rgw_data_change_log_entry>& entries,
-                  const string& marker,
-                  string *out_marker,
-                  bool *truncated);
-  int trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
-                   const string& start_marker, const string& end_marker);
+  int list_entries(int shard, int max_entries,
+                  std::vector<rgw_data_change_log_entry>& entries,
+                  std::optional<std::string_view> marker,
+                  std::string* out_marker, bool* truncated);
+  int trim_entries(int shard_id, std::string_view marker);
+  int trim_entries(int shard_id, std::string_view marker,
+                  librados::AioCompletion* c); // :(
   int get_info(int shard_id, RGWDataChangesLogInfo *info);
 
   using LogMarker = RGWDataChangesLogMarker;
 
-  int list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
-               list<rgw_data_change_log_entry>& entries, LogMarker& marker, bool *ptruncated);
+  int list_entries(int max_entries,
+                  std::vector<rgw_data_change_log_entry>& entries,
+                  LogMarker& marker, bool* ptruncated);
 
   void mark_modified(int shard_id, const rgw_bucket_shard& bs);
   void read_clear_modified(map<int, set<string> > &modified);
index d89ae20c99bf8dc327c18baa5446d0a7d893584c..c3ed31b4f582cf3e5a88356cb058cb1ec02442b8 100644 (file)
@@ -596,7 +596,7 @@ void RGWOp_DATALog_List::execute() {
 
   // Note that last_marker is updated to be the marker of the last
   // entry listed
-  http_ret = store->svc()->datalog_rados->list_entries(shard_id, {}, {},
+  http_ret = store->svc()->datalog_rados->list_entries(shard_id,
                                                       max_entries, entries,
                                                       marker, &last_marker,
                                                       &truncated);
@@ -615,9 +615,7 @@ void RGWOp_DATALog_List::send_response() {
   s->formatter->dump_bool("truncated", truncated);
   {
     s->formatter->open_array_section("entries");
-    for (list<rgw_data_change_log_entry>::iterator iter = entries.begin();
-        iter != entries.end(); ++iter) {
-      rgw_data_change_log_entry& entry = *iter;
+    for (const auto& entry : entries) {
       if (!extra_info) {
         encode_json("entry", entry.entry, s->formatter);
       } else {
@@ -757,8 +755,7 @@ void RGWOp_DATALog_Delete::execute() {
     return;
   }
 
-  http_ret = store->svc()->datalog_rados->trim_entries(shard_id, {}, {}, {},
-                                                      marker);
+  http_ret = store->svc()->datalog_rados->trim_entries(shard_id, marker);
 }
 
 // not in header to avoid pulling in rgw_sync.h
index 1b6cb7217f181bf236f13f4bbca9bc2236dd6fef..2642c95fc040e9b9debe1244097b74fe22d20bb2 100644 (file)
@@ -194,8 +194,8 @@ public:
 };
 
 class RGWOp_DATALog_List : public RGWRESTOp {
-  list<rgw_data_change_log_entry> entries;
-  string last_marker;
+  std::vector<rgw_data_change_log_entry> entries;
+  std::string last_marker;
   bool truncated;
   bool extra_info;
 public:
index 3371c6e1b9343426fca9a0f87739c2d4222499fc..e3274d3f0cc462d266f20fdeebe98e2e6ef50c8a 100644 (file)
@@ -46,7 +46,7 @@ int RGWSI_DataLog_RADOS::get_log_shard_id(rgw_bucket& bucket, int shard_id)
   return log->get_log_shard_id(bucket, shard_id);
 }
 
-const std::string& RGWSI_DataLog_RADOS::get_oid(int shard_id) const
+std::string RGWSI_DataLog_RADOS::get_oid(int shard_id) const
 {
   return log->get_oid(shard_id);
 }
@@ -61,25 +61,23 @@ int RGWSI_DataLog_RADOS::add_entry(const RGWBucketInfo& bucket_info, int shard_i
   return log->add_entry(bucket_info, shard_id);
 }
 
-int RGWSI_DataLog_RADOS::list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
-                 list<rgw_data_change_log_entry>& entries,
-                 const string& marker,
-                 string *out_marker,
-                 bool *truncated)
+int RGWSI_DataLog_RADOS::list_entries(int shard, int max_entries,
+                                     std::vector<rgw_data_change_log_entry>& entries,
+                                     std::optional<std::string_view> marker,
+                                     std::string* out_marker,
+                                     bool* truncated)
 {
-  return log->list_entries(shard, start_time, end_time, max_entries,
+  return log->list_entries(shard, max_entries,
                            entries, marker, out_marker, truncated);
 }
 
-int RGWSI_DataLog_RADOS::list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
-                                     list<rgw_data_change_log_entry>& entries, RGWDataChangesLogMarker& marker, bool *ptruncated)
+int RGWSI_DataLog_RADOS::list_entries(int max_entries,
+                                     std::vector<rgw_data_change_log_entry>& entries, RGWDataChangesLogMarker& marker, bool *ptruncated)
 {
-  return log->list_entries(start_time, end_time, max_entries,
-                          entries, marker, ptruncated);
+  return log->list_entries(max_entries, entries, marker, ptruncated);
 }
 
-int RGWSI_DataLog_RADOS::trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
-                                      const string& start_marker, const string& end_marker)
+int RGWSI_DataLog_RADOS::trim_entries(int shard_id, std::string_view marker)
 {
-  return log->trim_entries(shard_id, start_time, end_time, start_marker, end_marker);
+  return log->trim_entries(shard_id, marker);
 }
index 5317035c09fef35c0a710d28cee1c8f3b1d0e7f3..78a6860fad4c79046ae5e14e3e6809749bbe2c9f 100644 (file)
@@ -54,19 +54,17 @@ public:
   void set_observer(rgw::BucketChangeObserver *observer);
 
   int get_log_shard_id(rgw_bucket& bucket, int shard_id);
-  const std::string& get_oid(int shard_id) const;
+  std::string get_oid(int shard_id) const;
 
   int get_info(int shard_id, RGWDataChangesLogInfo *info);
 
   int add_entry(const RGWBucketInfo& bucket_info, int shard_id);
-  int list_entries(int shard, const real_time& start_time, const real_time& end_time, int max_entries,
-                  list<rgw_data_change_log_entry>& entries,
-                  const string& marker,
-                  string *out_marker,
-                  bool *truncated);
-  int list_entries(const real_time& start_time, const real_time& end_time, int max_entries,
-                  list<rgw_data_change_log_entry>& entries, RGWDataChangesLogMarker& marker, bool *ptruncated);
-  int trim_entries(int shard_id, const real_time& start_time, const real_time& end_time,
-                   const string& start_marker, const string& end_marker);
+  int list_entries(int shard, int max_entries,
+                  std::vector<rgw_data_change_log_entry>& entries,
+                  std::optional<std::string_view> marker,
+                  std::string* out_marker, bool* truncated);
+  int list_entries(int max_entries,
+                  std::vector<rgw_data_change_log_entry>& entries,
+                  RGWDataChangesLogMarker& marker, bool *ptruncated);
+  int trim_entries(int shard_id, std::string_view marker);
 };
-