]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: sync: user mode sync pipes fetch objects using effective uid
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 20 Nov 2019 04:01:57 +0000 (20:01 -0800)
committerYehuda Sadeh <yehuda@redhat.com>
Tue, 28 Jan 2020 18:20:39 +0000 (10:20 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket_sync.cc
src/rgw/rgw_bucket_sync.h
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_cr_rados.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_json_enc.cc
src/rgw/rgw_sync_policy.h

index 7c68f405f8b13f161d7d3429ea0b6681ff0721cb..acfd7974152924d8baabfd732d417b88de46cff1 100644 (file)
@@ -2864,6 +2864,8 @@ class SyncPolicyContext
 
   rgw_sync_policy_info *policy{nullptr};
 
+  std::optional<rgw_user> owner;
+
 public:
   SyncPolicyContext(const string& zonegroup_id,
                     const string& zonegroup_name,
@@ -2888,6 +2890,8 @@ public:
       return ret;
     }
 
+    owner = bucket_info.owner;
+
     if (!bucket_info.sync_policy) {
       rgw_sync_policy_info new_policy;
       bucket_info.set_sync_policy(std::move(new_policy));
@@ -2921,6 +2925,9 @@ public:
     return *policy;
   }
 
+  std::optional<rgw_user>& get_owner() {
+    return owner;
+  }
 };
 
 void resolve_zone_id_opt(std::optional<string>& zone_name, std::optional<rgw_zone_id>& zone_id)
@@ -3203,6 +3210,7 @@ int main(int argc, const char **argv)
   std::optional<string> opt_prefix_rm;
 
   std::optional<int> opt_priority;
+  std::optional<string> opt_mode;
 
   rgw::notify::EventTypeList event_types;
 
@@ -3610,6 +3618,8 @@ int main(int argc, const char **argv)
       opt_prefix_rm = val;
     } else if (ceph_argparse_witharg(args, i, &val, "--priority", (char*)NULL)) {
       opt_priority = atoi(val.c_str());
+    } else if (ceph_argparse_witharg(args, i, &val, "--mode", (char*)NULL)) {
+      opt_mode = val;
     } else if (ceph_argparse_binary_flag(args, i, &detail, NULL, "--detail", (char*)NULL)) {
       // do nothing
     } else if (strncmp(*i, "-", 1) == 0) {
@@ -8283,6 +8293,25 @@ next:
     if (opt_priority) {
       pipe->params.priority = *opt_priority;
     }
+    if (opt_mode) {
+      if (*opt_mode == "system") {
+        pipe->params.mode = rgw_sync_pipe_params::MODE_SYSTEM;
+      } else if (*opt_mode == "user") {
+        pipe->params.mode = rgw_sync_pipe_params::MODE_USER;
+      } else {
+        cerr << "ERROR: bad mode value: should be one of the following: system, user" << std::endl;
+        return EINVAL;
+      }
+    }
+
+    if (!user_id.empty()) {
+      pipe->params.user = user_id;
+    } else if (pipe->params.user.empty()) {
+      auto owner = sync_policy_ctx.get_owner();
+      if (owner) {
+        pipe->params.user = *owner;
+      }
+    }
 
     ret = sync_policy_ctx.write_policy();
     if (ret < 0) {
index a76fd43b7838e3fafdd63dd6d19cbfd138f24390..3a57f546a73f85a4beab4e9f70197d462a77135a 100644 (file)
@@ -289,6 +289,7 @@ void RGWBucketSyncFlowManager::pipe_rules::insert(const rgw_sync_bucket_pipe& pi
 }
 
 bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rgw_obj_key& key,
+                                                                        std::optional<rgw_user> *user,
                                                                         std::optional<rgw_user> *acl_translation_owner,
                                                                         std::optional<string> *storage_class,
                                                                         rgw_sync_pipe_params::Mode *mode,
@@ -345,6 +346,7 @@ bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rg
 
   bool conflict = false;
 
+  std::optional<rgw_user> _user;
   std::optional<rgw_sync_pipe_acl_translation> _acl_translation;
   std::optional<string> _storage_class;
   rgw_sync_pipe_params::Mode _mode;
@@ -353,13 +355,15 @@ bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rg
   for (auto& iter : iters) {
     auto& rule_params = iter->second->params;
     if (++i == 0) {
+      _user = rule_params.user;
       _acl_translation = rule_params.dest.acl_translation;
       _storage_class = rule_params.dest.storage_class;
       _mode = rule_params.mode;
       continue;
     }
 
-    conflict = !(_acl_translation == rule_params.dest.acl_translation &&
+    conflict = !(_user == rule_params.user &&
+                 _acl_translation == rule_params.dest.acl_translation &&
                  _storage_class == rule_params.dest.storage_class &&
                  _mode == rule_params.mode);
     if (conflict) {
@@ -368,6 +372,7 @@ bool RGWBucketSyncFlowManager::pipe_rules::find_basic_info_without_tags(const rg
     }
   }
 
+  *user = _user;
   if (_acl_translation) {
     *acl_translation_owner = _acl_translation->owner;
   }
@@ -668,6 +673,15 @@ RGWBucketSyncPolicyHandler::RGWBucketSyncPolicyHandler(const RGWBucketSyncPolicy
                                                                                             bucket_info(_bucket_info) {
   if (_bucket_info.sync_policy) {
     sync_policy = *_bucket_info.sync_policy;
+
+    for (auto& entry : sync_policy.groups) {
+      for (auto& pipe : entry.second.pipes) {
+        if (pipe.params.mode == rgw_sync_pipe_params::MODE_USER &&
+            pipe.params.user.empty()) {
+          pipe.params.user = _bucket_info.owner;
+        }
+      }
+    }
   }
   bucket = _bucket_info.bucket;
   zone_svc = parent->zone_svc;
index b6fa246877fae72a309f50c3c97b40f69aab7db4..a57c461de85435594a531dfd2216862683261b9e 100644 (file)
@@ -147,6 +147,7 @@ public:
     void insert(const rgw_sync_bucket_pipe& pipe);
 
     bool find_basic_info_without_tags(const rgw_obj_key& key,
+                                      std::optional<rgw_user> *user,
                                       std::optional<rgw_user> *acl_translation,
                                       std::optional<string> *storage_class,
                                       rgw_sync_pipe_params::Mode *mode,
@@ -183,6 +184,7 @@ public:
     }
     
     bool find_basic_info_without_tags(const rgw_obj_key& key,
+                                      std::optional<rgw_user> *user,
                                       std::optional<rgw_user> *acl_translation,
                                       std::optional<string> *storage_class,
                                       rgw_sync_pipe_params::Mode *mode,
@@ -190,7 +192,7 @@ public:
       if (!rules) {
         return false;
       }
-      return rules->find_basic_info_without_tags(key, acl_translation, storage_class, mode, need_more_info);
+      return rules->find_basic_info_without_tags(key, user, acl_translation, storage_class, mode, need_more_info);
     }
 
     bool find_obj_params(const rgw_obj_key& key,
index 5ad6bf287ff35d97e1d4cb266592f42d38bc6f13..93748ef24569a757aff4e89a6122e19b35d60030 100644 (file)
@@ -584,7 +584,6 @@ int RGWAsyncFetchRemoteObj::_send_request()
 {
   RGWObjectCtx obj_ctx(store);
 
-  string user_id;
   char buf[16];
   snprintf(buf, sizeof(buf), ".%lld", (long long)store->getRados()->instance_id());
   map<string, bufferlist> attrs;
@@ -595,7 +594,7 @@ int RGWAsyncFetchRemoteObj::_send_request()
 
   std::optional<uint64_t> bytes_transferred;
   int r = store->getRados()->fetch_remote_obj(obj_ctx,
-                       rgw_user(user_id),
+                       user_id.value_or(rgw_user()),
                        NULL, /* req_info */
                        source_zone,
                        dest_obj,
index f5c8f67b2b8ee07a3e01f3ba6053a1f315ffbc2b..289ff5464863daae6495b556cee17e195451c021 100644 (file)
@@ -920,6 +920,8 @@ class RGWAsyncFetchRemoteObj : public RGWAsyncRadosRequest {
   rgw::sal::RGWRadosStore *store;
   rgw_zone_id source_zone;
 
+  std::optional<rgw_user> user_id;
+
   rgw_bucket src_bucket;
   std::optional<rgw_placement_rule> dest_placement_rule;
   RGWBucketInfo dest_bucket_info;
@@ -941,6 +943,7 @@ protected:
 public:
   RGWAsyncFetchRemoteObj(RGWCoroutine *caller, RGWAioCompletionNotifier *cn, rgw::sal::RGWRadosStore *_store,
                          const rgw_zone_id& _source_zone,
+                         std::optional<rgw_user>& _user_id,
                          const rgw_bucket& _src_bucket,
                         std::optional<rgw_placement_rule> _dest_placement_rule,
                          const RGWBucketInfo& _dest_bucket_info,
@@ -953,6 +956,7 @@ public:
                          PerfCounters* counters, const DoutPrefixProvider *dpp)
     : RGWAsyncRadosRequest(caller, cn), store(_store),
       source_zone(_source_zone),
+      user_id(_user_id),
       src_bucket(_src_bucket),
       dest_placement_rule(_dest_placement_rule),
       dest_bucket_info(_dest_bucket_info),
@@ -976,6 +980,8 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
   rgw::sal::RGWRadosStore *store;
   rgw_zone_id source_zone;
 
+  std::optional<rgw_user> user_id;
+
   rgw_bucket src_bucket;
   std::optional<rgw_placement_rule> dest_placement_rule;
   RGWBucketInfo dest_bucket_info;
@@ -998,6 +1004,7 @@ class RGWFetchRemoteObjCR : public RGWSimpleCoroutine {
 public:
   RGWFetchRemoteObjCR(RGWAsyncRadosProcessor *_async_rados, rgw::sal::RGWRadosStore *_store,
                       const rgw_zone_id& _source_zone,
+                      std::optional<rgw_user> _user_id,
                       const rgw_bucket& _src_bucket,
                      std::optional<rgw_placement_rule> _dest_placement_rule,
                       const RGWBucketInfo& _dest_bucket_info,
@@ -1011,6 +1018,7 @@ public:
     : RGWSimpleCoroutine(_store->ctx()), cct(_store->ctx()),
       async_rados(_async_rados), store(_store),
       source_zone(_source_zone),
+      user_id(_user_id),
       src_bucket(_src_bucket),
       dest_placement_rule(_dest_placement_rule),
       dest_bucket_info(_dest_bucket_info),
@@ -1036,7 +1044,7 @@ public:
 
   int send_request() override {
     req = new RGWAsyncFetchRemoteObj(this, stack->create_completion_notifier(), store,
-                                    source_zone, src_bucket, dest_placement_rule, dest_bucket_info,
+                                    source_zone, user_id, src_bucket, dest_placement_rule, dest_bucket_info,
                                      key, dest_key, versioned_epoch, copy_if_newer, filter,
                                      zones_trace, counters, dpp);
     async_rados->queue(req);
index 7e1b7494af98d2823a8467e141a86d9cae57fa3d..101b6199741e27106936fb084b8b39a2cafcc71d 100644 (file)
@@ -1974,6 +1974,10 @@ int RGWDefaultSyncModule::create_instance(CephContext *cct, const JSONFormattabl
 class RGWFetchObjFilter_Sync : public RGWFetchObjFilter_Default {
   rgw_bucket_sync_pipe sync_pipe;
 
+  std::optional<ceph::real_time> mtime;
+  std::optional<string> etag;
+  std::optional<uint64_t> obj_size;
+
 public:
   RGWFetchObjFilter_Sync(rgw_bucket_sync_pipe& _sync_pipe) : sync_pipe(_sync_pipe) {
   }
@@ -2018,6 +2022,7 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct,
   std::optional<std::map<string, bufferlist> > new_attrs;
 
   if (params.mode == rgw_sync_pipe_params::MODE_USER) {
+#if 0
     RGWAccessControlPolicy policy;
     auto iter = obj_attrs.find(RGW_ATTR_ACL);
     if (iter == obj_attrs.end()) {
@@ -2031,7 +2036,7 @@ int RGWFetchObjFilter_Sync::filter(CephContext *cct,
       ldout(cct, 0) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode policy " << dendl;
       return abort_err;
     }
-
+#endif
     
   }
 
@@ -2061,6 +2066,7 @@ class RGWObjFetchCR : public RGWCoroutine {
   rgw_zone_set *zones_trace;
 
   bool need_more_info{false};
+  bool check_change{false};
 
   ceph::real_time src_mtime;
   uint64_t src_size;
@@ -2068,6 +2074,7 @@ class RGWObjFetchCR : public RGWCoroutine {
   map<string, bufferlist> src_attrs;
   map<string, string> src_headers;
 
+  std::optional<rgw_user> param_user;
   std::optional<rgw_user> param_acl_translation;
   std::optional<string> param_storage_class;
   rgw_sync_pipe_params::Mode param_mode;
@@ -2089,6 +2096,7 @@ public:
 
       {
         if (!sync_pipe.info.handler.find_basic_info_without_tags(key,
+                                                                 &param_user,
                                                                  &param_acl_translation,
                                                                  &param_storage_class,
                                                                  &param_mode,
@@ -2137,12 +2145,25 @@ public:
                                                     &params)) {
           return set_cr_error(-ERR_PRECONDITION_FAILED);
         }
+
+        param_user = params.user;
+        if (params.dest.acl_translation) {
+          param_acl_translation = params.dest.acl_translation->owner;
+        }
+        param_storage_class = params.dest.storage_class;
+        param_mode = params.mode;
       }
 
       yield {
         auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
 
+        std::optional<rgw_user> uid;
+        if (param_mode == rgw_sync_pipe_params::MODE_USER) {
+          uid = param_user;
+        }
+#warning FIXME: race guard
         call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+                                     uid,
                                      sync_pipe.info.source_bs.bucket,
                                      std::nullopt, sync_pipe.dest_bucket_info,
                                      key, std::nullopt, versioned_epoch,
@@ -2245,7 +2266,7 @@ RGWCoroutine *RGWArchiveDataSyncModule::sync_object(RGWDataSyncCtx *sc, rgw_buck
 
   auto filter = make_shared<RGWFetchObjFilter_Sync>(sync_pipe);
 
-  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone,
+  return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sc->source_zone, nullopt,
                                  sync_pipe.info.source_bs.bucket, std::nullopt, sync_pipe.dest_bucket_info,
                                  key, dest_key, versioned_epoch,
                                  true,
index 347925f902d7b05d69dfe8023285ce83f7e4a0f5..f0aa4b71de5d545146d21e202103e4fddc0d470d 100644 (file)
@@ -956,6 +956,7 @@ void rgw_sync_pipe_params::dump(Formatter *f) const
       s = "user";
   }
   encode_json("mode", s, f);
+  encode_json("user", user, f);
 }
 
 void rgw_sync_pipe_params::decode_json(JSONObj *obj)
@@ -970,6 +971,7 @@ void rgw_sync_pipe_params::decode_json(JSONObj *obj)
   } else {
     mode = MODE_USER;
   }
+  JSONDecoder::decode_json("user", user, obj);
 }
 
 
index a4d2fb005a59fd3be06a1eec40611ffc1db071a8..210544cc25459c1a9322ca2f943f01a702d0e728 100644 (file)
@@ -321,13 +321,15 @@ struct rgw_sync_pipe_params {
     MODE_USER = 1,
   } mode{MODE_SYSTEM};
   int32_t priority{0};
+  rgw_user user;
 
   void encode(bufferlist& bl) const {
-    ENCODE_START(1, 1, bl);
+    ENCODE_START(2, 1, bl);
     encode(source, bl);
     encode(dest, bl);
     encode(priority, bl);
     encode((uint8_t)mode, bl);
+    encode(user, bl);
     ENCODE_FINISH(bl);
   }
 
@@ -339,6 +341,9 @@ struct rgw_sync_pipe_params {
     uint8_t m;
     decode(m, bl);
     mode = (Mode)m;
+    if (struct_v >= 2) {
+      decode(user, bl);
+    }
     DECODE_FINISH(bl);
   }