]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw: copy_obj uses AtomicObjectProcessor
authorCasey Bodley <cbodley@redhat.com>
Wed, 10 Oct 2018 19:54:37 +0000 (15:54 -0400)
committerCasey Bodley <cbodley@redhat.com>
Tue, 16 Oct 2018 15:06:14 +0000 (11:06 -0400)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index dbfb47c75d0fb86cde22467ce50fd07cf529cdbd..7bc52497a14c5b8894dc54f5b41d284d9068956d 100644 (file)
@@ -7687,7 +7687,7 @@ static void set_copy_attrs(map<string, bufferlist>& src_attrs,
   }
 }
 
-int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj)
+int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, const rgw_obj& obj)
 {
   map<string, bufferlist> attrset;
 
@@ -7710,9 +7710,7 @@ int RGWRados::rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj)
   attrset.erase(RGW_ATTR_TAIL_TAG);
 
   return copy_obj_data(rctx, dest_bucket_info, read_op, obj_size - 1, obj, NULL, mtime, attrset,
-                       0, real_time(),
-                       (obj.key.instance.empty() ? NULL : &(obj.key.instance)),
-                       NULL);
+                       0, real_time(), NULL);
 }
 
 struct obj_time_weight {
@@ -8355,10 +8353,15 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
     }
   }
 
+  if (version_id && !version_id->empty()) {
+    dest_obj.key.set_instance(*version_id);
+  } else if (dest_bucket_info.versioning_enabled()) {
+    gen_rand_obj_instance_name(&dest_obj);
+  }
+
   if (copy_data) { /* refcounting tail wouldn't work here, just copy the data */
     return copy_obj_data(obj_ctx, dest_bucket_info, read_op, obj_size - 1, dest_obj,
-                         mtime, real_time(), attrs, olh_epoch, delete_at,
-                         version_id, petag);
+                         mtime, real_time(), attrs, olh_epoch, delete_at, petag);
   }
 
   RGWObjManifest::obj_iterator miter = astate->manifest.obj_begin();
@@ -8373,12 +8376,6 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
     return ret;
   }
 
-  if (version_id && !version_id->empty()) {
-    dest_obj.key.set_instance(*version_id);
-  } else if (dest_bucket_info.versioning_enabled()) {
-    gen_rand_obj_instance_name(&dest_obj);
-  }
-
   bufferlist first_chunk;
 
   bool copy_itself = (dest_obj == src_obj);
@@ -8481,26 +8478,23 @@ done_ret:
 int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
                RGWBucketInfo& dest_bucket_info,
               RGWRados::Object::Read& read_op, off_t end,
-               rgw_obj& dest_obj,
+               const rgw_obj& dest_obj,
               real_time *mtime,
               real_time set_mtime,
                map<string, bufferlist>& attrs,
                uint64_t olh_epoch,
               real_time delete_at,
-               string *version_id,
                string *petag)
 {
   string tag;
   append_rand_alpha(cct, tag, tag, 32);
 
-  RGWPutObjProcessor_Atomic processor(obj_ctx,
-                                      dest_bucket_info, dest_obj.bucket, dest_obj.key.name,
-                                      cct->_conf->rgw_obj_stripe_size, tag, dest_bucket_info.versioning_enabled());
-  if (version_id) {
-    processor.set_version_id(*version_id);
-  }
-  processor.set_olh_epoch(olh_epoch);
-  int ret = processor.prepare(this, NULL);
+  using namespace rgw::putobj;
+  AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
+  AtomicObjectProcessor processor(&aio, this, dest_bucket_info,
+                                  dest_bucket_info.owner, obj_ctx,
+                                  dest_obj, olh_epoch, tag);
+  int ret = processor.prepare();
   if (ret < 0)
     return ret;
 
@@ -8515,24 +8509,20 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
     }
 
     uint64_t read_len = ret;
-    bool again;
-
-    do {
-      void *handle;
-      rgw_raw_obj obj;
-
-      ret = processor.handle_data(bl, ofs, &handle, &obj, &again);
-      if (ret < 0) {
-        return ret;
-      }
-      ret = processor.throttle_data(handle, obj, read_len, false);
-      if (ret < 0)
-        return ret;
-    } while (again);
+    ret = processor.process(std::move(bl), ofs);
+    if (ret < 0) {
+      return ret;
+    }
 
     ofs += read_len;
   } while (ofs <= end);
 
+  // flush
+  ret = processor.process({}, ofs);
+  if (ret < 0) {
+    return ret;
+  }
+
   string etag;
   auto iter = attrs.find(RGW_ATTR_ETAG);
   if (iter != attrs.end()) {
@@ -8556,7 +8546,8 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx,
     accounted_size = compressed ? cs_info.orig_size : ofs;
   }
 
-  return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at);
+  return processor.complete(accounted_size, etag, mtime, set_mtime, attrs, delete_at,
+                            nullptr, nullptr, nullptr, nullptr, nullptr);
 }
 
 bool RGWRados::is_meta_master()
index 7714914dabbdf0425f01298a414e2305bd14fef6..641fc9a79bed503f1611b3ed71dc29f4e78b6a86 100644 (file)
@@ -3027,7 +3027,7 @@ public:
     ATTRSMOD_MERGE   = 2
   };
 
-  int rewrite_obj(RGWBucketInfo& dest_bucket_info, rgw_obj& obj);
+  int rewrite_obj(RGWBucketInfo& dest_bucket_info, const rgw_obj& obj);
 
   int stat_remote_obj(RGWObjectCtx& obj_ctx,
                const rgw_user& user_id,
@@ -3118,13 +3118,12 @@ public:
   int copy_obj_data(RGWObjectCtx& obj_ctx,
                RGWBucketInfo& dest_bucket_info,
               RGWRados::Object::Read& read_op, off_t end,
-               rgw_obj& dest_obj,
+               const rgw_obj& dest_obj,
               ceph::real_time *mtime,
               ceph::real_time set_mtime,
                map<string, bufferlist>& attrs,
                uint64_t olh_epoch,
               ceph::real_time delete_at,
-               string *version_id,
                string *petag);
   
   int check_bucket_empty(RGWBucketInfo& bucket_info);