]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: add error repo primitives with timestamp comparisons
authorCasey Bodley <cbodley@redhat.com>
Tue, 31 Mar 2020 13:22:54 +0000 (09:22 -0400)
committerCasey Bodley <cbodley@redhat.com>
Mon, 13 Apr 2020 15:06:46 +0000 (11:06 -0400)
the sync error repo stores omap keys for each datalog entry that needs a
retry. this adds a new primitive for that, that also stores a timestamp
with each omap key, and will only allow overwrites/removals when a newer
timestamp is provided

Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/CMakeLists.txt
src/rgw/rgw_sync_error_repo.cc [new file with mode: 0644]
src/rgw/rgw_sync_error_repo.h [new file with mode: 0644]

index 7cbd18da3bbadaa2bf97fd2494fe1829594ab395..4af33be7d3d89a35727857a9260b2f8c0fcc69b5 100644 (file)
@@ -85,6 +85,7 @@ set(librgw_common_srcs
   rgw_sync.cc
   rgw_data_sync.cc
   rgw_sync_counters.cc
+  rgw_sync_error_repo.cc
   rgw_sync_module.cc
   rgw_sync_module_aws.cc
   rgw_sync_module_es.cc
@@ -232,7 +233,7 @@ endif()
 target_link_libraries(rgw_a
   PRIVATE
   librados cls_otp_client cls_lock_client cls_rgw_client cls_refcount_client
-  cls_log_client cls_timeindex_client cls_version_client
+  cls_log_client cls_timeindex_client cls_version_client cls_cmpomap_client
   cls_user_client cls_rgw_gc_client ceph-common common_utf8 global
   ${CURL_LIBRARIES}
   ${EXPAT_LIBRARIES}
diff --git a/src/rgw/rgw_sync_error_repo.cc b/src/rgw/rgw_sync_error_repo.cc
new file mode 100644 (file)
index 0000000..e952ce9
--- /dev/null
@@ -0,0 +1,137 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#include "rgw_sync_error_repo.h"
+#include "rgw_coroutine.h"
+#include "rgw_sal.h"
+#include "services/svc_rados.h"
+#include "cls/cmpomap/client.h"
+
+ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl)
+{
+  uint64_t value;
+  try {
+    using ceph::decode;
+    decode(value, bl);
+  } catch (const buffer::error&) {
+    value = 0; // empty buffer = 0
+  }
+  return ceph::real_clock::zero() + ceph::timespan(value);
+}
+
+int rgw_error_repo_write(librados::ObjectWriteOperation& op,
+                         const std::string& key,
+                         ceph::real_time timestamp)
+{
+  // overwrite the existing timestamp if value is greater
+  const uint64_t value = timestamp.time_since_epoch().count();
+  using namespace cls::cmpomap;
+  const bufferlist zero = u64_buffer(0); // compare against 0 for missing keys
+  return cmp_set_vals(op, Mode::U64, Op::GT, {{key, u64_buffer(value)}}, zero);
+}
+
+int rgw_error_repo_remove(librados::ObjectWriteOperation& op,
+                          const std::string& key,
+                          ceph::real_time timestamp)
+{
+  // remove the omap key if value >= existing
+  const uint64_t value = timestamp.time_since_epoch().count();
+  using namespace cls::cmpomap;
+  return cmp_rm_keys(op, Mode::U64, Op::GTE, {{key, u64_buffer(value)}});
+}
+
+class RGWErrorRepoWriteCR : public RGWSimpleCoroutine {
+  RGWSI_RADOS::Obj obj;
+  std::string key;
+  ceph::real_time timestamp;
+
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+ public:
+  RGWErrorRepoWriteCR(RGWSI_RADOS* rados, const rgw_raw_obj& raw_obj,
+                      const std::string& key, ceph::real_time timestamp)
+    : RGWSimpleCoroutine(rados->ctx()),
+      obj(rados->obj(raw_obj)),
+      key(key), timestamp(timestamp)
+  {}
+
+  int send_request() override {
+    librados::ObjectWriteOperation op;
+    int r = rgw_error_repo_write(op, key, timestamp);
+    if (r < 0) {
+      return r;
+    }
+    r = obj.open();
+    if (r < 0) {
+      return r;
+    }
+
+    cn = stack->create_completion_notifier();
+    return obj.aio_operate(cn->completion(), &op);
+  }
+
+  int request_complete() override {
+    return cn->completion()->get_return_value();
+  }
+};
+
+RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados,
+                                      const rgw_raw_obj& obj,
+                                      const std::string& key,
+                                      ceph::real_time timestamp)
+{
+  return new RGWErrorRepoWriteCR(rados, obj, key, timestamp);
+}
+
+
+class RGWErrorRepoRemoveCR : public RGWSimpleCoroutine {
+  RGWSI_RADOS::Obj obj;
+  std::string key;
+  ceph::real_time timestamp;
+
+  boost::intrusive_ptr<RGWAioCompletionNotifier> cn;
+ public:
+  RGWErrorRepoRemoveCR(RGWSI_RADOS* rados, const rgw_raw_obj& raw_obj,
+                       const std::string& key, ceph::real_time timestamp)
+    : RGWSimpleCoroutine(rados->ctx()),
+      obj(rados->obj(raw_obj)),
+      key(key), timestamp(timestamp)
+  {}
+
+  int send_request() override {
+    librados::ObjectWriteOperation op;
+    int r = rgw_error_repo_remove(op, key, timestamp);
+    if (r < 0) {
+      return r;
+    }
+    r = obj.open();
+    if (r < 0) {
+      return r;
+    }
+
+    cn = stack->create_completion_notifier();
+    return obj.aio_operate(cn->completion(), &op);
+  }
+
+  int request_complete() override {
+    return cn->completion()->get_return_value();
+  }
+};
+
+RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados,
+                                       const rgw_raw_obj& obj,
+                                       const std::string& key,
+                                       ceph::real_time timestamp)
+{
+  return new RGWErrorRepoRemoveCR(rados, obj, key, timestamp);
+}
diff --git a/src/rgw/rgw_sync_error_repo.h b/src/rgw/rgw_sync_error_repo.h
new file mode 100644 (file)
index 0000000..f5169c0
--- /dev/null
@@ -0,0 +1,45 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2020 Red Hat, Inc
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software
+ * Foundation.  See file COPYING.
+ */
+
+#pragma once
+
+#include "include/rados/librados_fwd.hpp"
+#include "include/buffer_fwd.h"
+#include "common/ceph_time.h"
+
+class RGWSI_RADOS;
+class RGWCoroutine;
+struct rgw_raw_obj;
+
+// decode a timestamp as a uint64_t for CMPXATTR_MODE_U64
+ceph::real_time rgw_error_repo_decode_value(const bufferlist& bl);
+
+// write an omap key iff the given timestamp is newer
+int rgw_error_repo_write(librados::ObjectWriteOperation& op,
+                         const std::string& key,
+                         ceph::real_time timestamp);
+RGWCoroutine* rgw_error_repo_write_cr(RGWSI_RADOS* rados,
+                                      const rgw_raw_obj& obj,
+                                      const std::string& key,
+                                      ceph::real_time timestamp);
+
+// remove an omap key iff there isn't a newer timestamp
+int rgw_error_repo_remove(librados::ObjectWriteOperation& op,
+                          const std::string& key,
+                          ceph::real_time timestamp);
+RGWCoroutine* rgw_error_repo_remove_cr(RGWSI_RADOS* rados,
+                                       const rgw_raw_obj& obj,
+                                       const std::string& key,
+                                       ceph::real_time timestamp);
+