]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aws sync: check that source object doesn't change
authorYehuda Sadeh <yehuda@redhat.com>
Thu, 26 Oct 2017 00:56:57 +0000 (17:56 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 10 Apr 2018 15:05:39 +0000 (08:05 -0700)
Make suret that while syncing the object it doesn't change, which can
be a problem when uploading the object piecemeal.

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_cr_rest.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h
src/rgw/rgw_sync_module.cc
src/rgw/rgw_sync_module.h
src/rgw/rgw_sync_module_aws.cc
src/rgw/rgw_sync_module_aws.h

index eca59a7f6f7a287b354b44c43809443aa4615171..6d21be1dee69553d3ed11101844f251879e9ad85 100644 (file)
@@ -618,7 +618,7 @@ int RGWAsyncStatRemoteObj::_send_request()
                        pattrs,
                        nullptr,
                        nullptr, /* string *ptag, */
-                       nullptr); /* string *petag, */
+                       petag); /* string *petag, */
 
   if (r < 0) {
     ldout(store->ctx(), 0) << "store->fetch_remote_obj() returned r=" << r << dendl;
index 20db87508d6b08e46ccf47154333bc81d44dc3e6..80b96a3056a6c9349b09e7fddaf55e8670a07e62 100644 (file)
@@ -830,6 +830,7 @@ class RGWAsyncStatRemoteObj : public RGWAsyncRadosRequest {
 
   ceph::real_time *pmtime;
   uint64_t *psize;
+  string *petag;
   map<string, bufferlist> *pattrs;
 
 protected:
@@ -841,12 +842,14 @@ public:
                          const rgw_obj_key& _key,
                          ceph::real_time *_pmtime,
                          uint64_t *_psize,
+                         string *_petag,
                          map<string, bufferlist> *_pattrs) : RGWAsyncRadosRequest(caller, cn), store(_store),
                                                       source_zone(_source_zone),
                                                       bucket_info(_bucket_info),
                                                       key(_key),
                                                       pmtime(_pmtime),
                                                       psize(_psize),
+                                                      petag(_petag),
                                                       pattrs(_pattrs) {}
 };
 
@@ -862,6 +865,7 @@ class RGWStatRemoteObjCR : public RGWSimpleCoroutine {
 
   ceph::real_time *pmtime;
   uint64_t *psize;
+  string *petag;
   map<string, bufferlist> *pattrs;
 
   RGWAsyncStatRemoteObj *req;
@@ -873,6 +877,7 @@ public:
                       const rgw_obj_key& _key,
                       ceph::real_time *_pmtime,
                       uint64_t *_psize,
+                      string *_petag,
                       map<string, bufferlist> *_pattrs) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
                                        async_rados(_async_rados), store(_store),
                                        source_zone(_source_zone),
@@ -880,6 +885,7 @@ public:
                                        key(_key),
                                        pmtime(_pmtime),
                                        psize(_psize),
+                                       petag(_petag),
                                        pattrs(_pattrs),
                                        req(NULL) {}
 
@@ -897,7 +903,7 @@ public:
 
   int send_request() override {
     req = new RGWAsyncStatRemoteObj(this, stack->create_completion_notifier(), store, source_zone,
-                                    bucket_info, key, pmtime, psize, pattrs);
+                                    bucket_info, key, pmtime, psize, petag, pattrs);
     async_rados->queue(req);
     return 0;
   }
index 72767ebbf5c8e873015eba114e99e5cba129abf0..005ed10cbac77bd2c43dfb98bb0f1d180d047d61 100644 (file)
@@ -377,6 +377,9 @@ protected:
     uint64_t size;
   } range;
 
+  ceph::real_time mtime;
+  string etag;
+
 public:
   RGWStreamReadHTTPResourceCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
index 8fdb88fc530ff61f112c3fe89bea81430d890545..43908c85f2ba4c81434402c869ef53cbda10080e 100644 (file)
@@ -7571,6 +7571,10 @@ struct obj_time_weight {
     if (l < r) {
       return true;
     }
+    if (!zone_short_id || !rhs.zone_short_id) {
+      /* don't compare zone ids, if one wasn't provided */
+      return false;
+    }
     if (zone_short_id != rhs.zone_short_id) {
       return (zone_short_id < rhs.zone_short_id);
     }
@@ -7588,6 +7592,10 @@ struct obj_time_weight {
     if (mtime < rhs.mtime) {
       return true;
     }
+    if (!zone_short_id || !rhs.zone_short_id) {
+      /* don't compare zone ids, if one wasn't provided */
+      return false;
+    }
     if (zone_short_id != rhs.zone_short_id) {
       return (zone_short_id < rhs.zone_short_id);
     }
index f35d119d76349c6d9e1aab7b7e3fa10ce77220ff..0bfce1af303557e089e3e10e70f9b3c680ccb4eb 100644 (file)
@@ -169,14 +169,18 @@ int RGWRESTConn::complete_request(RGWRESTStreamS3PutObj *req, string& etag, real
   return ret;
 }
 
-static void set_date_header(const real_time *t, map<string, string>& headers, const string& header_name)
+static void set_date_header(const real_time *t, map<string, string>& headers, bool high_precision_time, const string& header_name)
 {
   if (!t) {
     return;
   }
   stringstream s;
   utime_t tm = utime_t(*t);
-  tm.gmtime_nsec(s);
+  if (high_precision_time) {
+    tm.gmtime_nsec(s);
+  } else {
+    tm.gmtime(s);
+  }
   headers[header_name] = s.str();
 }
 
@@ -256,8 +260,11 @@ int RGWRESTConn::get_obj(const rgw_obj& obj, const get_obj_params& in_params, bo
     }
   }
 
-  set_date_header(in_params.mod_ptr, extra_headers, "HTTP_IF_MODIFIED_SINCE");
-  set_date_header(in_params.unmod_ptr, extra_headers, "HTTP_IF_UNMODIFIED_SINCE");
+  set_date_header(in_params.mod_ptr, extra_headers, in_params.high_precision_time, "HTTP_IF_MODIFIED_SINCE");
+  set_date_header(in_params.unmod_ptr, extra_headers, in_params.high_precision_time, "HTTP_IF_UNMODIFIED_SINCE");
+  if (!in_params.etag.empty()) {
+    set_header(in_params.etag, extra_headers, "HTTP_IF_MATCH");
+  }
   if (in_params.mod_zone_id != 0) {
     set_header(in_params.mod_zone_id, extra_headers, "HTTP_DEST_ZONE_SHORT_ID");
   }
index bdbdd4822db1aae89adc267e813456798bca5cac..aa91c65faeff3ddc55f230f38d503c8391068e9d 100644 (file)
@@ -105,6 +105,9 @@ public:
     req_info *info{nullptr};
     const ceph::real_time *mod_ptr{nullptr};
     const ceph::real_time *unmod_ptr{nullptr};
+    bool high_precision_time{true};
+
+    string etag;
 
     uint32_t mod_zone_id{0};
     uint64_t mod_pg_ver{0};
index 635cc3c519c4b3fd2b4723e54567317c0882546d..309772b22d317e73ac4ef85a1438dba78bd6e4f4 100644 (file)
@@ -29,7 +29,7 @@ int RGWCallStatRemoteObjCR::operate() {
     yield {
       call(new RGWStatRemoteObjCR(sync_env->async_rados, sync_env->store,
                                   sync_env->source_zone,
-                                  bucket_info, key, &mtime, &size, &attrs));
+                                  bucket_info, key, &mtime, &size, &etag, &attrs));
     }
     if (retcode < 0) {
       ldout(sync_env->cct, 0) << "RGWStatRemoteObjCR() returned " << retcode << dendl;
@@ -41,7 +41,7 @@ int RGWCallStatRemoteObjCR::operate() {
     yield {
       RGWStatRemoteObjCBCR *cb = allocate_callback();
       if (cb) {
-        cb->set_result(mtime, size, std::move(attrs));
+        cb->set_result(mtime, size, etag, std::move(attrs));
         call(cb);
       }
     }
index be3fdc3d420ec420e45fabe009050d52c71118d6..9925d677eafd2cd7546c645d24322effd35b51bb 100644 (file)
@@ -122,6 +122,7 @@ protected:
 
   ceph::real_time mtime;
   uint64_t size = 0;
+  string etag;
   map<string, bufferlist> attrs;
 public:
   RGWStatRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
@@ -130,9 +131,11 @@ public:
 
   void set_result(ceph::real_time& _mtime,
                   uint64_t _size,
+                  const string& _etag,
                   map<string, bufferlist>&& _attrs) {
     mtime = _mtime;
     size = _size;
+    etag = _etag;
     attrs = std::move(_attrs);
   }
 };
@@ -140,6 +143,7 @@ public:
 class RGWCallStatRemoteObjCR : public RGWCoroutine {
   ceph::real_time mtime;
   uint64_t size{0};
+  string etag;
   map<string, bufferlist> attrs;
 
 protected:
index f89364f777dfc72d4bade054fbce3a09681c9d34..8ef9908760cc2e3334f7f36a197b460b5053196b 100644 (file)
@@ -43,6 +43,7 @@ struct AWSConfig {
   std::unique_ptr<RGWRESTConn> conn;
 };
 
+
 class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
 {
   RGWDataSyncEnv *sync_env;
@@ -50,15 +51,17 @@ class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
   rgw_obj src_obj;
   RGWRESTConn::get_obj_params req_params;
 
-  string etag;
+  rgw_sync_aws_src_obj_properties src_properties;
 public:
   RGWRESTStreamGetCRF(CephContext *_cct,
                                RGWCoroutinesEnv *_env,
                                RGWCoroutine *_caller,
                                RGWDataSyncEnv *_sync_env,
                                RGWRESTConn *_conn,
-                               rgw_obj& _src_obj) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
-                                                                                 sync_env(_sync_env), conn(_conn), src_obj(_src_obj) {
+                               rgw_obj& _src_obj,
+                               const rgw_sync_aws_src_obj_properties& _src_properties) : RGWStreamReadHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
+                                                                                 sync_env(_sync_env), conn(_conn), src_obj(_src_obj),
+                                                                                 src_properties(_src_properties) {
   }
 
   int init() override {
@@ -68,6 +71,11 @@ public:
     req_params.get_op = true;
     req_params.prepend_metadata = true;
 
+    req_params.unmod_ptr = &src_properties.mtime;
+    req_params.etag = src_properties.etag;
+    req_params.mod_zone_id = src_properties.zone_short_id;
+    req_params.mod_pg_ver = src_properties.pg_ver;
+
     if (range.is_set) {
       req_params.range_is_set = true;
       req_params.range_start = range.ofs;
@@ -91,8 +99,6 @@ public:
       const string& val = header.second;
       if (header.first == "RGWX_OBJECT_SIZE") {
         rest_obj.content_len = atoi(val.c_str());
-      } else if (header.first == "ETAG") {
-        etag = val;
       } else {
         rest_obj.attrs[header.first] = val;
       }
@@ -189,6 +195,8 @@ class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
   rgw_obj src_obj;
   rgw_obj dest_obj;
 
+  rgw_sync_aws_src_obj_properties src_properties;
+
   std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
   std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
 
@@ -196,18 +204,22 @@ public:
   RGWAWSStreamObjToCloudPlainCR(RGWDataSyncEnv *_sync_env,
                                 RGWRESTConn *_source_conn,
                                 const rgw_obj& _src_obj,
+                                const rgw_sync_aws_src_obj_properties& _src_properties,
                                 RGWRESTConn *_dest_conn,
                                 const rgw_obj& _dest_obj) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    source_conn(_source_conn),
                                                    dest_conn(_dest_conn),
                                                    src_obj(_src_obj),
-                                                   dest_obj(_dest_obj) {}
+                                                   dest_obj(_dest_obj),
+                                                   src_properties(_src_properties) {}
 
   int operate() {
     reenter(this) {
       /* init input */
-      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env,
+                                           source_conn, src_obj,
+                                           src_properties));
 
       /* init output */
       out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conn,
@@ -232,6 +244,8 @@ class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
   rgw_obj src_obj;
   rgw_obj dest_obj;
 
+  rgw_sync_aws_src_obj_properties src_properties;
+
   string upload_id;
 
   rgw_sync_aws_multipart_part_info part_info;
@@ -247,6 +261,7 @@ public:
                                 const rgw_obj& _src_obj,
                                 RGWRESTConn *_dest_conn,
                                 const rgw_obj& _dest_obj,
+                                const rgw_sync_aws_src_obj_properties& _src_properties,
                                 const string& _upload_id,
                                 const rgw_sync_aws_multipart_part_info& _part_info,
                                 string *_petag) : RGWCoroutine(_sync_env->cct),
@@ -255,6 +270,7 @@ public:
                                                    dest_conn(_dest_conn),
                                                    src_obj(_src_obj),
                                                    dest_obj(_dest_obj),
+                                                   src_properties(_src_properties),
                                                    upload_id(_upload_id),
                                                    part_info(_part_info),
                                                    petag(_petag) {}
@@ -262,7 +278,9 @@ public:
   int operate() {
     reenter(this) {
       /* init input */
-      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env, source_conn, src_obj));
+      in_crf.reset(new RGWRESTStreamGetCRF(cct, get_env(), this, sync_env,
+                                           source_conn, src_obj,
+                                           src_properties));
 
       in_crf->set_range(part_info.ofs, part_info.size);
 
@@ -334,7 +352,6 @@ class RGWAWSInitMultipartCR : public RGWCoroutine {
   rgw_obj dest_obj;
 
   uint64_t obj_size;
-  ceph::real_time mtime;
 
   bufferlist out_bl;
 
@@ -357,13 +374,11 @@ public:
                         RGWRESTConn *_dest_conn,
                         const rgw_obj& _dest_obj,
                         uint64_t _obj_size,
-                        const ceph::real_time& _mtime,
                         string *_upload_id) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    dest_conn(_dest_conn),
                                                    dest_obj(_dest_obj),
                                                    obj_size(_obj_size),
-                                                   mtime(_mtime),
                                                    upload_id(_upload_id) {}
 
   int operate() {
@@ -575,7 +590,8 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
   rgw_obj dest_obj;
 
   uint64_t obj_size;
-  ceph::real_time mtime;
+  string src_etag;
+  rgw_sync_aws_src_obj_properties src_properties;
 
   rgw_sync_aws_multipart_upload_info status;
 
@@ -592,14 +608,14 @@ public:
                                 RGWRESTConn *_dest_conn,
                                 const rgw_obj& _dest_obj,
                                 uint64_t _obj_size,
-                                const ceph::real_time& _mtime) : RGWCoroutine(_sync_env->cct),
+                                const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct),
                                                    sync_env(_sync_env),
                                                    source_conn(_source_conn),
                                                    dest_conn(_dest_conn),
                                                    src_obj(_src_obj),
                                                    dest_obj(_dest_obj),
                                                    obj_size(_obj_size),
-                                                   mtime(_mtime),
+                                                   src_properties(_src_properties),
                                                    status_obj(sync_env->store->get_zone_params().log_pool,
                                                               RGWBucketSyncStatusManager::obj_status_oid(sync_env->source_zone, src_obj)) {
   }
@@ -618,20 +634,21 @@ public:
       if (retcode >= 0) {
         /* check here that mtime and size did not change */
 
-        if (status.mtime != mtime || status.obj_size != obj_size) {
+        if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
+            status.src_properties.etag != src_properties.etag) {
           yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
           retcode = -ENOENT;
         }
       }
 
       if (retcode == -ENOENT) {
-        yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, status.mtime, &status.upload_id));
+        yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, &status.upload_id));
         if (retcode < 0) {
           return set_cr_error(retcode);
         }
 
         status.obj_size = obj_size;
-        status.mtime = mtime;
+        status.src_properties = src_properties;
 #warning flexible part size needed
         status.part_size = 5 * 1024 * 1024;
         status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
@@ -652,6 +669,7 @@ public:
           call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env,
                                                              source_conn, src_obj,
                                                              dest_conn, dest_obj,
+                                                             status.src_properties,
                                                              status.upload_id,
                                                              cur_part_info,
                                                              &cur_part_info.etag));
@@ -692,6 +710,27 @@ public:
     return 0;
   }
 };
+template <class T>
+int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result, T def_val)
+{
+  map<string, bufferlist>::iterator iter = attrs.find(attr_name);
+  if (iter == attrs.end()) {
+    *result = def_val;
+    return 0;
+  }
+  bufferlist& bl = iter->second;
+  if (bl.length() == 0) {
+    *result = def_val;
+    return 0;
+  }
+  bufferlist::iterator bliter = bl.begin();
+  try {
+    decode(*result, bliter);
+  } catch (buffer::error& err) {
+    return -EIO;
+  }
+  return 0;
+}
 
 // maybe use Fetch Remote Obj instead?
 class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
@@ -704,6 +743,9 @@ class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
   string obj_path;
   int ret{0};
 
+  uint32_t src_zone_short_id{0};
+  uint64_t src_pg_ver{0};
+
   static constexpr uint32_t multipart_threshold = 8 * 1024 * 1024;
 
 public:
@@ -719,10 +761,21 @@ public:
 
   int operate() override {
     reenter(this) {
-
+      ret = decode_attr(attrs, RGW_ATTR_PG_VER, &src_pg_ver, (uint64_t)0);
+      if (ret < 0) {
+        ldout(sync_env->cct, 0) << "ERROR: failed to decode pg ver attr, ignoring" << dendl;
+      } else {
+        ret = decode_attr(attrs, RGW_ATTR_SOURCE_ZONE, &src_zone_short_id, (uint32_t)0);
+        if (ret < 0) {
+          ldout(sync_env->cct, 0) << "ERROR: failed to decode source zone short_id attr, ignoring" << dendl;
+          src_pg_ver = 0; /* all or nothing */
+        }
+      }
       ldout(sync_env->cct, 0) << "AWS: download begin: z=" << sync_env->source_zone
                               << " b=" << bucket_info.bucket << " k=" << key << " size=" << size
-                              << " mtime=" << mtime << " attrs=" << attrs
+                              << " mtime=" << mtime << " etag=" << etag
+                              << " zone_short_id=" << src_zone_short_id << " pg_ver=" << src_pg_ver
+                              << " attrs=" << attrs
                               << dendl;
 
       source_conn = sync_env->store->get_zone_conn_by_id(sync_env->source_zone);
@@ -758,11 +811,20 @@ public:
                                                     uri resolution */
         rgw_obj dest_obj(target_bucket, aws_object_name(bucket_info, key));
 
+
+        rgw_sync_aws_src_obj_properties src_properties;
+        src_properties.mtime = mtime;
+        src_properties.etag = etag;
+        src_properties.zone_short_id = src_zone_short_id;
+        src_properties.pg_ver = src_pg_ver;
+
         if (size < multipart_threshold) {
-          call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, conf.conn.get(), dest_obj));
+          call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj,
+                                                 src_properties,
+                                                 conf.conn.get(), dest_obj));
         } else {
           call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, source_conn, src_obj, conf.conn.get(),
-                                                     dest_obj, size, mtime));
+                                                     dest_obj, size, src_properties));
         }
       }
       if (retcode < 0) {
index 5180da5f7cdf666e17c37a5014f28a6d0ad458a3..14749d514b1e05b6d91c829efb2ad2a6dd53debe 100644 (file)
@@ -29,10 +29,36 @@ struct rgw_sync_aws_multipart_part_info {
 };
 WRITE_CLASS_ENCODER(rgw_sync_aws_multipart_part_info)
 
+struct rgw_sync_aws_src_obj_properties {
+  ceph::real_time mtime;
+  string etag;
+  uint32_t zone_short_id{0};
+  uint64_t pg_ver{0};
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    encode(mtime, bl);
+    encode(etag, bl);
+    encode(zone_short_id, bl);
+    encode(pg_ver, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    decode(mtime, bl);
+    decode(etag, bl);
+    decode(zone_short_id, bl);
+    decode(pg_ver, bl);
+    DECODE_FINISH(bl);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_sync_aws_src_obj_properties)
+
 struct rgw_sync_aws_multipart_upload_info {
   string upload_id;
   uint64_t obj_size;
-  ceph::real_time mtime;
+  rgw_sync_aws_src_obj_properties src_properties;
   uint32_t part_size{0};
   uint32_t num_parts{0};
 
@@ -45,7 +71,7 @@ struct rgw_sync_aws_multipart_upload_info {
     ENCODE_START(1, 1, bl);
     encode(upload_id, bl);
     encode(obj_size, bl);
-    encode(mtime, bl);
+    encode(src_properties, bl);
     encode(part_size, bl);
     encode(num_parts, bl);
     encode(cur_part, bl);
@@ -58,7 +84,7 @@ struct rgw_sync_aws_multipart_upload_info {
     DECODE_START(1, bl);
     decode(upload_id, bl);
     decode(obj_size, bl);
-    decode(mtime, bl);
+    decode(src_properties, bl);
     decode(part_size, bl);
     decode(num_parts, bl);
     decode(cur_part, bl);