]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
RGW:Multisite: Create a new filter for ETag Verifier
authorPrasad Krishnan <prasad.krishnan@flipkart.com>
Mon, 16 Mar 2020 18:45:36 +0000 (18:45 +0000)
committerNathan Cutler <ncutler@suse.com>
Thu, 28 Jan 2021 15:45:32 +0000 (16:45 +0100)
This patch re-writes the ETag verifier into a filter that peeks into the
incoming stream of data and calculates MD5 checksum.

Signed-off-by: Prasad Krishnan <prasad.krishnan@flipkart.com>
(cherry picked from commit 2677c4b88806d4af6d525157e7006c1b0ca1b964)

src/rgw/CMakeLists.txt
src/rgw/rgw_etag_verifier.cc [new file with mode: 0644]
src/rgw/rgw_etag_verifier.h [new file with mode: 0644]
src/rgw/rgw_obj_manifest.h
src/rgw/rgw_rados.cc

index 95145ac6e2643f2d68dc8557ab576ecec7e8ed44..22583b8af64a55202941803edc5c29780a94a654 100644 (file)
@@ -62,6 +62,7 @@ set(librgw_common_srcs
   rgw_cache.cc
   rgw_common.cc
   rgw_compression.cc
+  rgw_etag_verifier.cc
   rgw_cors.cc
   rgw_cors_s3.cc
   rgw_dencoder.cc
diff --git a/src/rgw/rgw_etag_verifier.cc b/src/rgw/rgw_etag_verifier.cc
new file mode 100644 (file)
index 0000000..2b79254
--- /dev/null
@@ -0,0 +1,111 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+#include "rgw_etag_verifier.h"
+
+#define dout_subsys ceph_subsys_rgw
+
+int RGWPutObj_ETagVerifier_Atomic::process(bufferlist&& in, uint64_t logical_offset)
+{
+  bufferlist out;
+  if (in.length() > 0)
+    hash.Update((const unsigned char *)in.c_str(), in.length());
+
+  return Pipe::process(std::move(in), logical_offset);
+}
+
+void RGWPutObj_ETagVerifier_Atomic::calculate_etag()
+{
+  unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
+
+  /* Return early if ETag has already been calculated */
+  if (!calculated_etag.empty())
+    return;
+
+  hash.Final(m);
+  buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
+  ldout(cct, 20) << "Single part object: " << " etag:" << calculated_etag
+          << dendl;
+  calculated_etag = calc_md5;
+}
+
+void RGWPutObj_ETagVerifier_MPU::process_end_of_MPU_part(bufferlist in)
+{
+  unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  char calc_md5_part[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
+  std::string calculated_etag_part;
+
+  hash.Final(m);
+  mpu_etag_hash.Update((const unsigned char *)m, sizeof(m));
+  hash.Restart();
+
+  /* Debugging begin */
+  buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5_part);
+  calculated_etag_part = calc_md5_part;
+  ldout(cct, 20) << "Part etag: " << calculated_etag_part << dendl;
+  /* Debugging end */
+
+  cur_part_index++;
+  next_part_index++;
+}
+
+int RGWPutObj_ETagVerifier_MPU::process(bufferlist&& in, uint64_t logical_offset)
+{
+  uint64_t bl_end = in.length() + logical_offset;
+
+  /* Handle the last MPU part */
+  if (next_part_index == part_ofs.size()) {
+    hash.Update((const unsigned char *)in.c_str(), in.length());
+    goto done;
+  }
+
+  /* Incoming bufferlist spans two MPU parts. Calculate separate ETags */
+  if (bl_end > part_ofs[next_part_index]) {
+
+    uint64_t part_one_len = part_ofs[next_part_index] - logical_offset;
+    hash.Update((const unsigned char *)in.c_str(), part_one_len);
+    process_end_of_MPU_part(in);
+
+    hash.Update((const unsigned char *)in.c_str() + part_one_len,
+      bl_end - part_ofs[cur_part_index]);
+    /*
+     * If we've moved to the last part of the MPU, avoid usage of
+     * parts_ofs[next_part_index] as it will lead to our-of-range access.
+     */
+    if (next_part_index == part_ofs.size())
+      goto done;
+  } else {
+    hash.Update((const unsigned char *)in.c_str(), in.length());
+  }
+
+  /* Update the MPU Etag if the current part has ended */
+  if (logical_offset + in.length() + 1 == part_ofs[next_part_index])
+    process_end_of_MPU_part(in);
+
+done:
+  return Pipe::process(std::move(in), logical_offset);
+}
+
+void RGWPutObj_ETagVerifier_MPU::calculate_etag()
+{
+  unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE], mpu_m[CEPH_CRYPTO_MD5_DIGESTSIZE];
+  char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
+
+  /* Return early if ETag has already been calculated */
+  if (!calculated_etag.empty())
+    return;
+
+  hash.Final(m);
+  mpu_etag_hash.Update((const unsigned char *)m, sizeof(m));
+
+  /* Refer RGWCompleteMultipart::execute() for ETag calculation for MPU object */
+  mpu_etag_hash.Final(mpu_m);
+  buf_to_hex(mpu_m, CEPH_CRYPTO_MD5_DIGESTSIZE, final_etag_str);
+  snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
+           sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
+           "-%lld", (long long)(part_ofs.size()));
+
+  ldout(cct, 20) << "MPU calculated ETag:" << calculated_etag << dendl;
+  calculated_etag = final_etag_str;
+}
diff --git a/src/rgw/rgw_etag_verifier.h b/src/rgw/rgw_etag_verifier.h
new file mode 100644 (file)
index 0000000..5c777a8
--- /dev/null
@@ -0,0 +1,68 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab ft=cpp
+
+/*
+ * RGW Etag Verifier is an RGW filter which enables the objects copied using
+ * multisite sync to be verified using their ETag from source i.e. the MD5
+ * checksum of the object is computed at the destination and is verified to be
+ * identical to the ETag stored in the object HEAD at source cluster.
+ * 
+ * For MPU objects, a different filter named RGWMultipartEtagFilter is applied
+ * which re-computes ETag using RGWObjManifest. This computes the ETag using the
+ * same algorithm used at the source cluster i.e. MD5 sum of the individual ETag
+ * on the MPU parts.
+ */
+#ifndef CEPH_RGW_ETAG_VERIFIER_H
+#define CEPH_RGW_ETAG_VERIFIER_H
+
+#include "rgw_putobj.h"
+#include "rgw_op.h"
+
+class RGWPutObj_ETagVerifier : public rgw::putobj::Pipe
+{
+protected:
+  CephContext* cct;
+  MD5 hash;
+  string calculated_etag;
+
+public:
+  RGWPutObj_ETagVerifier(CephContext* cct_, rgw::putobj::DataProcessor *next)
+    : Pipe(next), cct(cct_) {}
+
+  virtual void calculate_etag() = 0;
+  string get_calculated_etag() { return calculated_etag;}
+  virtual void append_part_ofs(uint64_t ofs) = 0;
+
+}; /* RGWPutObj_ETagVerifier */
+
+class RGWPutObj_ETagVerifier_Atomic : public RGWPutObj_ETagVerifier
+{
+public:
+  RGWPutObj_ETagVerifier_Atomic(CephContext* cct_, rgw::putobj::DataProcessor *next)
+    : RGWPutObj_ETagVerifier(cct_, next) {}
+
+  int process(bufferlist&& data, uint64_t logical_offset) override;
+  void calculate_etag() override;
+  void append_part_ofs(uint64_t ofs) override {}
+
+}; /* RGWPutObj_ETagVerifier_Atomic */
+
+class RGWPutObj_ETagVerifier_MPU : public RGWPutObj_ETagVerifier
+{
+  std::vector<uint64_t> part_ofs;
+  int cur_part_index{0}, next_part_index{1};
+  MD5 mpu_etag_hash;
+  void process_end_of_MPU_part(bufferlist in);
+
+public:
+  RGWPutObj_ETagVerifier_MPU(CephContext* cct_, rgw::putobj::DataProcessor *next)
+    : RGWPutObj_ETagVerifier(cct_, next) {}
+
+  int process(bufferlist&& data, uint64_t logical_offset) override;
+  void calculate_etag() override;
+  void append_part_ofs(uint64_t ofs) override { part_ofs.emplace_back(ofs); }
+
+}; /* RGWPutObj_ETagVerifier_MPU */
+
+#endif /* CEPH_RGW_ETAG_VERIFIER_H */
index 227eaff50f200078c733b97e0e623f21464b429b..0a6dfa67d6e78bf35f64440e55f87709fdc36dba 100644 (file)
@@ -466,8 +466,9 @@ public:
       return location;
     }
 
-    int get_cur_part_id() const {
-      return cur_part_id;
+    /* where current part starts */
+    uint64_t get_part_ofs() const {
+      return part_ofs;
     }
 
     /* start of current stripe */
index b46c2e24eb207f15551baf56a6bbd7f681a9be34..d2cbc1ab015e169b2845c7f18fc76c7d0a900b55 100644 (file)
@@ -43,6 +43,7 @@
 #include "rgw_tools.h"
 #include "rgw_coroutine.h"
 #include "rgw_compression.h"
+#include "rgw_etag_verifier.h"
 #include "rgw_worker.h"
 
 #undef fork // fails to compile RGWPeriod::fork() below
@@ -3256,20 +3257,21 @@ class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
   rgw_obj obj;
   rgw::putobj::DataProcessor *filter;
   boost::optional<RGWPutObj_Compress>& compressor;
+  boost::optional<RGWPutObj_ETagVerifier_Atomic> etag_verifier_atomic;
+  boost::optional<RGWPutObj_ETagVerifier_MPU> etag_verifier_mpu;
   boost::optional<rgw::putobj::ChunkProcessor> buffering;
   CompressorRef& plugin;
   rgw::putobj::ObjectProcessor *processor;
   void (*progress_cb)(off_t, void *);
   void *progress_data;
-  bufferlist extra_data_bl, full_obj, manifest_bl;
+  bufferlist extra_data_bl, manifest_bl;
   uint64_t extra_data_left{0};
-  bool need_to_process_attrs{true};
+  bool need_to_process_attrs{true}, is_mpu_obj{false};
   uint64_t data_len{0};
   map<string, bufferlist> src_attrs;
   uint64_t ofs{0};
   uint64_t lofs{0}; /* logical ofs */
   std::function<int(map<string, bufferlist>&)> attrs_handler;
-  string calculated_etag;
 public:
   RGWRadosPutObj(CephContext* cct,
                  CompressorRef& plugin,
@@ -3327,6 +3329,58 @@ public:
       filter = &*buffering;
     }
 
+    /*
+     * Presently we don't support ETag based verification if compression or
+     * encryption is requested. We can enable simultaneous support once we have
+     * a mechanism to know the sequence in which the filters must be applied.
+     */
+    if (cct->_conf->rgw_copy_verify_object && !plugin &&
+        src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
+
+      RGWObjManifest manifest;
+
+      auto miter = manifest_bl.cbegin();
+      try {
+        decode(manifest, miter);
+      } catch (buffer::error& err) {
+        ldout(cct, 0) << "ERROR: couldn't decode manifest" << dendl;
+        return -EIO;
+      }
+
+      RGWObjManifestRule rule;
+      bool found = manifest.get_rule(0, &rule);
+      if (!found) {
+        lderr(cct) << "ERROR: manifest->get_rule() could not find rule" << dendl;
+        return -EIO;
+      }
+
+      if (rule.part_size == 0) {
+        /* Atomic object */
+        etag_verifier_atomic = boost::in_place(cct, filter);
+        filter = &*etag_verifier_atomic;
+      } else {
+        is_mpu_obj = true;
+        etag_verifier_mpu = boost::in_place(cct, filter);
+
+        RGWObjManifest::obj_iterator mi;
+        uint64_t cur_part_ofs = UINT64_MAX;
+
+        /*
+         * We must store the offset of each part to calculate the ETAGs for each
+         * MPU part. These part ETags then become the input for the MPU object
+         * Etag.
+         */
+        for (mi = manifest.obj_begin(); mi != manifest.obj_end(); ++mi) {
+          if (cur_part_ofs == mi.get_part_ofs())
+            continue;
+          cur_part_ofs = mi.get_part_ofs();
+          ldout(cct, 20) << "MPU Part offset:" << cur_part_ofs << dendl;
+          etag_verifier_mpu->append_part_ofs(cur_part_ofs);
+        }
+        filter = &*etag_verifier_mpu;
+      }
+    }
+
     need_to_process_attrs = false;
 
     return 0;
@@ -3373,8 +3427,6 @@ public:
 
     const uint64_t lofs = data_len;
     data_len += size;
-    if (cct->_conf->rgw_copy_verify_object)
-      full_obj.append(bl);
 
     return filter->process(std::move(bl), lofs);
   }
@@ -3397,84 +3449,13 @@ public:
   }
 
   string get_calculated_etag() {
-    MD5 hash, mpu_etag_hash;
-    unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE], mpu_m[CEPH_CRYPTO_MD5_DIGESTSIZE];
-    char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
-    char calc_md5_part[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1];
-    char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16];
-    RGWObjManifest manifest;
-    std::string calculated_etag_part;
-    int prev_part_id = 1, cur_part_id = 1;
-
-    auto miter = manifest_bl.cbegin();
-    try {
-      decode(manifest, miter);
-    } catch (buffer::error& err) {
-      ldout(cct, 0) << "ERROR: couldn't decode manifest" << dendl;
-      return std::string();
-    }
-    RGWObjManifest::obj_iterator mi;
-
-    RGWObjManifestRule rule;
-    bool found = manifest.get_rule(0, &rule);
-    if (!found) {
-      lderr(cct) << "ERROR: manifest->get_rule() could not find rule" << dendl;
-      return std::string();
-    }
-
-    /*
-     * Check if the object was created using multipart upload. This check is
-     * required as MPU objects' ETag != MD5sum.
-     */
-    if (rule.part_size == 0) {
-      hash.Update((const unsigned char *)full_obj.c_str(), data_len);
-      hash.Final(m);
-      buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5);
-      calculated_etag = calc_md5;
-      ldout(cct, 20) << "Single part object: " << manifest.get_obj().get_oid() <<
-             " size:" << manifest.get_obj_size() << " etag:" <<
-             calculated_etag << dendl;
-      return calculated_etag;
-    }
-
-    for (mi = manifest.obj_begin(); mi != manifest.obj_end(); ++mi) {
-      bufferlist obj_stripe;
-
-      cur_part_id = mi.get_cur_part_id();
-      if (cur_part_id != prev_part_id) {
-        hash.Final(m);
-        mpu_etag_hash.Update((const unsigned char *)m, sizeof(m));
-       hash.Restart();
-        buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5_part);
-        calculated_etag_part = calc_md5_part;
-        ldout(cct, 20) << "Part " << prev_part_id << " etag: " <<
-              calculated_etag_part << dendl;
-       prev_part_id = cur_part_id;
-      }
-      full_obj.splice(0, mi.get_stripe_size(), &obj_stripe);
-      hash.Update((const unsigned char *)obj_stripe.c_str(), mi.get_stripe_size());
+    if (is_mpu_obj) {
+      etag_verifier_mpu->calculate_etag();
+      return etag_verifier_mpu->get_calculated_etag();
+    } else {
+      etag_verifier_atomic->calculate_etag();
+      return etag_verifier_atomic->get_calculated_etag();
     }
-
-    hash.Final(m);
-    mpu_etag_hash.Update((const unsigned char *)m, sizeof(m));
-
-    buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5_part);
-    calculated_etag_part = calc_md5_part;
-    ldout(cct, 20) << "Part " << prev_part_id << " etag: " << calculated_etag_part << dendl;
-
-    /* Refer RGWCompleteMultipart::execute() for ETag calculation for MPU object */
-    mpu_etag_hash.Final(mpu_m);
-    buf_to_hex(mpu_m, CEPH_CRYPTO_MD5_DIGESTSIZE, final_etag_str);
-    snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2],
-            sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2,
-             "-%lld", (long long)(mi.get_cur_part_id() - 1));
-    calculated_etag = final_etag_str;
-
-    ldout(cct, 20) << "MPU calculated ETag:"<< calculated_etag << " Object:" <<
-          manifest.get_obj().get_oid() <<
-          " size:" << manifest.get_obj_size() << dendl;
-    return calculated_etag;
-
   }
 };
 
@@ -4040,10 +4021,16 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
     }
 
     if (cct->_conf->rgw_copy_verify_object) {
-      if (cb.get_calculated_etag().compare(etag)) {
+      string trimmed_etag = etag;
+
+      /* Remove the leading and trailing double quotes from etag */
+      trimmed_etag.erase(std::remove(trimmed_etag.begin(), trimmed_etag.end(),'\"'),
+        trimmed_etag.end());
+
+      if (cb.get_calculated_etag().compare(trimmed_etag)) {
         ret = -EIO;
         ldout(cct, 0) << "ERROR: source and destination objects don't match. Expected etag:"
-          << etag << " Computed etag:" << cb.get_calculated_etag() << dendl;
+          << trimmed_etag << " Computed etag:" << cb.get_calculated_etag() << dendl;
         goto set_err_state;
      }
    }