]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: aws sync: new config structure, support multiple connections
author Yehuda Sadeh <yehuda@redhat.com>
Fri, 12 Jan 2018 22:57:24 +0000 (14:57 -0800)
committer Yehuda Sadeh <yehuda@redhat.com>
Thu, 12 Apr 2018 22:38:38 +0000 (15:38 -0700)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/common/ceph_json.cc
src/common/ceph_json.h
src/rgw/rgw_sync_module_aws.cc

index 89252b272b6da098361327092aa761abbc413fa9..803a97a6cd3ddbb2487553e7f04323408b9464c2 100644 (file)
@@ -815,6 +815,15 @@ int JSONFormattable::erase(const string& name)
   return 0;
 }
 
+void JSONFormattable::derive_from(const JSONFormattable& parent)  // inherit parent's obj entries that this object lacks
+{
+  for (auto& o : parent.obj) {
+    if (obj.find(o.first) == obj.end()) {  // a key already present here wins over the parent's value
+      obj[o.first] = o.second;
+    }
+  }
+}
+
 void encode_json(const char *name, const JSONFormattable& v, Formatter *f)
 {
   switch (v.type) {
index 308abe867cf8f94119d601257c4cd900c18d1f2f..cbd6e1bf2266ea968bd943238a3f4a34b4d6bf87 100644 (file)
@@ -586,6 +586,8 @@ struct JSONFormattable {
 
   int set(const string& name, const string& val);
   int erase(const string& name);
+
+  void derive_from(const JSONFormattable& jf);
 };
 WRITE_CLASS_ENCODER(JSONFormattable)
 
index c027ef7c79bf635636d1a0c36713c935cdd9b6af..396d49950e14aa525500188b13f8322f7c2531ea 100644 (file)
@@ -43,23 +43,189 @@ static string obj_to_aws_path(const rgw_obj& obj)
   return obj.bucket.name + "/" + obj.key.name;
 }
 
-struct AWSSyncConfig {
-  string s3_endpoint;
-  string bucket_suffix;
+/*
+
+   json configuration definition:
+
+    {
+      "default": {
+        "connection": {
+            "access_key": <access>,
+            "secret": <secret>,
+            "endpoint": <endpoint>,
+            "host_style": <path | virtual>
+        },
+        "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
+                                               # final object name will be target_path + "/" + obj
+      },
+      "connections": [
+          {
+            "connection_id": <id>,
+            "access_key": <access>,
+            "secret": <secret>,
+            "endpoint": <endpoint>,
+          } ... ],
+      "targets": [
+          {
+         "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
+         "target_path": <dest>,   # (override default)
+         "connection_id": <connection_id> # (override default)
+          } ... ],
+      "acl_mapping": [    # list of source uids and how they map into destination uids in the dest objects acls
+      {
+         "source_id": <id>,
+         "dest_id": <id>
+      } ...
+      ]
+    }
+
+*/
+
+struct AWSSyncConfig_Connection {
+  string connection_id;
+  string endpoint;
   RGWAccessKey key;
-  HostStyle host_style;
+  HostStyle host_style{PathStyle};
+
+  void init(const JSONFormattable& config) {
+    connection_id = config["connectionn_id"];
+    endpoint = config["endpoint"];
 
+    key = RGWAccessKey(config["access_key"], config["secret"]);
+    string host_style_str = config["host_style"];
+    if (host_style_str != "virtual") {
+      host_style = PathStyle;
+    } else {
+      host_style = VirtualStyle;
+    }
+  }
+};
+
+static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
+{
+  string sval;
+  if (config.find(key, &sval)) {
+    string err;
+    uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
+    if (!err.empty()) {
+      ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
+      return -EINVAL;
+    }
+    *pval = val;
+  }
+  return 0;
+}
 
+struct AWSSyncConfig_S3 {  // multipart tunables; AWSSyncConfig::init feeds it the "s3" config section
   uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
   uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
+
+  int init(CephContext *cct, const JSONFormattable& config) {  // returns 0, or the negative error from conf_to_uint64
+    int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
+    if (r < 0) {
+      return r;
+    }
+
+    r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size);
+    if (r < 0) {
+      return r;
+    }
+#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
+    if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {  // a too-small value is raised to the floor, not rejected
+      multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+    }
+    return 0;
+  }
+};
+
+struct AWSSyncConfig_Default {  // holds the connection parsed from the "default" config section
+  std::shared_ptr<AWSSyncConfig_Connection> conn;  // null until init() runs; AWSSyncInstanceEnv::init checks for that
+
+  void init(const JSONFormattable& config) {
+    conn = make_shared<AWSSyncConfig_Connection>();
+    conn->init(config["connection"]);  // the connection settings live in the nested "connection" object
+  }
+};
+
+struct AWSSyncConfig {
+  AWSSyncConfig_Default default_conf;
+
+  map<string, AWSSyncConfig_Connection> connections;
+
+  string bucket_suffix;
+
+  AWSSyncConfig_S3 s3;
+
+  AWSSyncConfig() {}
+  AWSSyncConfig(const AWSSyncConfig& c) : default_conf(c.default_conf),
+                                          connections(c.connections),
+                                          bucket_suffix(c.bucket_suffix),
+                                          s3(c.s3) {}
+
+  int init(CephContext *cct, const JSONFormattable& config) {
+    if (config.exists("default")) {
+      default_conf.init(config["default"]);
+    }
+
+    auto& default_conn = config["default"]["connection"];
+
+    for (auto conn : config["connections"].array()) {
+      auto new_conn = conn;
+      new_conn.derive_from(default_conn);
+
+      connections[new_conn["connection_id"]].init(new_conn);
+    }
+
+    bucket_suffix = config["bucket_suffix"];
+
+    int r = s3.init(cct, config["s3"]);
+    if (r < 0) {
+      return r;
+    }
+
+    return 0;
+  }
 };
 
 struct AWSSyncInstanceEnv {
   AWSSyncConfig conf;
   string id;
-  std::unique_ptr<RGWRESTConn> conn;
+  std::unique_ptr<RGWRESTConn> default_conn;
+
+  map<string, std::unique_ptr<RGWRESTConn> > connections;
 
   AWSSyncInstanceEnv(const AWSSyncConfig& _conf) : conf(_conf) {}
+
+  void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {
+    char buf[32];
+    snprintf(buf, sizeof(buf), "s3-%llu", (unsigned long long)instance_id);
+    id = buf;
+    auto& conn = conf.default_conf.conn;
+    if (conn) {
+      default_conn.reset(new S3RESTConn(sync_env->cct,
+                                        sync_env->store,
+                                        id,
+                                        { conn->endpoint },
+                                        conn->key,
+                                        conn->host_style));
+    }
+
+    for (auto i : conf.connections) {
+      auto& c = i.second;
+      connections[c.connection_id].reset(new S3RESTConn(sync_env->cct,
+                                                        sync_env->store,
+                                                        id,
+                                                        { c.endpoint },
+                                                        c.key,
+                                                        c.host_style));
+
+    }
+  }
+
+  RGWRESTConn *get_conn(const rgw_bucket& bucket) const {
+#warning FIXME
+    return default_conn.get();
+  }
 };
 
 class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
@@ -663,7 +829,7 @@ public:
         status.src_properties = src_properties;
 #define MULTIPART_MAX_PARTS 10000
         uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
-        status.part_size = std::max(conf.multipart_min_part_size, min_part_size);
+        status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size);
         status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
         status.cur_part = 1;
       }
@@ -748,7 +914,8 @@ int decode_attr(map<string, bufferlist>& attrs, const char *attr_name, T *result
 // maybe use Fetch Remote Obj instead?
 class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
   const AWSSyncInstanceEnv& instance;
-  RGWRESTConn *source_conn;
+  RGWRESTConn *source_conn{nullptr};
+  RGWRESTConn *dest_conn{nullptr};
   bufferlist res;
   unordered_map <string, bool> bucket_created;
   string target_bucket_name;
@@ -805,11 +972,14 @@ public:
       }
 
       target_bucket_name = aws_bucket_name(bucket_info, instance.conf.bucket_suffix);
+
+      dest_conn = instance.get_conn(bucket_info.bucket);
+
       if (bucket_created.find(target_bucket_name) == bucket_created.end()){
         yield {
           ldout(sync_env->cct,0) << "AWS: creating bucket" << target_bucket_name << dendl;
           bufferlist bl;
-          call(new RGWPutRawRESTResourceCR <bufferlist> (sync_env->cct, instance.conn.get(),
+          call(new RGWPutRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn,
                                                   sync_env->http_manager,
                                                   target_bucket_name, nullptr, bl, &out_bl));
         }
@@ -858,12 +1028,12 @@ public:
         src_properties.zone_short_id = src_zone_short_id;
         src_properties.pg_ver = src_pg_ver;
 
-        if (size < instance.conf.multipart_sync_threshold) {
+        if (size < instance.conf.s3.multipart_sync_threshold) {
           call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj,
                                                  src_properties,
-                                                 instance.conn.get(), dest_obj));
+                                                 dest_conn, dest_obj));
         } else {
-          call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, instance.conn.get(),
+          call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, dest_conn,
                                                      dest_obj, size, src_properties));
         }
       }
@@ -895,7 +1065,8 @@ public:
 };
 
 class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
-  RGWDataSyncEnv *sync_env;
+  RGWDataSyncEnv *sync_env{nullptr};
+  RGWRESTConn *dest_conn{nullptr};
   RGWBucketInfo bucket_info;
   rgw_obj_key key;
   ceph::real_time mtime;
@@ -913,7 +1084,8 @@ public:
       yield {
         string path = aws_bucket_name(bucket_info, instance.conf.bucket_suffix) + "/" + aws_object_name(bucket_info, key);
         ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl;
-        call(new RGWDeleteRESTResourceCR(sync_env->cct, instance.conn.get(),
+        dest_conn = instance.get_conn(bucket_info.bucket);
+        call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn,
                                          sync_env->http_manager,
                                          path, nullptr /* params */));
       }
@@ -938,13 +1110,7 @@ public:
   }
 
   void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {
-    instance.id = string("s3:") + instance.conf.s3_endpoint;
-    instance.conn.reset(new S3RESTConn(cct,
-                                    sync_env->store,
-                                    instance.id,
-                                    { instance.conf.s3_endpoint },
-                                    instance.conf.key,
-                                    instance.conf.host_style));
+    instance.init(sync_env, instance_id);  // id and connection setup are delegated to instance.init()
   }
 
   ~RGWAWSDataSyncModule() {}
@@ -977,54 +1143,14 @@ public:
   }
 };
 
-static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
-{
-  string sval;
-  if (config.find(key, &sval)) {
-    string err;
-    uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
-    if (!err.empty()) {
-      ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
-      return -EINVAL;
-    }
-    *pval = val;
-  }
-  return 0;
-}
-
 int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config,  RGWSyncModuleInstanceRef *instance){
   AWSSyncConfig conf;
 
-  conf.s3_endpoint = config["s3_endpoint"];
-
-  string host_style_str = config["host_style"];
-  if (host_style_str != "virtual") {
-    conf.host_style = PathStyle;
-  } else {
-    conf.host_style = VirtualStyle;
-  }
-
-  conf.bucket_suffix = config["bucket_suffix"];
-
-  string access_key = config["access_key"];
-  string secret = config["secret"];
-
-  conf.key = RGWAccessKey(access_key, secret);
-
-  int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &conf.multipart_sync_threshold);
+  int r = conf.init(cct, config);  // all config parsing now happens in AWSSyncConfig::init; a negative r aborts creation
   if (r < 0) {
     return r;
   }
 
-  r = conf_to_uint64(cct, config, "multipart_min_part_size", &conf.multipart_min_part_size);
-  if (r < 0) {
-    return r;
-  }
-#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
-  if (conf.multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
-    conf.multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
-  }
-
   instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
   return 0;
 }