]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: fetch_remote_obj() supports placement rule
authorYehuda Sadeh <yehuda@redhat.com>
Fri, 4 Jan 2019 21:19:42 +0000 (13:19 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 4 Jan 2019 21:19:42 +0000 (13:19 -0800)
placement rule is optional, and if not provided will use the storage_class of
the source object that is being fetched (if possible).

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_putobj_processor.h
src/rgw/rgw_rados.cc
src/rgw/rgw_rados.h

index e2e343832b7b7f9a8f952cb8af48adf680165dd9..b83ad4a62990c0fe32ca2a71ad74e42b7ef603c5 100644 (file)
@@ -592,6 +592,7 @@ int RGWAsyncFetchRemoteObj::_send_request()
                        src_obj,
                        bucket_info, /* dest */
                        bucket_info, /* source */
+                      dest_placement_rule,
                        NULL, /* real_time* src_mtime, */
                        NULL, /* real_time* mtime, */
                        NULL, /* const real_time* mod_ptr, */
index 9a8024aa272a74db74320b253ee51f8fee196b9b..5740b6846d47ddee050dd8adb5b7a9af32254c56 100644 (file)
@@ -851,6 +851,7 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
   string source_zone;
 
   RGWBucketInfo bucket_info;
+  std::optional<rgw_placement_rule> dest_placement_rule;
 
   rgw_obj_key key;
   std::optional<uint64_t> versioned_epoch;
@@ -866,11 +867,13 @@ public:
   RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, RGWRados *_store,
                          const string& _source_zone,
                          RGWBucketInfo& _bucket_info,
+                        std::optional<rgw_placement_rule> _dest_placement_rule,
                          const rgw_obj_key& _key,
                          std::optional<uint64_t> _versioned_epoch,
                          bool _if_newer, rgw_zone_set *_zones_trace) : RGWAsyncRadosRequest(caller, cn), store(_store),
                                                       source_zone(_source_zone),
                                                       bucket_info(_bucket_info),
+                                                     dest_placement_rule(_dest_placement_rule),
                                                       key(_key),
                                                       versioned_epoch(_versioned_epoch),
                                                       copy_if_newer(_if_newer)
@@ -888,6 +891,7 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
   string source_zone;
 
   RGWBucketInfo bucket_info;
+  std::optional<rgw_placement_rule> dest_placement_rule;
 
   rgw_obj_key key;
   std::optional<uint64_t> versioned_epoch;
@@ -903,12 +907,14 @@ public:
   RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
                       const string& _source_zone,
                       RGWBucketInfo& _bucket_info,
+                     std::optional<rgw_placement_rule> _dest_placement_rule,
                       const rgw_obj_key& _key,
                       std::optional<uint64_t> _versioned_epoch,
                       bool _if_newer, rgw_zone_set *_zones_trace) : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
                                        async_rados(_async_rados), store(_store),
                                        source_zone(_source_zone),
                                        bucket_info(_bucket_info),
+                                      dest_placement_rule(_dest_placement_rule),
                                        key(_key),
                                        versioned_epoch(_versioned_epoch),
                                        copy_if_newer(_if_newer), req(NULL), zones_trace(_zones_trace) {}
@@ -926,7 +932,8 @@ public:
   }
 
   int send_request() override {
-    req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store, source_zone, bucket_info,
+    req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
+                                    source_zone, bucket_info, dest_placement_rule,
                                      key, versioned_epoch, copy_if_newer, zones_trace);
     async_rados->queue(req);
     return 0;
index 5a59a476cbe7653cd0db9b2f32dc6f06b4aa74c2..0e534cca684cc0dff842b1785f266242fe76fdab 100644 (file)
@@ -1666,6 +1666,7 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl
 RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
 {
   return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
+                                std::nullopt,
                                  key, versioned_epoch,
                                  true, zones_trace);
 }
index 851d31c311b2e1be8df07f49448d4fcb5663f1f2..17bf5150abfa7e1256e22aa1292882e426fe6a15 100644 (file)
@@ -144,6 +144,10 @@ class ManifestObjectProcessor : public HeadObjectProcessor,
           tail_placement_rule = *ptail_placement_rule;
         }
       }
+
+  void set_tail_placement(const rgw_placement_rule&& tpr) {
+    tail_placement_rule = tpr;
+  }
 };
 
 
index a69c950ba7129a1b06e0f91cbe516e98779265a5..68f5ede2f9f832ae79051f7e87bc7af9fc92df12 100644 (file)
@@ -3828,18 +3828,21 @@ class RGWRadosPutObj : public RGWHTTPStreamRWRequest::ReceiveCB
   void (*progress_cb)(off_t, void *);
   void *progress_data;
   bufferlist extra_data_bl;
-  uint64_t extra_data_left;
-  uint64_t data_len;
+  uint64_t extra_data_left{0};
+  bool need_to_process_attrs{false};
+  uint64_t data_len{0};
   map<string, bufferlist> src_attrs;
   uint64_t ofs{0};
   uint64_t lofs{0}; /* logical ofs */
+  std::function<int(const map<string, bufferlist>&)> attrs_handler;
 public:
   RGWRadosPutObj(CephContext* cct,
                  CompressorRef& plugin,
                  boost::optional<RGWPutObj_Compress>& compressor,
                  rgw::putobj::ObjectProcessor *p,
                  void (*_progress_cb)(off_t, void *),
-                 void *_progress_data) :
+                 void *_progress_data,
+                 std::function<int(const map<string, bufferlist>&)> _attrs_handler) :
                        cct(cct),
                        filter(p),
                        compressor(compressor),
@@ -3847,8 +3850,7 @@ public:
                        processor(p),
                        progress_cb(_progress_cb),
                        progress_data(_progress_data),
-                       extra_data_left(0),
-                       data_len(0) {}
+                       attrs_handler(_attrs_handler) {}
 
   int process_attrs(void) {
     if (extra_data_bl.length()) {
@@ -3864,6 +3866,11 @@ public:
       src_attrs.erase(RGW_ATTR_MANIFEST); // not interested in original object layout
     }
 
+    int ret = attrs_handler(src_attrs);
+    if (ret < 0) {
+      return ret;
+    }
+
     if (plugin && src_attrs.find(RGW_ATTR_CRYPT_MODE) == src_attrs.end()) {
       //do not compress if object is encrypted
       compressor = boost::in_place(cct, plugin, filter);
@@ -3874,6 +3881,7 @@ public:
       buffering = boost::in_place(&*compressor, buffer_size);
       filter = &*buffering;
     }
+
     return 0;
   }
 
@@ -3900,6 +3908,17 @@ public:
       if (bl.length() == 0) {
         return 0;
       }
+    } else if (need_to_process_attrs) {
+      /* need to call process_attrs() even if we don't get any attrs,
+       * need it to call attrs_handler(). At the moment this
+       * will never happenas all callers will have extra_data_len > 0, but need
+       * to have it for sake of completeness.
+       */
+      int res = process_attrs();
+      if (res < 0) {
+        return res;
+      }
+      need_to_process_attrs = false;
     }
 
     ceph_assert(uint64_t(ofs) >= extra_data_len);
@@ -3923,6 +3942,9 @@ public:
 
   void set_extra_data_len(uint64_t len) override {
     extra_data_left = len;
+    if (len == 0) {
+      need_to_process_attrs = true;
+    }
     RGWHTTPStreamRWRequest::ReceiveCB::set_extra_data_len(len);
   }
 
@@ -4206,6 +4228,7 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
                const rgw_obj& src_obj,
                RGWBucketInfo& dest_bucket_info,
                RGWBucketInfo& src_bucket_info,
+               std::optional<rgw_placement_rule> dest_placement_rule,
                real_time *src_mtime,
                real_time *mtime,
                const real_time *mod_ptr,
@@ -4233,18 +4256,13 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
   append_rand_alpha(cct, tag, tag, 32);
   obj_time_weight set_mtime_weight;
   set_mtime_weight.high_precision = high_precision_time;
+  int ret;
 
   rgw::AioThrottle aio(cct->_conf->rgw_put_obj_min_window_size);
   using namespace rgw::putobj;
-  rgw_placement_rule *ptail_rule{nullptr};
-#warning FIXME ptail_rule
+  const rgw_placement_rule *ptail_rule = (dest_placement_rule ? &(*dest_placement_rule) : nullptr);
   AtomicObjectProcessor processor(&aio, this, dest_bucket_info, ptail_rule, user_id,
                                   obj_ctx, dest_obj, olh_epoch, tag);
-  int ret = processor.prepare();
-  if (ret < 0) {
-    return ret;
-  }
-
   RGWRESTConn *conn;
   auto& zone_conn_map = svc.zone->get_zone_conn_map();
   auto& zonegroup_conn_map = svc.zone->get_zonegroup_conn_map();
@@ -4284,7 +4302,24 @@ int RGWRados::fetch_remote_obj(RGWObjectCtx& obj_ctx,
     }
   }
 
-  RGWRadosPutObj cb(cct, plugin, compressor, &processor, progress_cb, progress_data);
+  RGWRadosPutObj cb(cct, plugin, compressor, &processor, progress_cb, progress_data,
+                    [&](const map<string, bufferlist>& obj_attrs) {
+                      if (!ptail_rule) {
+                        auto iter = obj_attrs.find(RGW_ATTR_STORAGE_CLASS);
+                        if (iter != obj_attrs.end()) {
+                          rgw_placement_rule dest_rule;
+                          dest_rule.storage_class = iter->second.to_str();
+                          dest_rule.inherit_from(dest_bucket_info.placement_rule);
+                          processor.set_tail_placement(std::move(dest_rule));
+                        }
+                      }
+
+                      int ret = processor.prepare();
+                      if (ret < 0) {
+                        return ret;
+                      }
+                      return 0;
+                    });
 
   string etag;
   real_time set_mtime;
@@ -4538,7 +4573,8 @@ int RGWRados::copy_obj(RGWObjectCtx& obj_ctx,
 
   if (remote_src || !source_zone.empty()) {
     return fetch_remote_obj(obj_ctx, user_id, info, source_zone,
-               dest_obj, src_obj, dest_bucket_info, src_bucket_info, src_mtime, mtime, mod_ptr,
+               dest_obj, src_obj, dest_bucket_info, src_bucket_info,
+               dest_placement, src_mtime, mtime, mod_ptr,
                unmod_ptr, high_precision_time,
                if_match, if_nomatch, attrs_mod, copy_if_newer, attrs, category,
                olh_epoch, delete_at, ptag, petag, progress_cb, progress_data);
index d0988ca59a720e15368232d8c95383d484139adf..23d0f96f51bc2bd474b41175a6dd97397397a430 100644 (file)
@@ -1902,6 +1902,7 @@ public:
                        const rgw_obj& src_obj,
                        RGWBucketInfo& dest_bucket_info,
                        RGWBucketInfo& src_bucket_info,
+                      std::optional<rgw_placement_rule> dest_placement,
                        ceph::real_time *src_mtime,
                        ceph::real_time *mtime,
                        const ceph::real_time *mod_ptr,