{
"source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
"target_path": <dest>, # (override default)
- "connection_id": <connection_id> # (override default)
+ "connection_id": <connection_id> # optional, if empty references default connection
} ... ],
"acl_mapping": [ # list of source uids and how they map into destination uids in the dest objects acls
{
}
};
+struct AWSSyncConfig_Target {
+ string source_bucket;
+ string target_path;
+ string connection_id;
+
+ void init(const JSONFormattable& config) {
+ source_bucket = config["source_bucket"];
+ target_path = config["target_path"];
+ connection_id = config["connection_id"];
+ }
+};
+
+
struct AWSSyncConfig {
AWSSyncConfig_Default default_conf;
AWSSyncConfig_S3 s3;
+ struct Target {
+ string path;
+ string connection_id;
+ bool prefix{false};
+ };
+
+ map<string, Target> explicit_targets;
+
+ bool find_target(const rgw_bucket bucket, const Target **result) const {
+ const string& name = bucket.name;
+ auto iter = explicit_targets.upper_bound(name);
+ if (iter == explicit_targets.begin()) {
+ return false;
+ }
+
+ --iter;
+ if (iter->first.size() > name.size()) {
+ return false;
+ }
+ if (name.compare(0, iter->first.size(), iter->first) != 0) {
+ return false;
+ }
+
+ const Target *target = &iter->second;
+
+ if (!target->prefix &&
+ name.size() != iter->first.size()) {
+ return false;
+ }
+
+ *result = target;
+ return true;
+ }
+
AWSSyncConfig() {}
AWSSyncConfig(const AWSSyncConfig& c) : default_conf(c.default_conf),
connections(c.connections),
return r;
}
+ for (auto target_conf : config["targets"].array()) {
+ AWSSyncConfig_Target tc;
+ tc.init(target_conf);
+
+ if (connections.find(tc.connection_id) == connections.end()) {
+ ldout(cct, 0) << "ERROR: targets configuration reference non-existent connection_id=" << tc.connection_id << dendl;
+ return -EINVAL;
+ }
+
+ Target t;
+ t.connection_id = tc.connection_id;
+ t.path = tc.target_path;
+
+ auto& sb = tc.source_bucket;
+
+ string s;
+
+ t.prefix = (!sb.empty() && sb[sb.size() - 1] == '*');
+
+ if (t.prefix) {
+ sb = sb.substr(0, sb.size() - 1);
+ }
+
+ if (explicit_targets.find(sb) != explicit_targets.end()) {
+ ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
+ }
+
+ explicit_targets[sb] = t;
+ }
+
return 0;
}
};
}
}
- RGWRESTConn *get_conn(const rgw_bucket& bucket) const {
-#warning FIXME
- return default_conn.get();
+ int get_conn(RGWDataSyncEnv *sync_env, const rgw_bucket& bucket, RGWRESTConn **connection) const {
+ const AWSSyncConfig::Target *target;
+
+ if (!conf.find_target(bucket, &target)) {
+ ldout(sync_env->cct, 20) << "Couldn't find configured target connection for bucket " << bucket.name << ", using default connection" << dendl;
+
+ *connection = default_conn.get();
+ return 0;
+ }
+
+ ldout(sync_env->cct, 20) << "Found configured target connection for bucket " << bucket.name << ", using connection id=" << target->connection_id << dendl;
+
+ auto iter = connections.find(target->connection_id);
+ if (iter == connections.end()) {
+ ldout(sync_env->cct, 0) << "ERROR: connection " << target->connection_id << " is not configured" << dendl;
+ return -EINVAL;
+ }
+ *connection = iter->second.get();
+ return 0;
}
};
target_bucket_name = aws_bucket_name(bucket_info, instance.conf.bucket_suffix);
- dest_conn = instance.get_conn(bucket_info.bucket);
+ ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conn);
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to get dest connection for bucket " << bucket_info.bucket << dendl;
+ return set_cr_error(ret);
+ }
if (bucket_created.find(target_bucket_name) == bucket_created.end()){
yield {
rgw_obj_key key;
ceph::real_time mtime;
const AWSSyncInstanceEnv& instance;
+ int ret{0};
public:
RGWAWSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
yield {
string path = aws_bucket_name(bucket_info, instance.conf.bucket_suffix) + "/" + aws_object_name(bucket_info, key);
ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl;
- dest_conn = instance.get_conn(bucket_info.bucket);
+ ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conn);
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: failed to get dest connection for bucket " << bucket_info.bucket << dendl;
+ return set_cr_error(ret);
+ }
+
call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn,
sync_env->http_manager,
path, nullptr /* params */));