return obj.bucket.name + "/" + obj.key.name;
}
-struct AWSSyncConfig {
- string s3_endpoint;
- string bucket_suffix;
+/*
+
+ json configuration definition:
+
+ {
+ default = {
+ "connection": {
+ "access_key": <access>,
+ "secret": <secret>,
+ "endpoint": <endpoint>,
+ "host_style" <path | virtual>
+ },
+ "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
+ # final object name will be target_path + "/" + obj
+ },
+ "connections": [
+ {
+ "connection_id": <id>,
+ "access_key": <access>,
+ "secret": <secret>,
+ "endpoint": <endpoint>,
+ } ... ],
+ "targets": [
+ {
+ "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
+ "target_path": <dest>, # (override default)
+ "connection_id": <connection_id> # (override default)
+ } ... ],
+ "acl_mapping": [ # list of source uids and how they map into destination uids in the dest objects acls
+ {
+ "source_id": <id>,
+ "dest_id": <id>
+ } ...
+ ]
+ }
+
+*/
+
+struct AWSSyncConfig_Connection {
+ string connection_id;
+ string endpoint;
RGWAccessKey key;
- HostStyle host_style;
+ HostStyle host_style{PathStyle};
+
+ void init(const JSONFormattable& config) {
+ connection_id = config["connectionn_id"];
+ endpoint = config["endpoint"];
+ key = RGWAccessKey(config["access_key"], config["secret"]);
+ string host_style_str = config["host_style"];
+ if (host_style_str != "virtual") {
+ host_style = PathStyle;
+ } else {
+ host_style = VirtualStyle;
+ }
+ }
+};
+
+static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
+{
+ string sval;
+ if (config.find(key, &sval)) {
+ string err;
+ uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
+ if (!err.empty()) {
+ ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
+ return -EINVAL;
+ }
+ *pval = val;
+ }
+ return 0;
+}
+struct AWSSyncConfig_S3 {
uint64_t multipart_sync_threshold{DEFAULT_MULTIPART_SYNC_PART_SIZE};
uint64_t multipart_min_part_size{DEFAULT_MULTIPART_SYNC_PART_SIZE};
+
+ int init(CephContext *cct, const JSONFormattable& config) {
+ int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &multipart_sync_threshold);
+ if (r < 0) {
+ return r;
+ }
+
+ r = conf_to_uint64(cct, config, "multipart_min_part_size", &multipart_min_part_size);
+ if (r < 0) {
+ return r;
+ }
+#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
+ if (multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
+ multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
+ }
+ return 0;
+ }
+};
+
+struct AWSSyncConfig_Default {
+ std::shared_ptr<AWSSyncConfig_Connection> conn;
+
+ void init(const JSONFormattable& config) {
+ conn = make_shared<AWSSyncConfig_Connection>();
+ conn->init(config["connection"]);
+ }
+};
+
+struct AWSSyncConfig {
+ AWSSyncConfig_Default default_conf;
+
+ map<string, AWSSyncConfig_Connection> connections;
+
+ string bucket_suffix;
+
+ AWSSyncConfig_S3 s3;
+
+ AWSSyncConfig() {}
+ AWSSyncConfig(const AWSSyncConfig& c) : default_conf(c.default_conf),
+ connections(c.connections),
+ bucket_suffix(c.bucket_suffix),
+ s3(c.s3) {}
+
+ int init(CephContext *cct, const JSONFormattable& config) {
+ if (config.exists("default")) {
+ default_conf.init(config["default"]);
+ }
+
+ auto& default_conn = config["default"]["connection"];
+
+ for (auto conn : config["connections"].array()) {
+ auto new_conn = conn;
+ new_conn.derive_from(default_conn);
+
+ connections[new_conn["connection_id"]].init(new_conn);
+ }
+
+ bucket_suffix = config["bucket_suffix"];
+
+ int r = s3.init(cct, config["s3"]);
+ if (r < 0) {
+ return r;
+ }
+
+ return 0;
+ }
};
struct AWSSyncInstanceEnv {
AWSSyncConfig conf;
string id;
- std::unique_ptr<RGWRESTConn> conn;
+ std::unique_ptr<RGWRESTConn> default_conn;
+
+ map<string, std::unique_ptr<RGWRESTConn> > connections;
AWSSyncInstanceEnv(const AWSSyncConfig& _conf) : conf(_conf) {}
+
+ void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {
+ char buf[32];
+ snprintf(buf, sizeof(buf), "s3-%llu", (unsigned long long)instance_id);
+ id = buf;
+ auto& conn = conf.default_conf.conn;
+ if (conn) {
+ default_conn.reset(new S3RESTConn(sync_env->cct,
+ sync_env->store,
+ id,
+ { conn->endpoint },
+ conn->key,
+ conn->host_style));
+ }
+
+ for (auto i : conf.connections) {
+ auto& c = i.second;
+ connections[c.connection_id].reset(new S3RESTConn(sync_env->cct,
+ sync_env->store,
+ id,
+ { c.endpoint },
+ c.key,
+ c.host_style));
+
+ }
+ }
+
+ RGWRESTConn *get_conn(const rgw_bucket& bucket) const {
+#warning FIXME
+ return default_conn.get();
+ }
};
class RGWRESTStreamGetCRF : public RGWStreamReadHTTPResourceCRF
status.src_properties = src_properties;
#define MULTIPART_MAX_PARTS 10000
uint64_t min_part_size = obj_size / MULTIPART_MAX_PARTS;
- status.part_size = std::max(conf.multipart_min_part_size, min_part_size);
+ status.part_size = std::max(conf.s3.multipart_min_part_size, min_part_size);
status.num_parts = (obj_size + status.part_size - 1) / status.part_size;
status.cur_part = 1;
}
// maybe use Fetch Remote Obj instead?
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
const AWSSyncInstanceEnv& instance;
- RGWRESTConn *source_conn;
+ RGWRESTConn *source_conn{nullptr};
+ RGWRESTConn *dest_conn{nullptr};
bufferlist res;
unordered_map <string, bool> bucket_created;
string target_bucket_name;
}
target_bucket_name = aws_bucket_name(bucket_info, instance.conf.bucket_suffix);
+
+ dest_conn = instance.get_conn(bucket_info.bucket);
+
if (bucket_created.find(target_bucket_name) == bucket_created.end()){
yield {
ldout(sync_env->cct,0) << "AWS: creating bucket" << target_bucket_name << dendl;
bufferlist bl;
- call(new RGWPutRawRESTResourceCR <bufferlist> (sync_env->cct, instance.conn.get(),
+ call(new RGWPutRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn,
sync_env->http_manager,
target_bucket_name, nullptr, bl, &out_bl));
}
src_properties.zone_short_id = src_zone_short_id;
src_properties.pg_ver = src_pg_ver;
- if (size < instance.conf.multipart_sync_threshold) {
+ if (size < instance.conf.s3.multipart_sync_threshold) {
call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj,
src_properties,
- instance.conn.get(), dest_obj));
+ dest_conn, dest_obj));
} else {
- call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, instance.conn.get(),
+ call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, dest_conn,
dest_obj, size, src_properties));
}
}
};
class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
- RGWDataSyncEnv *sync_env;
+ RGWDataSyncEnv *sync_env{nullptr};
+ RGWRESTConn *dest_conn{nullptr};
RGWBucketInfo bucket_info;
rgw_obj_key key;
ceph::real_time mtime;
yield {
string path = aws_bucket_name(bucket_info, instance.conf.bucket_suffix) + "/" + aws_object_name(bucket_info, key);
ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl;
- call(new RGWDeleteRESTResourceCR(sync_env->cct, instance.conn.get(),
+ dest_conn = instance.get_conn(bucket_info.bucket);
+ call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn,
sync_env->http_manager,
path, nullptr /* params */));
}
}
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {
- instance.id = string("s3:") + instance.conf.s3_endpoint;
- instance.conn.reset(new S3RESTConn(cct,
- sync_env->store,
- instance.id,
- { instance.conf.s3_endpoint },
- instance.conf.key,
- instance.conf.host_style));
+ instance.init(sync_env, instance_id);
}
~RGWAWSDataSyncModule() {}
}
};
-static int conf_to_uint64(CephContext *cct, const JSONFormattable& config, const string& key, uint64_t *pval)
-{
- string sval;
- if (config.find(key, &sval)) {
- string err;
- uint64_t val = strict_strtoll(sval.c_str(), 10, &err);
- if (!err.empty()) {
- ldout(cct, 0) << "ERROR: could not parse configurable value for cloud sync module: " << key << ": " << sval << dendl;
- return -EINVAL;
- }
- *pval = val;
- }
- return 0;
-}
-
int RGWAWSSyncModule::create_instance(CephContext *cct, const JSONFormattable& config, RGWSyncModuleInstanceRef *instance){
AWSSyncConfig conf;
- conf.s3_endpoint = config["s3_endpoint"];
-
- string host_style_str = config["host_style"];
- if (host_style_str != "virtual") {
- conf.host_style = PathStyle;
- } else {
- conf.host_style = VirtualStyle;
- }
-
- conf.bucket_suffix = config["bucket_suffix"];
-
- string access_key = config["access_key"];
- string secret = config["secret"];
-
- conf.key = RGWAccessKey(access_key, secret);
-
- int r = conf_to_uint64(cct, config, "multipart_sync_threshold", &conf.multipart_sync_threshold);
+ int r = conf.init(cct, config);
if (r < 0) {
return r;
}
- r = conf_to_uint64(cct, config, "multipart_min_part_size", &conf.multipart_min_part_size);
- if (r < 0) {
- return r;
- }
-#define MULTIPART_MIN_POSSIBLE_PART_SIZE (5 * 1024 * 1024)
- if (conf.multipart_min_part_size < MULTIPART_MIN_POSSIBLE_PART_SIZE) {
- conf.multipart_min_part_size = MULTIPART_MIN_POSSIBLE_PART_SIZE;
- }
-
instance->reset(new RGWAWSSyncModuleInstance(cct, conf));
return 0;
}