From: Prasad Krishnan Date: Mon, 16 Mar 2020 18:45:36 +0000 (+0000) Subject: RGW:Multisite: Create a new filter for ETag Verifier X-Git-Tag: v15.2.9~47^2~11 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=524dca1dab05ff59c44bec23444ebd0e1c55af7b;p=ceph.git RGW:Multisite: Create a new filter for ETag Verifier This patch re-writes the ETag verifier into a filter that peeks into the incoming stream of data and calculates MD5 checksum. Signed-off-by: Prasad Krishnan (cherry picked from commit 2677c4b88806d4af6d525157e7006c1b0ca1b964) --- diff --git a/src/rgw/CMakeLists.txt b/src/rgw/CMakeLists.txt index 95145ac6e264..22583b8af64a 100644 --- a/src/rgw/CMakeLists.txt +++ b/src/rgw/CMakeLists.txt @@ -62,6 +62,7 @@ set(librgw_common_srcs rgw_cache.cc rgw_common.cc rgw_compression.cc + rgw_etag_verifier.cc rgw_cors.cc rgw_cors_s3.cc rgw_dencoder.cc diff --git a/src/rgw/rgw_etag_verifier.cc b/src/rgw/rgw_etag_verifier.cc new file mode 100644 index 000000000000..2b79254c147b --- /dev/null +++ b/src/rgw/rgw_etag_verifier.cc @@ -0,0 +1,111 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +#include "rgw_etag_verifier.h" + +#define dout_subsys ceph_subsys_rgw + +int RGWPutObj_ETagVerifier_Atomic::process(bufferlist&& in, uint64_t logical_offset) +{ + bufferlist out; + if (in.length() > 0) + hash.Update((const unsigned char *)in.c_str(), in.length()); + + return Pipe::process(std::move(in), logical_offset); +} + +void RGWPutObj_ETagVerifier_Atomic::calculate_etag() +{ + unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; + + /* Return early if ETag has already been calculated */ + if (!calculated_etag.empty()) + return; + + hash.Final(m); + buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); + ldout(cct, 20) << "Single part object: " << " etag:" << calculated_etag + << dendl; + calculated_etag = calc_md5; +} + +void 
RGWPutObj_ETagVerifier_MPU::process_end_of_MPU_part(bufferlist in) +{ + unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char calc_md5_part[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; + std::string calculated_etag_part; + + hash.Final(m); + mpu_etag_hash.Update((const unsigned char *)m, sizeof(m)); + hash.Restart(); + + /* Debugging begin */ + buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5_part); + calculated_etag_part = calc_md5_part; + ldout(cct, 20) << "Part etag: " << calculated_etag_part << dendl; + /* Debugging end */ + + cur_part_index++; + next_part_index++; +} + +int RGWPutObj_ETagVerifier_MPU::process(bufferlist&& in, uint64_t logical_offset) +{ + uint64_t bl_end = in.length() + logical_offset; + + /* Handle the last MPU part */ + if (next_part_index == part_ofs.size()) { + hash.Update((const unsigned char *)in.c_str(), in.length()); + goto done; + } + + /* Incoming bufferlist spans two MPU parts. Calculate separate ETags */ + if (bl_end > part_ofs[next_part_index]) { + + uint64_t part_one_len = part_ofs[next_part_index] - logical_offset; + hash.Update((const unsigned char *)in.c_str(), part_one_len); + process_end_of_MPU_part(in); + + hash.Update((const unsigned char *)in.c_str() + part_one_len, + bl_end - part_ofs[cur_part_index]); + /* + * If we've moved to the last part of the MPU, avoid usage of + * part_ofs[next_part_index] as it will lead to out-of-range access. 
+ */ + if (next_part_index == part_ofs.size()) + goto done; + } else { + hash.Update((const unsigned char *)in.c_str(), in.length()); + } + + /* Update the MPU Etag if the current part has ended */ + if (logical_offset + in.length() + 1 == part_ofs[next_part_index]) + process_end_of_MPU_part(in); + +done: + return Pipe::process(std::move(in), logical_offset); +} + +void RGWPutObj_ETagVerifier_MPU::calculate_etag() +{ + unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE], mpu_m[CEPH_CRYPTO_MD5_DIGESTSIZE]; + char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; + + /* Return early if ETag has already been calculated */ + if (!calculated_etag.empty()) + return; + + hash.Final(m); + mpu_etag_hash.Update((const unsigned char *)m, sizeof(m)); + + /* Refer RGWCompleteMultipart::execute() for ETag calculation for MPU object */ + mpu_etag_hash.Final(mpu_m); + buf_to_hex(mpu_m, CEPH_CRYPTO_MD5_DIGESTSIZE, final_etag_str); + snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], + sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, + "-%lld", (long long)(part_ofs.size())); + + ldout(cct, 20) << "MPU calculated ETag:" << calculated_etag << dendl; + calculated_etag = final_etag_str; +} diff --git a/src/rgw/rgw_etag_verifier.h b/src/rgw/rgw_etag_verifier.h new file mode 100644 index 000000000000..5c777a8932ee --- /dev/null +++ b/src/rgw/rgw_etag_verifier.h @@ -0,0 +1,68 @@ +// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- +// vim: ts=8 sw=2 smarttab ft=cpp + +/* + * RGW Etag Verifier is an RGW filter which enables the objects copied using + * multisite sync to be verified using their ETag from source i.e. the MD5 + * checksum of the object is computed at the destination and is verified to be + * identical to the ETag stored in the object HEAD at source cluster. + * + * For MPU objects, a different filter named RGWMultipartEtagFilter is applied + * which re-computes ETag using RGWObjManifest. 
This computes the ETag using the + * same algorithm used at the source cluster i.e. MD5 sum of the individual ETags + * of the MPU parts. + */ +#ifndef CEPH_RGW_ETAG_VERIFIER_H +#define CEPH_RGW_ETAG_VERIFIER_H + +#include "rgw_putobj.h" +#include "rgw_op.h" + +class RGWPutObj_ETagVerifier : public rgw::putobj::Pipe +{ +protected: + CephContext* cct; + MD5 hash; + string calculated_etag; + +public: + RGWPutObj_ETagVerifier(CephContext* cct_, rgw::putobj::DataProcessor *next) + : Pipe(next), cct(cct_) {} + + virtual void calculate_etag() = 0; + string get_calculated_etag() { return calculated_etag;} + virtual void append_part_ofs(uint64_t ofs) = 0; + +}; /* RGWPutObj_ETagVerifier */ + +class RGWPutObj_ETagVerifier_Atomic : public RGWPutObj_ETagVerifier +{ +public: + RGWPutObj_ETagVerifier_Atomic(CephContext* cct_, rgw::putobj::DataProcessor *next) + : RGWPutObj_ETagVerifier(cct_, next) {} + + int process(bufferlist&& data, uint64_t logical_offset) override; + void calculate_etag() override; + void append_part_ofs(uint64_t ofs) override {} + +}; /* RGWPutObj_ETagVerifier_Atomic */ + +class RGWPutObj_ETagVerifier_MPU : public RGWPutObj_ETagVerifier +{ + std::vector<uint64_t> part_ofs; + int cur_part_index{0}, next_part_index{1}; + MD5 mpu_etag_hash; + + void process_end_of_MPU_part(bufferlist in); + +public: + RGWPutObj_ETagVerifier_MPU(CephContext* cct_, rgw::putobj::DataProcessor *next) + : RGWPutObj_ETagVerifier(cct_, next) {} + + int process(bufferlist&& data, uint64_t logical_offset) override; + void calculate_etag() override; + void append_part_ofs(uint64_t ofs) override { part_ofs.emplace_back(ofs); } + +}; /* RGWPutObj_ETagVerifier_MPU */ + +#endif /* CEPH_RGW_ETAG_VERIFIER_H */ diff --git a/src/rgw/rgw_obj_manifest.h b/src/rgw/rgw_obj_manifest.h index 227eaff50f20..0a6dfa67d6e7 100644 --- a/src/rgw/rgw_obj_manifest.h +++ b/src/rgw/rgw_obj_manifest.h @@ -466,8 +466,9 @@ public: return location; } - int get_cur_part_id() const { - return cur_part_id; + /* where current 
part starts */ + uint64_t get_part_ofs() const { + return part_ofs; } /* start of current stripe */ diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index b46c2e24eb20..d2cbc1ab015e 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -43,6 +43,7 @@ #include "rgw_tools.h" #include "rgw_coroutine.h" #include "rgw_compression.h" +#include "rgw_etag_verifier.h" #include "rgw_worker.h" #undef fork // fails to compile RGWPeriod::fork() below @@ -3256,20 +3257,21 @@ class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB rgw_obj obj; rgw::putobj::DataProcessor *filter; boost::optional& compressor; + boost::optional etag_verifier_atomic; + boost::optional etag_verifier_mpu; boost::optional buffering; CompressorRef& plugin; rgw::putobj::ObjectProcessor *processor; void (*progress_cb)(off_t, void *); void *progress_data; - bufferlist extra_data_bl, full_obj, manifest_bl; + bufferlist extra_data_bl, manifest_bl; uint64_t extra_data_left{0}; - bool need_to_process_attrs{true}; + bool need_to_process_attrs{true}, is_mpu_obj{false}; uint64_t data_len{0}; map src_attrs; uint64_t ofs{0}; uint64_t lofs{0}; /* logical ofs */ std::function&)> attrs_handler; - string calculated_etag; public: RGWRadosPutObj(CephContext* cct, CompressorRef& plugin, @@ -3327,6 +3329,58 @@ public: filter = &*buffering; } + /* + * Presently we don't support ETag based verification if compression or + * encryption is requested. We can enable simultaneous support once we have + * a mechanism to know the sequence in which the filters must be applied. 
+ */ + if (cct->_conf->rgw_copy_verify_object && !plugin && + src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) { + + RGWObjManifest manifest; + + auto miter = manifest_bl.cbegin(); + try { + decode(manifest, miter); + } catch (buffer::error& err) { + ldout(cct, 0) << "ERROR: couldn't decode manifest" << dendl; + return -EIO; + } + + RGWObjManifestRule rule; + bool found = manifest.get_rule(0, &rule); + if (!found) { + lderr(cct) << "ERROR: manifest->get_rule() could not find rule" << dendl; + return -EIO; + } + + if (rule.part_size == 0) { + /* Atomic object */ + etag_verifier_atomic = boost::in_place(cct, filter); + filter = &*etag_verifier_atomic; + } else { + is_mpu_obj = true; + etag_verifier_mpu = boost::in_place(cct, filter); + + RGWObjManifest::obj_iterator mi; + uint64_t cur_part_ofs = UINT64_MAX; + + /* + * We must store the offset of each part to calculate the ETAGs for each + * MPU part. These part ETags then become the input for the MPU object + * Etag. + */ + for (mi = manifest.obj_begin(); mi != manifest.obj_end(); ++mi) { + if (cur_part_ofs == mi.get_part_ofs()) + continue; + cur_part_ofs = mi.get_part_ofs(); + ldout(cct, 20) << "MPU Part offset:" << cur_part_ofs << dendl; + etag_verifier_mpu->append_part_ofs(cur_part_ofs); + } + filter = &*etag_verifier_mpu; + } + } + need_to_process_attrs = false; return 0; @@ -3373,8 +3427,6 @@ public: const uint64_t lofs = data_len; data_len += size; - if (cct->_conf->rgw_copy_verify_object) - full_obj.append(bl); return filter->process(std::move(bl), lofs); } @@ -3397,84 +3449,13 @@ public: } string get_calculated_etag() { - MD5 hash, mpu_etag_hash; - unsigned char m[CEPH_CRYPTO_MD5_DIGESTSIZE], mpu_m[CEPH_CRYPTO_MD5_DIGESTSIZE]; - char calc_md5[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; - char calc_md5_part[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 1]; - char final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2 + 16]; - RGWObjManifest manifest; - std::string calculated_etag_part; - int prev_part_id = 1, cur_part_id = 1; - - 
auto miter = manifest_bl.cbegin(); - try { - decode(manifest, miter); - } catch (buffer::error& err) { - ldout(cct, 0) << "ERROR: couldn't decode manifest" << dendl; - return std::string(); - } - RGWObjManifest::obj_iterator mi; - - RGWObjManifestRule rule; - bool found = manifest.get_rule(0, &rule); - if (!found) { - lderr(cct) << "ERROR: manifest->get_rule() could not find rule" << dendl; - return std::string(); - } - - /* - * Check if the object was created using multipart upload. This check is - * required as MPU objects' ETag != MD5sum. - */ - if (rule.part_size == 0) { - hash.Update((const unsigned char *)full_obj.c_str(), data_len); - hash.Final(m); - buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5); - calculated_etag = calc_md5; - ldout(cct, 20) << "Single part object: " << manifest.get_obj().get_oid() << - " size:" << manifest.get_obj_size() << " etag:" << - calculated_etag << dendl; - return calculated_etag; - } - - for (mi = manifest.obj_begin(); mi != manifest.obj_end(); ++mi) { - bufferlist obj_stripe; - - cur_part_id = mi.get_cur_part_id(); - if (cur_part_id != prev_part_id) { - hash.Final(m); - mpu_etag_hash.Update((const unsigned char *)m, sizeof(m)); - hash.Restart(); - buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5_part); - calculated_etag_part = calc_md5_part; - ldout(cct, 20) << "Part " << prev_part_id << " etag: " << - calculated_etag_part << dendl; - prev_part_id = cur_part_id; - } - full_obj.splice(0, mi.get_stripe_size(), &obj_stripe); - hash.Update((const unsigned char *)obj_stripe.c_str(), mi.get_stripe_size()); + if (is_mpu_obj) { + etag_verifier_mpu->calculate_etag(); + return etag_verifier_mpu->get_calculated_etag(); + } else { + etag_verifier_atomic->calculate_etag(); + return etag_verifier_atomic->get_calculated_etag(); } - - hash.Final(m); - mpu_etag_hash.Update((const unsigned char *)m, sizeof(m)); - - buf_to_hex(m, CEPH_CRYPTO_MD5_DIGESTSIZE, calc_md5_part); - calculated_etag_part = calc_md5_part; - ldout(cct, 20) << "Part " 
<< prev_part_id << " etag: " << calculated_etag_part << dendl; - - /* Refer RGWCompleteMultipart::execute() for ETag calculation for MPU object */ - mpu_etag_hash.Final(mpu_m); - buf_to_hex(mpu_m, CEPH_CRYPTO_MD5_DIGESTSIZE, final_etag_str); - snprintf(&final_etag_str[CEPH_CRYPTO_MD5_DIGESTSIZE * 2], - sizeof(final_etag_str) - CEPH_CRYPTO_MD5_DIGESTSIZE * 2, - "-%lld", (long long)(mi.get_cur_part_id() - 1)); - calculated_etag = final_etag_str; - - ldout(cct, 20) << "MPU calculated ETag:"<< calculated_etag << " Object:" << - manifest.get_obj().get_oid() << - " size:" << manifest.get_obj_size() << dendl; - return calculated_etag; - } }; @@ -4040,10 +4021,16 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx, } if (cct->_conf->rgw_copy_verify_object) { - if (cb.get_calculated_etag().compare(etag)) { + string trimmed_etag = etag; + + /* Remove the leading and trailing double quotes from etag */ + trimmed_etag.erase(std::remove(trimmed_etag.begin(), trimmed_etag.end(),'\"'), + trimmed_etag.end()); + + if (cb.get_calculated_etag().compare(trimmed_etag)) { ret = -EIO; ldout(cct, 0) << "ERROR: source and destination objects don't match. Expected etag:" - << etag << " Computed etag:" << cb.get_calculated_etag() << dendl; + << trimmed_etag << " Computed etag:" << cb.get_calculated_etag() << dendl; goto set_err_state; } }