]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: implement timestamp-based epochs. 62469/head
authorIgor Gomon <igomon@bloomberg.net>
Tue, 11 Mar 2025 18:41:41 +0000 (18:41 +0000)
committerIgor Gomon <igomon@bloomberg.net>
Tue, 16 Sep 2025 18:29:32 +0000 (18:29 +0000)
Signed-off-by: Igor Gomon <igomon@bloomberg.net>
src/cls/rgw/cls_rgw.cc
src/cls/rgw/cls_rgw_ops.h
src/cls/rgw/cls_rgw_types.cc
src/cls/rgw/cls_rgw_types.h
src/common/strtol.cc
src/common/strtol.h
src/rgw/driver/rados/rgw_rados.cc
src/test/rgw/rgw_multi/tests.py
src/test/rgw/rgw_multi/tests_es.py

index 53aa89ff6362f889198dc0f892e90da244b4be13..673f218e3c4912957d63d862e94eedfe8745236d 100644 (file)
@@ -321,19 +321,25 @@ static int get_obj_vals(cls_method_context_t hctx,
  */
 static void decreasing_str(uint64_t num, string *str)
 {
+  // This buffer must be big enough to hold the string representation of
+  // the largest unsigned 64-bit integer value (+ 1 more char).
   char buf[32];
   if (num < 0x10) { /* 16 */
-    snprintf(buf, sizeof(buf), "9%02lld", 15 - (long long)num);
+    snprintf(buf, sizeof(buf), "9%02" PRIu64, 0xF - num);
   } else if (num < 0x100) { /* 256 */
-    snprintf(buf, sizeof(buf), "8%03lld", 255 - (long long)num);
+    snprintf(buf, sizeof(buf), "8%03" PRIu64, 0xFF - num);
   } else if (num < 0x1000) /* 4096 */ {
-    snprintf(buf, sizeof(buf), "7%04lld", 4095 - (long long)num);
+    snprintf(buf, sizeof(buf), "7%04" PRIu64, 0xFFF - num);
   } else if (num < 0x10000) /* 65536 */ {
-    snprintf(buf, sizeof(buf), "6%05lld", 65535 - (long long)num);
+    snprintf(buf, sizeof(buf), "6%05" PRIu64, 0xFFFF - num);
   } else if (num < 0x100000000) /* 4G */ {
-    snprintf(buf, sizeof(buf), "5%010lld", 0xFFFFFFFF - (long long)num);
+    snprintf(buf, sizeof(buf), "5%010" PRIu64, 0xFFFFFFFF - num);
+  } else if (num < 0x10000000000) /* 1T */ {
+    snprintf(buf, sizeof(buf), "4%015" PRIu64, 0xFFFFFFFFFF - num);
+  } else if (num < 0x1000000000000) /* 281T */ {
+    snprintf(buf, sizeof(buf), "3%018" PRIu64, 0xFFFFFFFFFFFF - num);
   } else {
-    snprintf(buf, sizeof(buf), "4%020lld",  (long long)-num);
+    snprintf(buf, sizeof(buf), "2%020" PRIu64,  std::numeric_limits<uint64_t>::max() - num);
   }
 
   *str = buf;
@@ -498,11 +504,21 @@ static int decode_list_index_key(const string& index_key, cls_rgw_obj_key *key,
     if (val[0] == 'i') {
       key->instance = val.substr(1);
     } else if (val[0] == 'v') {
+      // what we are dealing here with is the string representation of the versioned epoch (as converted to by
+      // decreasing_str() func); the first char is always 'v' to indicate that it is the versioned epoch; the
+      // second char is a digit in [9-2] range that is used to separate value ranges - in order to make
+      // string representation sort in the opposite direction and to decrease string length - to speed up
+      // the lexicographical comparison; hence +2 (1 for the value indicator and one for the range prefix);
       string err;
-      const char *s = val.c_str() + 1;
-      *ver = strict_strtoll(s, 10, &err);
-      if (!err.empty()) {
-        CLS_LOG(0, "ERROR: %s: bad index_key (%s): could not parse val (v=%s)", __func__, escape_str(index_key).c_str(), s);
+      if (val.size() > 2) {
+        const char *s = val.c_str() + 2;
+        *ver = strict_strtoull(s, 10, &err);
+        if (!err.empty()) {
+          CLS_LOG(0, "ERROR: %s: bad index_key (%s): could not parse val (v=%s)", __func__, escape_str(index_key).c_str(), s);
+          return -EIO;
+        }
+      } else {
+        CLS_LOG(0, "ERROR: %s: bad index_key (%s): empty val", __func__, escape_str(index_key).c_str());
         return -EIO;
       }
     }
@@ -1627,19 +1643,20 @@ public:
     return 0;
   }
 
-  bool start_modify(uint64_t candidate_epoch) {
-    if (candidate_epoch) {
-      if (candidate_epoch < olh_data_entry.epoch) {
-        return false; /* olh cannot be modified, old epoch */
-      }
-      olh_data_entry.epoch = candidate_epoch;
-    } else {
-      if (olh_data_entry.epoch == 0) {
-        olh_data_entry.epoch = 2; /* versioned epoch should start with 2, 1 is reserved to converted plain entries */
-      } else {
-        olh_data_entry.epoch++;
-      }
+  /**
+   * This is called when a new instance of an object (in a versioned bucket) is added (via PUT) or an existing instance is removed.
+   * A part of that process is to update the OLH entry (in the bucket index) with the correct modification timestamp (epoch).
+   * This timestamp is then used later on to guard against OLH updates for add/remove instance ops that happened *before*
+   * the latest op that updated the OLH entry.
+   * @param candidate_epoch - this is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
+   */
+  bool start_modify (uint64_t candidate_epoch) {
+    // only update the olh.epoch if it is newer than the current one.
+    if (candidate_epoch < olh_data_entry.epoch) {
+      return false; /* olh cannot be modified, old epoch */
     }
+
+    olh_data_entry.epoch = candidate_epoch;
     return true;
   }
 
@@ -1889,81 +1906,95 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
 
   const uint64_t prev_epoch = olh.get_epoch();
 
-  if (!olh.start_modify(op.olh_epoch)) {
-    ret = obj.write(op.olh_epoch, false, header);
-    if (ret < 0) {
-      return ret;
-    }
-    if (removing) {
-      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
-    }
-    return write_header_while_logrecord(hctx, header);
-  }
-
-  // promote this version to current if it's a newer epoch, or if it matches the
-  // current epoch and sorts after the current instance
-  const bool promote = (olh.get_epoch() > prev_epoch) ||
-      (olh.get_epoch() == prev_epoch &&
-       olh.get_entry().key.instance >= op.key.instance);
+  // op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
+  uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
+    duration_cast<std::chrono::nanoseconds>(obj.mtime().time_since_epoch()).count();
+  if (olh.start_modify(candidate_epoch)) {
+    // promote this version to current if it's a newer epoch, or if it matches the
+    // current epoch and sorts after the current instance
+    const bool promote = (olh.get_epoch() > prev_epoch) ||
+        (olh.get_epoch() == prev_epoch &&
+            olh.get_entry().key.instance >= op.key.instance);
+    const bool epoch_collision = olh.get_epoch() == prev_epoch;
+
+    if (olh_found) {
+      const string &olh_tag = olh.get_tag();
+      if (op.olh_tag != olh_tag) {
+        if (!olh.pending_removal()) {
+          CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str());
+          return -ECANCELED;
+        }
+        /* if pending removal, this is a new olh instance */
+        olh.set_tag(op.olh_tag);
+      }
+      if (epoch_collision) {
+        auto const &s_key = op.key.to_string();
+        CLS_LOG(1, "NOTICE: versioned epoch collision (%lu) for object %s", prev_epoch, s_key.c_str());
+      }
+      if (promote && olh.exists()) {
+        rgw_bucket_olh_entry &olh_entry = olh.get_entry();
+        /* found olh, previous instance is no longer the latest, need to update */
+        if (!(olh_entry.key == op.key)) {
+          BIVerObjEntry old_obj(hctx, olh_entry.key);
 
-  if (olh_found) {
-    const string& olh_tag = olh.get_tag();
-    if (op.olh_tag != olh_tag) {
-      if (!olh.pending_removal()) {
-        CLS_LOG(5, "NOTICE: op.olh_tag (%s) != olh.tag (%s)", op.olh_tag.c_str(), olh_tag.c_str());
-        return -ECANCELED;
+          ret = old_obj.demote_current(header);
+          if (ret < 0) {
+            CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
+            return ret;
+          }
+        }
+      }
+      olh.set_pending_removal(false);
+    } else {
+      bool instance_only = (op.key.instance.empty() && op.delete_marker);
+      cls_rgw_obj_key key(op.key.name);
+      ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
+      if (ret < 0) {
+        CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
+        return ret;
       }
-      /* if pending removal, this is a new olh instance */
       olh.set_tag(op.olh_tag);
+      if (op.key.instance.empty()) {
+        obj.set_epoch(1);
+      }
     }
-    if (promote && olh.exists()) {
-      rgw_bucket_olh_entry& olh_entry = olh.get_entry();
-      /* found olh, previous instance is no longer the latest, need to update */
-      if (!(olh_entry.key == op.key)) {
-        BIVerObjEntry old_obj(hctx, olh_entry.key);
 
-        ret = old_obj.demote_current(header);
-        if (ret < 0) {
-          CLS_LOG(0, "ERROR: could not demote current on previous key ret=%d", ret);
-          return ret;
-        }
-      }
+    /* update the olh log */
+    olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
+    if (removing) {
+      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
     }
-    olh.set_pending_removal(false);
-  } else {
-    bool instance_only = (op.key.instance.empty() && op.delete_marker);
-    cls_rgw_obj_key key(op.key.name);
-    ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
+
+    if (promote) {
+      olh.update(op.key, op.delete_marker);
+    }
+    olh.set_exists(true);
+
+    /* write the instance and list entries */
+    ret = obj.write(olh.get_epoch(), promote, header);
     if (ret < 0) {
-      CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
       return ret;
     }
-    olh.set_tag(op.olh_tag);
-    if (op.key.instance.empty()){
-      obj.set_epoch(1);
-    }
-  }
 
-  /* update the olh log */
-  olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
-  if (removing) {
-    olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
+    ret = olh.write(header);
   }
+  else {
+    ret = obj.write(candidate_epoch, false, header);
+    if (ret < 0) {
+      return ret;
+    }
 
-  if (promote) {
-    olh.update(op.key, op.delete_marker);
-  }
-  olh.set_exists(true);
+    // no point here in adding CLS_RGW_OLH_OP_LINK_OLH to the pending log as we know that
+    // the epoch is already stale compared to the current - so no point in applying it;
 
-  ret = olh.write(header);
-  if (ret < 0) {
-    CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
-    return ret;
+    if (removing) {
+      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
+      ret = olh.write(header);
+    }
   }
 
-  /* write the instance and list entries */
-  ret = obj.write(olh.get_epoch(), promote, header);
   if (ret < 0) {
+    CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
     return ret;
   }
 
@@ -1978,7 +2009,7 @@ static int rgw_bucket_link_olh(cls_method_context_t hctx, bufferlist *in, buffer
   rgw_bucket_dir_entry& entry = obj.get_dir_entry();
 
   rgw_bucket_entry_ver ver;
-  ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch());
+  ver.epoch = candidate_epoch;
 
   string *powner = NULL;
   string *powner_display_name = NULL;
@@ -2061,73 +2092,76 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
     obj.set_epoch(1);
   }
 
-  if (!olh.start_modify(op.olh_epoch)) {
-    ret = obj.unlink_list_entry(header);
-    if (ret < 0) {
-      return ret;
-    }
-
-    if (obj.is_delete_marker()) {
-      return 0;
-    }
+  // op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
+  uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
+    duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
+  if (olh.start_modify(candidate_epoch)) {
+    rgw_bucket_olh_entry &olh_entry = olh.get_entry();
+    cls_rgw_obj_key &olh_key = olh_entry.key;
+    CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__,
+            olh_key.name.c_str(), olh_key.instance.c_str(), olh_entry.delete_marker);
+
+    if (olh_key == dest_key) {
+      /* this is the current head, need to update the OLH! */
+      cls_rgw_obj_key next_key;
+      bool found = false;
+      ret = obj.find_next_key(&next_key, &found);
+      if (ret < 0) {
+        CLS_LOG(0, "ERROR: obj.find_next_key() returned ret=%d", ret);
+        return ret;
+      }
 
-    olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, op.olh_epoch);
-    return olh.write(header);
-  }
+      if (found) {
+        BIVerObjEntry next(hctx, next_key);
+        ret = next.write(olh.get_epoch(), true, header);
+        if (ret < 0) {
+          CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
+          return ret;
+        }
 
-  rgw_bucket_olh_entry& olh_entry = olh.get_entry();
-  cls_rgw_obj_key& olh_key = olh_entry.key;
-  CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__,
-             olh_key.name.c_str(), olh_key.instance.c_str(), olh_entry.delete_marker);
+        CLS_LOG(20, "%s: updating olh log: link olh -> %s[%s] (is_delete=%d)", __func__,
+                next_key.name.c_str(), next_key.instance.c_str(), (int) next.is_delete_marker());
 
-  if (olh_key == dest_key) {
-    /* this is the current head, need to update the OLH! */
-    cls_rgw_obj_key next_key;
-    bool found = false;
-    ret = obj.find_next_key(&next_key, &found);
-    if (ret < 0) {
-      CLS_LOG(0, "ERROR: obj.find_next_key() returned ret=%d", ret);
-      return ret;
+        olh.update(next_key, next.is_delete_marker());
+        olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker());
+      } else {
+        // next_key is empty, but we need to preserve its name in case this entry
+        // gets resharded, because this key is used for hash placement
+        next_key.name = dest_key.name;
+        olh.update(next_key, false);
+        olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false);
+        olh.set_exists(false);
+        olh.set_pending_removal(true);
+      }
     }
 
-    if (found) {
-      BIVerObjEntry next(hctx, next_key);
-      ret = next.write(olh.get_epoch(), true, header);
+    if (!obj.is_delete_marker()) {
+      olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
+    } else {
+      /* this is a delete marker, it's our responsibility to remove its
+       * instance entry */
+      ret = obj.unlink(header, op.key);
       if (ret < 0) {
-        CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
         return ret;
       }
-
-      CLS_LOG(20, "%s: updating olh log: link olh -> %s[%s] (is_delete=%d)", __func__,
-              next_key.name.c_str(), next_key.instance.c_str(), (int)next.is_delete_marker());
-
-      olh.update(next_key, next.is_delete_marker());
-      olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker());
-    } else {
-      // next_key is empty, but we need to preserve its name in case this entry
-      // gets resharded, because this key is used for hash placement
-      next_key.name = dest_key.name;
-      olh.update(next_key, false);
-      olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false);
-      olh.set_exists(false);
-      olh.set_pending_removal(true);
     }
-  }
 
-  if (!obj.is_delete_marker()) {
-    olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
-  } else {
-    /* this is a delete marker, it's our responsibility to remove its
-     * instance entry */
-    ret = obj.unlink(header, op.key);
+    ret = obj.unlink_list_entry(header);
     if (ret < 0) {
       return ret;
     }
   }
+  else {
+    ret = obj.unlink_list_entry(header);
+    if (ret < 0) {
+      return ret;
+    }
 
-  ret = obj.unlink_list_entry(header);
-  if (ret < 0) {
-    return ret;
+    if (obj.is_delete_marker()) {
+      return 0;
+    }
+
+    olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
   }
 
   ret = olh.write(header);
@@ -2144,7 +2178,7 @@ static int rgw_bucket_unlink_instance(cls_method_context_t hctx, bufferlist *in,
   }
 
   rgw_bucket_entry_ver ver;
-  ver.epoch = (op.olh_epoch ? op.olh_epoch : olh.get_epoch());
+  ver.epoch = candidate_epoch;
 
   real_time mtime = obj.mtime(); /* mtime has no real meaning in
                                   * instance removal context */
index 225df29fe510f84c665d724cea57ee71a676c3e8..9d0267acf4b0e4c087ddf0f8dd7ce24657934082 100644 (file)
@@ -232,9 +232,16 @@ WRITE_CLASS_ENCODER(rgw_cls_link_olh_op)
 struct rgw_cls_unlink_instance_op {
   cls_rgw_obj_key key;
   std::string op_tag;
+  // this represents a remote epoch during multisite sync
   uint64_t olh_epoch;
   bool log_op;
   uint16_t bilog_flags;
+  // cls ops include olh_tag so the OLH class code can guard sensitive updates—only proceed if op.olh_tag equals
+  // the OLH’s stored tag. If it doesn’t, the op fails and the caller refreshes state/retries.
+  // for context: in real clusters, out‑of‑order replication or topology changes can recreate/move an OLH
+  // (eg, resharding or certain multisite flows). The tag changes with that new OLH “generation,” so stale
+  // writers carrying the old tag get refused instead of overwriting the new state. A concrete example of failures
+  // tied to OLH attributes shows how wrong attributes/tags cause bad GET behavior, which is why the guard exists.
   std::string olh_tag;
   rgw_zone_set zones_trace;
 
index 9fd60aaff3f3099412579731e35c436f91603be7..bfcd42c7ff31eade9246e504497ac1d2b6582e0a 100644 (file)
@@ -465,6 +465,9 @@ void rgw_bucket_olh_entry::dump(Formatter *f) const
   encode_json("key", key, f);
   encode_json("delete_marker", delete_marker, f);
   encode_json("epoch", epoch, f);
+  ceph::real_time tp {std::chrono::nanoseconds (epoch)};
+  utime_t ut(tp);
+  encode_json("epoch_timestamp", ut, f);
   encode_json("pending_log", pending_log, f);
   encode_json("tag", tag, f);
   encode_json("exists", exists, f);
index 1bfcbcc97b89eaf3657dfe0c0cc2cd3dfb06dca9..e41d03d4458beee2b79c2688e7039471832f7132 100644 (file)
@@ -511,14 +511,20 @@ WRITE_CLASS_ENCODER(rgw_cls_bi_entry)
 
 enum OLHLogOp {
   CLS_RGW_OLH_OP_UNKNOWN         = 0,
+  // link OLH entry to a specific object version
   CLS_RGW_OLH_OP_LINK_OLH        = 1,
+  // deletes OLH object from the data pool and removes OLH entry from the bucket index
   CLS_RGW_OLH_OP_UNLINK_OLH      = 2, /* object does not exist */
+  // remove a specific instance of an object, such as <obj_name>.<obj_version>
   CLS_RGW_OLH_OP_REMOVE_INSTANCE = 3,
 };
 
 struct rgw_bucket_olh_log_entry {
   uint64_t epoch;
   OLHLogOp op;
+  // Once the OLH Log Entries are processed for a given epoch (by apply_olh_log()) the corresponding olh.pending.*
+  // xattrs are removed from the corresponding OLH object (in the data pool). The pending xattrs to be removed
+  // are those that match op_tag.
   std::string op_tag;
   cls_rgw_obj_key key;
   bool delete_marker;
@@ -555,8 +561,17 @@ WRITE_CLASS_ENCODER(rgw_bucket_olh_log_entry)
 struct rgw_bucket_olh_entry {
   cls_rgw_obj_key key;
   bool delete_marker;
+  // the epoch represents the latest modification timestamp for the S3 object identified by the key;
   uint64_t epoch;
+  // epoch -> op list mapping: stores pending modifications to the S3 object identified by the key;
+  // this is basically a per-S3-object WAL whose main purpose is crash safety and idempotency; operations
+  // PUT/DELETE that modify S3 object history write to this log first; it is being
+  // replayed by the apply_olh_log() on the same zone;
+  // usually there's only 1 op per epoch key but more than 1 op would be associated with an epoch in case
+  // of versioned DELETE for the current instance: [remove instance, link]
   std::map<uint64_t, std::vector<struct rgw_bucket_olh_log_entry> > pending_log;
+  // unique tag for this entry; it remains the same until the entry is deleted (like when versioning
+  // is suspended) and then re-created (by re-enabling versioning);
   std::string tag;
   bool exists;
   bool pending_removal;
index 0e197535b7ac7162cf8cb47270d005c655cd18ad..c198df4f383c9709368efbf83201d6e2818df31e 100644 (file)
@@ -58,6 +58,25 @@ long long strict_strtoll(std::string_view str, int base, std::string *err)
   return ret;
 }
 
+unsigned long long strict_strtoull(std::string_view str, int base, std::string *err)
+{
+  char *endptr;
+  errno = 0; /* To distinguish success/failure after call (see man page) */
+  auto ret = strtoull(str.data(), &endptr, base);
+  if (endptr == str.data() || endptr != str.data() + str.size()) {
+    *err = (std::string{"Expected option value to be integer, got '"} +
+        std::string{str} + "'");
+    return 0;
+  }
+  if (errno) {
+    *err = (std::string{"The option value '"} + std::string{str} +
+        "' seems to be invalid");
+    return 0;
+  }
+  *err = "";
+  return ret;
+}
+
 int strict_strtol(std::string_view str, int base, std::string *err)
 {
   long long ret = strict_strtoll(str, base, err);
index 681ac1a290c84c383efd37c8db17b4ec19114408..9108f942a30c52123e79d152b1999641c2aa880c 100644 (file)
@@ -73,6 +73,8 @@ bool strict_strtob(std::string_view str, std::string *err);
 
 long long strict_strtoll(std::string_view str, int base, std::string *err);
 
+unsigned long long strict_strtoull(std::string_view str, int base, std::string *err);
+
 int strict_strtol(std::string_view str, int base, std::string *err);
 
 double strict_strtod(std::string_view str, std::string *err);
index 054604002754084240ba9e91ac9d8bf137cedbb1..2f7a4ada7d7f127cf944885bb2cba52c221bfbfc 100644 (file)
@@ -6380,8 +6380,10 @@ int RGWRados::Object::Delete::delete_obj(optional_yield y,
     if (add_log) {
       r = add_datalog_entry(dpp, store->svc.datalog_rados,
                            target->get_bucket_info(), bs->shard_id, y);
-      ldpp_dout(dpp, 0) << "failed to write datalog for object: r=" << r << dendl;
-      return r;
+      if (r < 0) {
+        ldpp_dout(dpp, 0) << "failed to write datalog for object: r=" << r << dendl;
+        return r;
+      }
     }
 
     return 0;
@@ -8948,6 +8950,11 @@ int RGWRados::apply_olh_log(const DoutPrefixProvider *dpp,
       ldpp_dout(dpp, 20) << "olh_log_entry: epoch=" << iter->first << " op=" << (int)entry.op
                      << " key=" << entry.key.name << "[" << entry.key.instance << "] "
                      << (entry.delete_marker ? "(delete)" : "") << dendl;
+
+      if (link_epoch == iter->first)
+        ldpp_dout(dpp, 1) << "apply_olh_log epoch collision detected for " << entry.key
+                          << "; incoming op: " << entry.op << "(" << entry.op_tag << ")" << dendl;
+
       switch (entry.op) {
       case CLS_RGW_OLH_OP_REMOVE_INSTANCE:
         remove_instances.push_back(entry.key);
index 1d4ad5d47894fd3ca98106f27609cee7b1adde31..d339a74737a2d7164da0a2814b9b6e189c2bf3c7 100644 (file)
@@ -6,6 +6,9 @@ import time
 import logging
 import errno
 import dateutil.parser
+from datetime import datetime
+import threading
+from typing import Dict, List, Any
 
 from itertools import combinations
 from itertools import zip_longest
@@ -110,6 +113,12 @@ def bilog_list(zone, bucket, args = None):
     bilog, _ = zone.cluster.admin(cmd, read_only=True)
     return json.loads(bilog)
 
+def bucket_list(zone, bucket, args = None):
+    cmd = ['bucket', 'list', '--bucket', bucket, '--max-entries', '100000', '--uid', user.name] + (args or [])
+    cmd += ['--tenant', config.tenant] if config.tenant else []
+    output, _ = zone.cluster.admin(cmd, read_only=True)
+    return json.loads(output)
+
 def bilog_autotrim(zone, args = None):
     cmd = ['bilog', 'autotrim'] + (args or []) + zone.zone_args()
     zone.cluster.admin(cmd, debug_rgw=20)
@@ -1826,7 +1835,7 @@ def test_bucket_log_trim_after_delete_bucket_primary_reshard():
 
     # run bilog trim twice on primary zone where the bucket was resharded
     bilog_autotrim(primary.zone, ['--rgw-sync-log-trim-max-buckets', '50'],)
-    
+
     for zonegroup in realm.current_period.zonegroups:
         zonegroup_conns = ZonegroupConns(zonegroup)
         for zone in zonegroup_conns.zones:
@@ -2315,7 +2324,7 @@ def test_assume_role_after_sync():
         log.info(f'checking if zone: {zone.name} has role: {role_name}')
         assert(zone.has_role(role_name))
         log.info(f'success, zone: {zone.name} has role: {role_name}')
-    
+
     for zone in zonegroup_conns.zones:
         if zone == zonegroup_conns.master_zone:
             log.info(f'creating bucket in primary zone')
@@ -3971,6 +3980,153 @@ def test_bucket_create_location_constraint():
                                     CreateBucketConfiguration={'LocationConstraint': zg.name})
                 assert e.response['ResponseMetadata']['HTTPStatusCode'] == 400
 
+def test_timestamp_based_epochs():
+    """
+    test_timestamp_based_epochs:
+        the test generates objects/instance in both zones: for each of NUM_OBJECTS NUM_VERSIONS are generated;
+        then it waits for the replication to finish and then lists objects/instances in both zones and checks
+        that the instances there are listed are in chronological order, with the expectation that without
+        time-based epochs the listed order of object versions won't be chronological; with the time-based epochs
+        the order should be strictly chronological
+    """
+    class ObjVersion:
+        def __init__ (self, name: str, instance: str, mtime: datetime, ver_epoch: int):
+            self.name = name
+            self.instance = instance
+            self.mtime = mtime
+            self.ver_epoch = ver_epoch
+
+    def __eq__ (self, other):
+        return (self.name == other.name and
+                self.instance == other.instance and
+                self.mtime == other.mtime and
+                self.ver_epoch == other.ver_epoch)
+
+    def parse_bucket_list_output (data: Any) -> Dict[str, List[ObjVersion]]:
+        """
+        Parses output of the 'radosgw-admin bucket-list --bucket <name> --format json' command.
+        :param output:
+        :return:
+        """
+        if not isinstance(data, list):
+            raise ValueError("Expected a list of entries in JSON input")
+
+        results: Dict[str, List[ObjVersion]] = {}
+        for entry in data:
+            if not isinstance(entry, dict):
+                continue
+
+            name = entry["name"]
+            instance = entry["instance"]
+            mtime = entry.get("meta", {}).get("mtime")
+            ver_epoch = entry["versioned_epoch"]
+
+            obj_ver= ObjVersion(name, instance, mtime, ver_epoch)
+            if results.get(name) is None:
+                results[name] = []
+            results[name].append(obj_ver)
+
+        return results
+
+    zonegroup = realm.master_zonegroup()
+    zonegroup_conns = ZonegroupConns(zonegroup)
+    primary = zonegroup_conns.rw_zones[0]
+
+    NUM_OBJECTS = 10
+    NUM_VERSIONS = 100
+
+    source_bucket = primary.create_bucket(gen_bucket_name())
+    log.info('created bucket=%s', source_bucket.name)
+
+    def create_bucket_objects (zone):
+        client = zone.s3_client
+        log.info(f"Creating objects for {client.meta.endpoint_url} in bucket {source_bucket.name}")
+        for i in range(0, NUM_OBJECTS):
+            for vid in range(0, NUM_VERSIONS):
+                key=f"obj-{i}.txt"
+                response=client.put_object(Key=key, Body=f"This is version {vid}", Bucket=source_bucket.name)
+                log.info(f"Instance {key} ({response['ResponseMetadata']['HTTPHeaders']['x-amz-version-id']}) created @ {client.meta.endpoint_url}")
+            log.info(f"{NUM_VERSIONS} versions created for object {key} on {client.meta.endpoint_url}")
+
+
+    # list all objects/versions in the zone and check that their versions are listed in the
+    # chronological order - from the newest to the oldest;
+    def check_modification_history(zone) -> Dict[str, int]:
+        response = bucket_list(zone, source_bucket.name)
+        obj_versions = parse_bucket_list_output(response)
+
+        # use this map to keep track of status checks for each object
+        obj_status = {f"obj-{oid}.txt" : -1 for oid in range(NUM_OBJECTS)}
+        expected_num_versions_per_obj = NUM_VERSIONS * len(zonegroup_conns.rw_zones)
+        for obj_name, versions in obj_versions.items():
+            log.info(f"Checking object {obj_name}'s' history - there are {len(versions)} versions")
+            assert len(versions) == expected_num_versions_per_obj, \
+                f"Number of versions ({len(versions)}) for {obj_name} does not match the expected number {expected_num_versions_per_obj}"
+            prev_version = versions[0]
+            out_of_order_versions = 0
+            for idx in range(1, len(versions)):
+                version = versions[idx]
+                # prior to the timestamp-based epochs we used integer based epochs which are not based on the modification time of the
+                # object; so whenever there is an epoch collision we might see that an older object might appear in the bucket
+                # listing before the newer one - which is the problem which timestamp-based epochs solve (by increasing epoch
+                # resolution significantly thus making epoch collisions virtually impossible); nevertheless, if the 2 versions
+                # were created at the exact same time we still rely on the version id to determine which one appears first
+                # (more recent) in the modification history even though both have the same timestamp;
+                if version.ver_epoch == prev_version.ver_epoch and version.mtime > prev_version.mtime:
+                    log.error(f"Version {obj_name}:{version.instance} is newer than {obj_name}:{prev_version.instance} but is listed later in the history")
+                    out_of_order_versions += 1
+                elif version.ver_epoch + 1 == prev_version.ver_epoch:
+                    log.warning(f"Version {version.instance} is just 1ns apart from {prev_version.instance}")
+
+                prev_version = version
+
+            obj_status[obj_name] = out_of_order_versions
+            if out_of_order_versions==0:
+                log.info(f"{obj_name}: OK")
+            else:
+                log.warning(f"{obj_name}: {out_of_order_versions} versions are out of order")
+
+        return obj_status
+
+    def set_bucket_versioning(state: bool):
+        primary.s3_client.put_bucket_versioning(Bucket=source_bucket.name, VersioningConfiguration=
+        {'Status': 'Enabled' if state else 'Disabled'})
+
+    set_bucket_versioning(True)
+
+    # wait for those changes to propagate to the secondary zone;
+    zonegroup_meta_checkpoint(zonegroup)
+
+    threads = [threading.Thread(target=create_bucket_objects, args=[zone]) for zone in zonegroup_conns.rw_zones]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+    # polls bucket sync status for all zones in the zonegroup until they catch up with the checkpoint
+    zonegroup_bucket_checkpoint(zonegroup_conns, source_bucket.name)
+
+    # now check modification history in each zone
+    threads = [threading.Thread(target=check_modification_history, args=[zone.zone])
+               for zone in zonegroup_conns.rw_zones]
+    for t in threads:
+        t.start()
+    for t in threads:
+        t.join()
+
+    # check the results
+    for zone in zonegroup_conns.rw_zones:
+        log.info(f"Checking modification history for zone {zone.name}")
+        obj_status = check_modification_history(zone.zone)
+        for name, out_of_order_versions in obj_status.items():
+            if out_of_order_versions == 0:
+                log.info(f"Object {name}: history OK")
+            elif out_of_order_versions == -1:
+                assert False, f"Object {name}: has no versions"
+            else:
+                assert False, f"Object {name}: found {out_of_order_versions} versions which are out of order"
+
+
 def run_per_zonegroup(func):
     def wrapper(*args, **kwargs):
         for zonegroup in realm.current_period.zonegroups:
@@ -4712,7 +4868,6 @@ def test_bucket_delete_with_bucket_sync_policy_directional():
 
     assert check_all_buckets_dont_exist(zcA, buckets)
     assert check_all_buckets_dont_exist(zcB, buckets)
-    
     remove_sync_policy_group(c1, "sync-group")
 
     return
@@ -4791,7 +4946,6 @@ def test_bucket_delete_with_bucket_sync_policy_symmetric():
 
     assert check_all_buckets_dont_exist(zcA, buckets)
     assert check_all_buckets_dont_exist(zcB, buckets)
-    
     remove_sync_policy_group(c1, "sync-group")
     return
 
@@ -4924,7 +5078,6 @@ def test_delete_bucket_with_zone_opt_out():
 
     bucket = get_bucket(zcC, bucketA.name)
     check_objects_not_exist(bucket, objnameA)
-    
     # verify that objnameB is not synced to either zoneA or zoneB
     bucket = get_bucket(zcA, bucketA.name)
     check_objects_not_exist(bucket, objnameB)
@@ -4960,7 +5113,6 @@ def test_delete_bucket_with_zone_opt_out():
     assert check_all_buckets_dont_exist(zcC, buckets)
 
     remove_sync_policy_group(c1, "sync-group")
-    
     return
 
 @attr('sync_policy')
@@ -5019,7 +5171,6 @@ def test_bucket_delete_with_sync_policy_object_prefix():
 
     zone_bucket_checkpoint(zoneA, zoneB, bucketA.name)
     zone_data_checkpoint(zoneB, zoneA)
-    
     # verify that objnameA is synced to zoneB
     bucket = get_bucket(zcB, bucketA.name)
     check_object_exists(bucket, objnameA)
@@ -5654,15 +5805,12 @@ def test_bucket_replication_source_allow_either_getobjectversion_or_getobjectver
 def test_bucket_replication_source_forbidden_objretention():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
-    
     source = zonegroup_conns.rw_zones[0]
     dest = zonegroup_conns.rw_zones[1]
-    
     source_bucket_name = gen_bucket_name()
     source.s3_client.create_bucket(Bucket=source_bucket_name, ObjectLockEnabledForBucket=True)
     dest_bucket = dest.create_bucket(gen_bucket_name())
     zonegroup_meta_checkpoint(zonegroup)
-    
     # create replication configuration
     source.s3_client.put_bucket_replication(
         Bucket=source_bucket_name,
@@ -5677,7 +5825,6 @@ def test_bucket_replication_source_forbidden_objretention():
             }]
         }
     )
-    
     # Deny myself from fetching the source object's retention for replication
     source.s3_client.put_bucket_policy(
         Bucket=source_bucket_name,
@@ -5692,7 +5839,6 @@ def test_bucket_replication_source_forbidden_objretention():
         })
     )
     zonegroup_meta_checkpoint(zonegroup)
-    
     # upload an object and wait for sync.
     objname = 'dummy'
     k = new_key(source, source_bucket_name, objname)
@@ -5712,15 +5858,12 @@ def test_bucket_replication_source_forbidden_objretention():
 def test_bucket_replication_source_forbidden_legalhold():
     zonegroup = realm.master_zonegroup()
     zonegroup_conns = ZonegroupConns(zonegroup)
-    
     source = zonegroup_conns.rw_zones[0]
     dest = zonegroup_conns.rw_zones[1]
-    
     source_bucket_name = gen_bucket_name()
     source.s3_client.create_bucket(Bucket=source_bucket_name, ObjectLockEnabledForBucket=True)
     dest_bucket = dest.create_bucket(gen_bucket_name())
     zonegroup_meta_checkpoint(zonegroup)
-    
     # create replication configuration
     source.s3_client.put_bucket_replication(
         Bucket=source_bucket_name,
@@ -5735,7 +5878,6 @@ def test_bucket_replication_source_forbidden_legalhold():
             }]
         }
     )
-    
     # Deny myself from fetching the source object's retention for replication
     source.s3_client.put_bucket_policy(
         Bucket=source_bucket_name,
@@ -5750,7 +5892,6 @@ def test_bucket_replication_source_forbidden_legalhold():
         })
     )
     zonegroup_meta_checkpoint(zonegroup)
-    
     # upload an object and wait for sync.
     objname = 'dummy'
     k = new_key(source, source_bucket_name, objname)
index 08c11718bd04967ccdc8e60feebf0bc852002a08..2ee2b942300362b4a5d7f3b5b53ea0698cf364ea 100644 (file)
@@ -4,7 +4,6 @@ import logging
 import boto
 import boto.s3.connection
 
-import datetime
 import dateutil
 
 from itertools import zip_longest  # type: ignore