]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: add generations to error repo with binary format
authorCasey Bodley <cbodley@redhat.com>
Fri, 22 Jan 2021 23:28:50 +0000 (18:28 -0500)
committerCasey Bodley <cbodley@redhat.com>
Thu, 4 Feb 2021 21:11:30 +0000 (16:11 -0500)
adds a backward-compatible binary encoding for error repo keys that can
contain a generation number along with the bucket and shard

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_basic_types.cc
src/rgw/rgw_basic_types.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_sync_error_repo.cc
src/rgw/rgw_sync_error_repo.h

index 01cca48c2185cf91b6dea8ea68e29df6a26b9539..6f8b5cfb26734ade8025567184fa645ce1ad1f7e 100644 (file)
@@ -70,10 +70,10 @@ std::string rgw_bucket::get_key(char tenant_delim, char id_delim, size_t reserve
 }
 
 std::string rgw_bucket_shard::get_key(char tenant_delim, char id_delim,
-                                      char shard_delim) const
+                                      char shard_delim, size_t reserve) const
 {
   static constexpr size_t shard_len{12}; // ":4294967295\0"
-  auto key = bucket.get_key(tenant_delim, id_delim, shard_len);
+  auto key = bucket.get_key(tenant_delim, id_delim, reserve + shard_len);
   if (shard_id >= 0 && shard_delim) {
     key.append(1, shard_delim);
     key.append(std::to_string(shard_id));
@@ -81,6 +81,18 @@ std::string rgw_bucket_shard::get_key(char tenant_delim, char id_delim,
   return key;
 }
 
+void encode(const rgw_bucket_shard& b, bufferlist& bl, uint64_t f)
+{
+  encode(b.bucket, bl, f);
+  encode(b.shard_id, bl, f);
+}
+
+void decode(rgw_bucket_shard& b, bufferlist::const_iterator& bl)
+{
+  decode(b.bucket, bl);
+  decode(b.shard_id, bl);
+}
+
 void encode_json_impl(const char *name, const rgw_zone_id& zid, Formatter *f)
 {
   encode_json(name, zid.id, f);
index 81094fe569dcf894c6dc344b1427631b3a209103..f9d85317878c4d770f50a6c4be2f0e1dff6eed71 100644 (file)
@@ -430,7 +430,8 @@ struct rgw_bucket_shard {
   rgw_bucket_shard(const rgw_bucket& _b, int _sid) : bucket(_b), shard_id(_sid) {}
 
   std::string get_key(char tenant_delim = '/', char id_delim = ':',
-                      char shard_delim = ':') const;
+                      char shard_delim = ':',
+                      size_t reserve = 0) const;
 
   bool operator<(const rgw_bucket_shard& b) const {
     if (bucket < b.bucket) {
@@ -448,6 +449,9 @@ struct rgw_bucket_shard {
   }
 };
 
+void encode(const rgw_bucket_shard& b, bufferlist& bl, uint64_t f=0);
+void decode(rgw_bucket_shard& b, bufferlist::const_iterator& bl);
+
 inline std::ostream& operator<<(std::ostream& out, const rgw_bucket_shard& bs) {
   if (bs.shard_id <= 0) {
     return out << bs.bucket;
index 5a1a9869bee9f5b0ea4c11924f425c755763fe32..48bf4210895b954697ff387c10eff721046e4b44 100644 (file)
@@ -70,6 +70,17 @@ void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("entries", entries, obj);
 };
 
+// print a bucket shard with [gen]
+std::string to_string(const rgw_bucket_shard& bs, std::optional<uint64_t> gen)
+{
+  constexpr auto digits10 = std::numeric_limits<uint64_t>::digits10;
+  constexpr auto reserve = 2 + digits10; // [value]
+  auto str = bs.get_key('/', ':', ':', reserve);
+  str.append(1, '[');
+  str.append(std::to_string(gen.value_or(0)));
+  str.append(1, ']');
+  return str;
+}
 
 class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
   static constexpr int MAX_CONCURRENT_SHARDS = 16;
@@ -1279,7 +1290,7 @@ public:
       marker_tracker(_marker_tracker), error_repo(error_repo),
       lease_cr(std::move(lease_cr)) {
     set_description() << "data sync single entry (source_zone=" << sc->source_zone << ") " << obligation;
-    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", obligation.key);
+    tn = sync_env->sync_tracer->add_node(_tn_parent, "entry", to_string(obligation.bs, obligation.gen));
   }
 
   int operate() override {
@@ -1334,14 +1345,15 @@ public:
         // this was added when 'tenant/' was added to datalog entries, because
         // preexisting tenant buckets could never sync and would stay in the
         // error_repo forever
-        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->key));
+        tn->log(0, SSTR("WARNING: skipping data log entry for missing bucket " << complete->bs));
         sync_status = 0;
       }
 
       if (sync_status < 0) {
         // write actual sync failures for 'radosgw-admin sync error list'
         if (sync_status != -EBUSY && sync_status != -EAGAIN) {
-          yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data", complete->key,
+          yield call(sync_env->error_logger->log_error_cr(sc->conn->get_remote_id(), "data",
+                                                          to_string(complete->bs, complete->gen),
                                                           -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
           if (retcode < 0) {
             tn->log(0, SSTR("ERROR: failed to log sync failure: retcode=" << retcode));
@@ -1349,15 +1361,17 @@ public:
         }
         if (complete->timestamp != ceph::real_time{}) {
           tn->log(10, SSTR("writing " << *complete << " to error repo for retry"));
-          yield call(rgw_error_repo_write_cr(sync_env->store->svc()->rados, error_repo,
-                                            complete->key, complete->timestamp));
+          yield call(rgw::error_repo::write_cr(sync_env->store->svc()->rados, error_repo,
+                                               rgw::error_repo::encode_key(complete->bs, complete->gen),
+                                               complete->timestamp));
           if (retcode < 0) {
             tn->log(0, SSTR("ERROR: failed to log sync failure in error repo: retcode=" << retcode));
           }
         }
       } else if (complete->retry) {
-        yield call(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
-                                            complete->key, complete->timestamp));
+        yield call(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+                                              rgw::error_repo::encode_key(complete->bs, complete->gen),
+                                              complete->timestamp));
         if (retcode < 0) {
           tn->log(0, SSTR("ERROR: failed to remove omap key from error repo ("
              << error_repo << " retcode=" << retcode));
@@ -1447,11 +1461,11 @@ class RGWDataSyncShardCR : public RGWCoroutine {
                                        &bs.bucket, &bs.shard_id);
   }
   RGWCoroutine* sync_single_entry(const rgw_bucket_shard& src,
-                                  const std::string& key,
+                                  std::optional<uint64_t> gen,
                                   const std::string& marker,
                                   ceph::real_time timestamp, bool retry) {
     auto state = bucket_shard_cache->get(src);
-    auto obligation = rgw_data_sync_obligation{key, marker, timestamp, retry};
+    auto obligation = rgw_data_sync_obligation{src, gen, marker, timestamp, retry};
     return new RGWDataSyncSingleEntryCR(sc, std::move(state), std::move(obligation),
                                         &*marker_tracker, error_repo,
                                         lease_cr.get(), tn);
@@ -1577,7 +1591,7 @@ public:
             tn->log(0, SSTR("ERROR: cannot start syncing " << iter->first << ". Duplicate entry?"));
           } else {
             // fetch remote and write locally
-            spawn(sync_single_entry(source_bs, iter->first, iter->first,
+            spawn(sync_single_entry(source_bs, std::nullopt, iter->first,
                                     entry_timestamp, false), false);
           }
           sync_marker.marker = iter->first;
@@ -1659,7 +1673,7 @@ public:
             continue;
           }
           tn->log(20, SSTR("received async update notification: " << *modified_iter));
-          spawn(sync_single_entry(source_bs, *modified_iter, string(),
+          spawn(sync_single_entry(source_bs, std::nullopt, string(),
                                   ceph::real_time{}, false), false);
         }
 
@@ -1673,17 +1687,21 @@ public:
           iter = error_entries.begin();
           for (; iter != error_entries.end(); ++iter) {
             error_marker = iter->first;
-            entry_timestamp = rgw_error_repo_decode_value(iter->second);
-            retcode = parse_bucket_key(error_marker, source_bs);
+            entry_timestamp = rgw::error_repo::decode_value(iter->second);
+            std::optional<uint64_t> gen;
+            retcode = rgw::error_repo::decode_key(iter->first, source_bs, gen);
+            if (retcode == -EINVAL) {
+              // backward compatibility for string keys that don't encode a gen
+              retcode = parse_bucket_key(error_marker, source_bs);
+            }
             if (retcode < 0) {
               tn->log(1, SSTR("failed to parse bucket shard: " << error_marker));
-              spawn(rgw_error_repo_remove_cr(sync_env->store->svc()->rados, error_repo,
-                                             error_marker, entry_timestamp), false);
+              spawn(rgw::error_repo::remove_cr(sync_env->store->svc()->rados, error_repo,
+                                               error_marker, entry_timestamp), false);
               continue;
             }
-            tn->log(20, SSTR("handle error entry key=" << error_marker << " timestamp=" << entry_timestamp));
-            spawn(sync_single_entry(source_bs, error_marker, "",
-                                    entry_timestamp, true), false);
+            tn->log(20, SSTR("handle error entry key=" << to_string(source_bs, gen) << " timestamp=" << entry_timestamp));
+            spawn(sync_single_entry(source_bs, gen, "", entry_timestamp, true), false);
           }
           if (!omapvals->more) {
             error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
@@ -1717,7 +1735,7 @@ public:
           if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
             tn->log(0, SSTR("ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?"));
           } else {
-            spawn(sync_single_entry(source_bs, log_iter->entry.key, log_iter->log_id,
+            spawn(sync_single_entry(source_bs, std::nullopt, log_iter->log_id,
                                     log_iter->log_timestamp, false), false);
           }
 
index 70c8cfd02f2ce038edcefd5160e750cd3d18172d..b4fefedc6d4f2f704d8d1d3ca5e77d6d785a13a5 100644 (file)
 
 // represents an obligation to sync an entry up a given time
 struct rgw_data_sync_obligation {
-  std::string key;
+  rgw_bucket_shard bs;
+  std::optional<uint64_t> gen;
   std::string marker;
   ceph::real_time timestamp;
   bool retry = false;
 };
 
 inline std::ostream& operator<<(std::ostream& out, const rgw_data_sync_obligation& o) {
-  out << "key=" << o.key;
+  out << "key=" << o.bs;
+  if (o.gen) {
+    out << '[' << *o.gen << ']';
+  }
   if (!o.marker.empty()) {
     out << " marker=" << o.marker;
   }
index e952ce912307cd2e96ab233ed54209587b486c58..8fcd800408a7fe9104c1b2651611e7b49805dc42 100644 (file)
 #include "services/svc_rados.h"
 #include "cls/cmpomap/client.h"
 
-ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl)
+namespace rgw::error_repo {
+
+// prefix for the binary encoding of keys. this particular value is not
+// valid as the first byte of a utf8 code point, so we use this to
+// differentiate the binary encoding from existing string keys for
+// backward-compatibility
+constexpr uint8_t binary_key_prefix = 0x80;
+
+struct key_type {
+  rgw_bucket_shard bs;
+  std::optional<uint64_t> gen;
+};
+
+void encode(const key_type& k, bufferlist& bl, uint64_t f=0)
+{
+  ENCODE_START(1, 1, bl);
+  encode(k.bs, bl);
+  encode(k.gen, bl);
+  ENCODE_FINISH(bl);
+}
+
+void decode(key_type& k, bufferlist::const_iterator& bl)
+{
+  DECODE_START(1, bl);
+  decode(k.bs, bl);
+  decode(k.gen, bl);
+  DECODE_FINISH(bl);
+}
+
+std::string encode_key(const rgw_bucket_shard& bs,
+                       std::optional<uint64_t> gen)
+{
+  using ceph::encode;
+  const auto key = key_type{bs, gen};
+  bufferlist bl;
+  encode(binary_key_prefix, bl);
+  encode(key, bl);
+  return bl.to_str();
+}
+
+int decode_key(std::string encoded,
+               rgw_bucket_shard& bs,
+               std::optional<uint64_t>& gen)
+{
+  using ceph::decode;
+  key_type key;
+  const auto bl = bufferlist::static_from_string(encoded);
+  auto p = bl.cbegin();
+  try {
+    uint8_t prefix;
+    decode(prefix, p);
+    if (prefix != binary_key_prefix) {
+      return -EINVAL;
+    }
+    decode(key, p);
+  } catch (const buffer::error&) {
+    return -EIO;
+  }
+  if (!p.end()) {
+    return -EIO; // buffer contained unexpected bytes
+  }
+  bs = std::move(key.bs);
+  gen = key.gen;
+  return 0;
+}
+
+ceph::real_time decode_value(const bufferlist& bl)
 {
   uint64_t value;
   try {
@@ -30,9 +96,9 @@ ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl)
   return ceph::real_clock::zero() + ceph::timespan(value);
 }
 
-int rgw_error_repo_write(librados::ObjectWriteOperation& op,
-                         const std::string& key,
-                         ceph::real_time timestamp)
+int write(librados::ObjectWriteOperation& op,
+          const std::string& key,
+          ceph::real_time timestamp)
 {
   // overwrite the existing timestamp if value is greater
   const uint64_t value = timestamp.time_since_epoch().count();
@@ -41,9 +107,9 @@ int rgw_error_repo_write(librados::ObjectWriteOperation& op,
   return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero);
 }
 
-int rgw_error_repo_remove(librados::ObjectWriteOperation& op,
-                          const std::string& key,
-                          ceph::real_time timestamp)
+int remove(librados::ObjectWriteOperation& op,
+           const std::string& key,
+           ceph::real_time timestamp)
 {
   // remove the omap key if value >= existing
   const uint64_t value = timestamp.time_since_epoch().count();
@@ -67,7 +133,7 @@ class RGWErrorRepoWriteCR : public RGWSimpleCoroutine {
 
   int send_request() override {
     librados::ObjectWriteOperation op;
-    int r = rgw_error_repo_write(op, key, timestamp);
+    int r = write(op, key, timestamp);
     if (r < 0) {
       return r;
     }
@@ -85,10 +151,10 @@ class RGWErrorRepoWriteCR : public RGWSimpleCoroutine {
   }
 };
 
-RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados,
-                                      const rgw_raw_obj& obj,
-                                      const std::string& key,
-                                      ceph::real_time timestamp)
+RGWCoroutine* write_cr(RGWSI_RADOS* rados,
+                       const rgw_raw_obj& obj,
+                       const std::string& key,
+                       ceph::real_time timestamp)
 {
   return new RGWErrorRepoWriteCR(rados, obj, key, timestamp);
 }
@@ -110,7 +176,7 @@ class RGWErrorRepoRemoveCR : public RGWSimpleCoroutine {
 
   int send_request() override {
     librados::ObjectWriteOperation op;
-    int r = rgw_error_repo_remove(op, key, timestamp);
+    int r = remove(op, key, timestamp);
     if (r < 0) {
       return r;
     }
@@ -128,10 +194,12 @@ class RGWErrorRepoRemoveCR : public RGWSimpleCoroutine {
   }
 };
 
-RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados,
-                                       const rgw_raw_obj& obj,
-                                       const std::string& key,
-                                       ceph::real_time timestamp)
+RGWCoroutine* remove_cr(RGWSI_RADOS* rados,
+                        const rgw_raw_obj& obj,
+                        const std::string& key,
+                        ceph::real_time timestamp)
 {
   return new RGWErrorRepoRemoveCR(rados, obj, key, timestamp);
 }
+
+} // namespace rgw::error_repo
index 58b3b183eea5fdd9ff50a733028d948e13daf961..60525d281f0fb87d8022e14d5cf6a17041a4ebfa 100644 (file)
@@ -14,6 +14,7 @@
 
 #pragma once
 
+#include <optional>
 #include "include/rados/librados_fwd.hpp"
 #include "include/buffer_fwd.h"
 #include "common/ceph_time.h"
 class RGWSI_RADOS;
 class RGWCoroutine;
 struct rgw_raw_obj;
+struct rgw_bucket_shard;
+
+namespace rgw::error_repo {
+
+// binary-encode a bucket/shard/gen and return it as a string
+std::string encode_key(const rgw_bucket_shard& bs,
+                       std::optional<uint64_t> gen);
+
+// try to decode a key. returns -EINVAL if not in binary format
+int decode_key(std::string encoded,
+               rgw_bucket_shard& bs,
+               std::optional<uint64_t>& gen);
 
 // decode a timestamp as a uint64_t for CMPXATTR_MODE_U64
-ceph::real_time rgw_error_repo_decode_value(const ceph::bufferlist& bl);
+ceph::real_time decode_value(const ceph::bufferlist& bl);
 
 // write an omap key iff the given timestamp is newer
-int rgw_error_repo_write(librados::ObjectWriteOperation& op,
-                         const std::string& key,
-                         ceph::real_time timestamp);
-RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados,
-                                      const rgw_raw_obj& obj,
-                                      const std::string& key,
-                                      ceph::real_time timestamp);
+int write(librados::ObjectWriteOperation& op,
+          const std::string& key,
+          ceph::real_time timestamp);
+RGWCoroutine* write_cr(RGWSI_RADOS* rados,
+                       const rgw_raw_obj& obj,
+                       const std::string& key,
+                       ceph::real_time timestamp);
 
 // remove an omap key iff there isn't a newer timestamp
-int rgw_error_repo_remove(librados::ObjectWriteOperation& op,
-                          const std::string& key,
-                          ceph::real_time timestamp);
-RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados,
-                                       const rgw_raw_obj& obj,
-                                       const std::string& key,
-                                       ceph::real_time timestamp);
-
+int remove(librados::ObjectWriteOperation& op,
+           const std::string& key,
+           ceph::real_time timestamp);
+RGWCoroutine* remove_cr(RGWSI_RADOS* rados,
+                        const rgw_raw_obj& obj,
+                        const std::string& key,
+                        ceph::real_time timestamp);
+
+} // namespace rgw::error_repo