json configuration definition:
{
- default = {
+ "connection": {
+ "access_key": <access>,
+ "secret": <secret>,
+ "endpoint": <endpoint>,
+ "host_style": <path | virtual>,
+ },
+ "acls": [ { "type": <id | email | uri>,
+ "source_id": <source_id>,
+ "dest_id": <dest_id> } ... ], # optional, acl mappings, no mappings if does not exist
+ "target_path": <target_path>, # override default
+
+
+ # anything below here is for non trivial configuration
+ # can be used in conjuction with the above
+
+ "default": {
"connection": {
"access_key": <access>,
"secret": <secret>,
"endpoint": <endpoint>,
"host_style" <path | virtual>,
- "acl_mappings": [ # list of source uids and how they map into destination uids in the dest objects acls
- {
- "type" : <id | email | uri>, # optional, default is id
- "source_id": <id>,
- "dest_id": <id>
- } ...
},
+ "acls": [ # list of source uids and how they map into destination uids in the dest objects acls
+ {
+ "type" : <id | email | uri>, # optional, default is id
+ "source_id": <id>,
+ "dest_id": <id>
+ } ... ]
"target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path,
# final object name will be target_path + "/" + obj
},
"connections": [
{
- "connection_id": <id>,
+ "id": <id>,
"access_key": <access>,
"secret": <secret>,
"endpoint": <endpoint>,
- "acl_mappings": [ # optional, overrides default
- {
- "source_id": <id>,
- "dest_id": <id>
- } ... ]
} ... ],
- "targets": [
+ "acl_profiles": [
{
- "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
- "target_path": <dest>, # (override default)
- "connection_id": <connection_id> # optional, if empty references default connection
+ "id": <id>, # acl mappings
+ "acls": [ {
+ "type": <id | email | uri>,
+ "source_id": <id>,
+ "dest_id": <id>
+ } ... ]
+ }
+ ],
+ "profiles": [
+ {
+ "source_bucket": <source>, # can specify either specific bucket name (foo), or prefix (foo*)
+ "target_path": <dest>, # (override default)
+ "connection_id": <connection_id>, # optional, if empty references default connection
+ "acls_id": <mappings_id>, # optional, if empty references default mappings
} ... ],
}
*/
-struct AWSSyncConfig_Connection {
- string connection_id;
- string endpoint;
- RGWAccessKey key;
- HostStyle host_style{PathStyle};
+struct ACLMapping {
+ ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER};
+ string source_id;
+ string dest_id;
- struct ACLMapping {
- ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER};
- string source_id;
- string dest_id;
+ ACLMapping() = default;
- ACLMapping(ACLGranteeTypeEnum t,
- const string& s,
- const string& d) : type(t),
- source_id(s),
- dest_id(d) {}
+ ACLMapping(ACLGranteeTypeEnum t,
+ const string& s,
+ const string& d) : type(t),
+ source_id(s),
+ dest_id(d) {}
- void dump_conf(CephContext *cct, JSONFormatter& jf) const {
- Formatter::ObjectSection os(jf, "acl_mapping");
- string s;
- switch (type) {
- case ACL_TYPE_EMAIL_USER:
- s = "email";
- break;
- case ACL_TYPE_GROUP:
- s = "uri";
- break;
- default:
- s = "id";
- break;
- }
- encode_json("type", s, &jf);
- encode_json("source_id", source_id, &jf);
- encode_json("dest_id", dest_id, &jf);
+ void init(const JSONFormattable& config) {
+ string t = config["type"];
+
+ if (t == "email") {
+ type = ACL_TYPE_EMAIL_USER;
+ } else if (t == "uri") {
+ type = ACL_TYPE_GROUP;
+ } else {
+ type = ACL_TYPE_CANON_USER;
}
- };
+ source_id = config["source_id"];
+ dest_id = config["dest_id"];
+ }
+
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ObjectSection os(jf, "acl_mapping");
+ string s;
+ switch (type) {
+ case ACL_TYPE_EMAIL_USER:
+ s = "email";
+ break;
+ case ACL_TYPE_GROUP:
+ s = "uri";
+ break;
+ default:
+ s = "id";
+ break;
+ }
+ encode_json("type", s, &jf);
+ encode_json("source_id", source_id, &jf);
+ encode_json("dest_id", dest_id, &jf);
+ }
+};
+
+struct ACLMappings {
map<string, ACLMapping> acl_mappings;
void init(const JSONFormattable& config) {
- connection_id = config["connectionn_id"];
+ for (auto c : config.array()) {
+ ACLMapping m;
+ m.init(c);
+
+ acl_mappings.emplace(std::make_pair(m.source_id, m));
+ }
+ }
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ArraySection os(jf, "acls");
+
+ for (auto& i : acl_mappings) {
+ i.second.dump_conf(cct, jf);
+ }
+ }
+};
+
+struct AWSSyncConfig_ACLProfiles {
+ map<string, std::shared_ptr<ACLMappings> > acl_profiles;
+
+ void init(const JSONFormattable& config) {
+ for (auto c : config.array()) {
+ const string& profile_id = c["id"];
+
+ std::shared_ptr<ACLMappings> ap{new ACLMappings};
+ ap->init(c["acls"]);
+
+ acl_profiles[profile_id] = ap;
+ }
+ }
+ void dump_conf(CephContext *cct, JSONFormatter& jf) const {
+ Formatter::ArraySection section(jf, "acl_profiles");
+
+ for (auto& p : acl_profiles) {
+ Formatter::ObjectSection section(jf, "profile");
+ encode_json("id", p.first, &jf);
+ p.second->dump_conf(cct, jf);
+ }
+ }
+
+ bool find(const string& profile_id, ACLMappings *result) const {
+ auto iter = acl_profiles.find(profile_id);
+ if (iter == acl_profiles.end()) {
+ return false;
+ }
+ *result = *iter->second;
+ return true;
+ }
+};
+
+struct AWSSyncConfig_Connection {
+ string connection_id;
+ string endpoint;
+ RGWAccessKey key;
+ HostStyle host_style{PathStyle};
+
+ bool has_endpoint{false};
+ bool has_key{false};
+ bool has_host_style{false};
+
+ void init(const JSONFormattable& config) {
+ has_endpoint = config.exists("endpoint");
+ has_key = config.exists("access_key") || config.exists("secret");
+ has_host_style = config.exists("host_style");
+
+ connection_id = config["id"];
endpoint = config["endpoint"];
key = RGWAccessKey(config["access_key"], config["secret"]);
} else {
host_style = VirtualStyle;
}
-
- for (auto c : config["acl_mappings"].array()) {
- const string& source_id = c["source_id"];
- const string& type_str = c["type"];
- ACLGranteeTypeEnum type;
- if (type_str == "email") {
- type = ACL_TYPE_EMAIL_USER;
- } else if (type_str == "uri") {
- type = ACL_TYPE_GROUP;
- } else {
- type = ACL_TYPE_CANON_USER;
- }
- acl_mappings.emplace(std::make_pair(source_id, ACLMapping(type, source_id, c["dest_id"])));
- }
}
void dump_conf(CephContext *cct, JSONFormatter& jf) const {
Formatter::ObjectSection section(jf, "connection");
- encode_json("connection_id", connection_id, &jf);
+ encode_json("id", connection_id, &jf);
encode_json("endpoint", endpoint, &jf);
string s = (host_style == PathStyle ? "path" : "virtual");
encode_json("host_style", s, &jf);
string secret = (key.key.empty() ? "" : "******");
encode_json("secret", secret, &jf);
}
-
- {
- Formatter::ArraySection os(jf, "acl_mappings");
- for (auto& m : acl_mappings) {
- m.second.dump_conf(cct, jf);
- }
- }
}
};
}
};
-struct AWSSyncConfig_Default {
- std::shared_ptr<AWSSyncConfig_Connection> conn;
+struct AWSSyncConfig_Profile {
+ string source_bucket;
+ bool prefix{false};
string target_path;
+ string connection_id;
+ string acls_id;
+
+ std::shared_ptr<AWSSyncConfig_Connection> conn_conf;
+ std::shared_ptr<ACLMappings> acls;
+
+ std::shared_ptr<RGWRESTConn> conn;
void init(const JSONFormattable& config) {
- conn = make_shared<AWSSyncConfig_Connection>();
- conn->init(config["connection"]);
+ source_bucket = config["source_bucket"];
+
+ prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*');
+
+ if (prefix) {
+ source_bucket = source_bucket.substr(0, source_bucket.size() - 1);
+ }
+
target_path = config["target_path"];
- if (target_path.empty()) {
- target_path = default_target_path;
+ connection_id = config["connection_id"];
+ acls_id = config["acls_id"];
+
+ if (config.exists("connection")) {
+ conn_conf = make_shared<AWSSyncConfig_Connection>();
+ conn_conf->init(config["connection"]);
}
- }
- void dump_conf(CephContext *cct, JSONFormatter& jf) const {
- Formatter::ObjectSection section(jf, "default");
- if (conn) {
- conn->dump_conf(cct, jf);
+ if (config.exists("acls")) {
+ acls = make_shared<ACLMappings>();
+ acls->init(config["acls"]);
}
- encode_json("target_path", target_path, &jf);
}
-};
-struct AWSSyncConfig_Target {
- string source_bucket;
- string target_path;
- string connection_id;
-
- void init(const JSONFormattable& config) {
- source_bucket = config["source_bucket"];
- target_path = config["target_path"];
- connection_id = config["connection_id"];
+ void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const {
+ Formatter::ObjectSection config(jf, section);
+ string sb{source_bucket};
+ if (prefix) {
+ sb.append("*");
+ }
+ encode_json("source_bucket", sb, &jf);
+ encode_json("target_path", target_path, &jf);
+ encode_json("connection_id", connection_id, &jf);
+ encode_json("acls_id", acls_id, &jf);
+ if (conn_conf.get()) {
+ conn_conf->dump_conf(cct, jf);
+ }
+ if (acls.get()) {
+ acls->dump_conf(cct, jf);
+ }
}
};
struct AWSSyncConfig {
- AWSSyncConfig_Default default_conf;
+ AWSSyncConfig_Profile default_profile;
+ std::shared_ptr<AWSSyncConfig_Profile> root_profile;
- map<string, AWSSyncConfig_Connection> connections;
+ map<string, std::shared_ptr<AWSSyncConfig_Connection> > connections;
+ AWSSyncConfig_ACLProfiles acl_profiles;
+
+ map<string, std::shared_ptr<AWSSyncConfig_Profile> > explicit_profiles;
AWSSyncConfig_S3 s3;
- struct Target {
- string path;
- string connection_id;
- bool prefix{false};
+ int init_profile(CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile,
+ bool connection_must_exist) {
+ if (!profile.connection_id.empty()) {
+ if (profile.conn_conf) {
+ ldout(cct, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl;
+ return -EINVAL;
+ }
+ if (connections.find(profile.connection_id) == connections.end()) {
+ ldout(cct, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl;
+ return -EINVAL;
+ }
+ profile.conn_conf = connections[profile.connection_id];
+ } else if (!profile.conn_conf) {
+ profile.connection_id = default_profile.connection_id;
+ auto i = connections.find(profile.connection_id);
+ if (i != connections.end()) {
+ profile.conn_conf = i->second;
+ }
+ }
+
+ if (connection_must_exist && !profile.conn_conf) {
+ ldout(cct, 0) << "ERROR: remote connection undefined for sync profile" << dendl;
+ return -EINVAL;
+ }
+
+ if (profile.conn_conf && default_profile.conn_conf) {
+ if (!profile.conn_conf->has_endpoint) {
+ profile.conn_conf->endpoint = default_profile.conn_conf->endpoint;
+ }
+ if (!profile.conn_conf->has_host_style) {
+ profile.conn_conf->host_style = default_profile.conn_conf->host_style;
+ }
+ if (!profile.conn_conf->has_key) {
+ profile.conn_conf->key = default_profile.conn_conf->key;
+ }
+ }
+
+ ACLMappings acl_mappings;
- void dump_conf(CephContext *cct, JSONFormatter& jf) const {
- Formatter::ObjectSection section(jf, "target");
- encode_json("path", path, &jf);
- encode_json("prefix", prefix, &jf);
- encode_json("connection_id", connection_id, &jf);
+ if (!profile.acls_id.empty()) {
+ if (!acl_profiles.find(profile.acls_id, &acl_mappings)) {
+ ldout(cct, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl;
+ return -EINVAL;
+ }
+ profile.acls = acl_profiles.acl_profiles[profile.acls_id];
+ } else if (!profile.acls) {
+ if (default_profile.acls) {
+ profile.acls = default_profile.acls;
+ profile.acls_id = default_profile.acls_id;
+ }
}
- };
- map<string, Target> explicit_targets;
+ if (profile.target_path.empty()) {
+ profile.target_path = default_profile.target_path;
+ }
+ if (profile.target_path.empty()) {
+ profile.target_path = default_target_path;
+ }
+
+ return 0;
+ }
+
+ int init_target(CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
+ std::shared_ptr<AWSSyncConfig_Profile> profile;
+ profile.reset(new AWSSyncConfig_Profile);
+ profile->init(profile_conf);
+
+ int ret = init_profile(cct, profile_conf, *profile, true);
+ if (ret < 0) {
+ return ret;
+ }
+
+ auto& sb = profile->source_bucket;
+
+ if (explicit_profiles.find(sb) != explicit_profiles.end()) {
+ ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
+ }
+
+ explicit_profiles[sb] = profile;
+ if (ptarget) {
+ *ptarget = profile;
+ }
+ return 0;
+ }
- bool find_target(const rgw_bucket bucket, const Target **result) const {
+ bool do_find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
const string& name = bucket.name;
- auto iter = explicit_targets.upper_bound(name);
- if (iter == explicit_targets.begin()) {
+ auto iter = explicit_profiles.upper_bound(name);
+ if (iter == explicit_profiles.begin()) {
return false;
}
return false;
}
- const Target *target = &iter->second;
+ std::shared_ptr<AWSSyncConfig_Profile>& target = iter->second;
if (!target->prefix &&
name.size() != iter->first.size()) {
return true;
}
+ void find_profile(const rgw_bucket bucket, std::shared_ptr<AWSSyncConfig_Profile> *result) {
+ if (!do_find_profile(bucket, result)) {
+ *result = root_profile;
+ }
+ }
+
AWSSyncConfig() {}
int init(CephContext *cct, const JSONFormattable& config) {
+ auto& default_conf = config["default"];
+
if (config.exists("default")) {
- default_conf.init(config["default"]);
+ default_profile.init(default_conf);
+ init_profile(cct, default_conf, default_profile, false);
}
- auto& default_conn = config["default"]["connection"];
-
for (auto conn : config["connections"].array()) {
auto new_conn = conn;
- new_conn.derive_from(default_conn);
- connections[new_conn["connection_id"]].init(new_conn);
+ std::shared_ptr<AWSSyncConfig_Connection> c{new AWSSyncConfig_Connection};
+ c->init(new_conn);
+
+ connections[new_conn["id"]] = c;
}
+ acl_profiles.init(config["acl_profiles"]);
+
int r = s3.init(cct, config["s3"]);
if (r < 0) {
return r;
}
- for (auto target_conf : config["targets"].array()) {
- AWSSyncConfig_Target tc;
- tc.init(target_conf);
-
- if (!tc.connection_id.empty() &&
- connections.find(tc.connection_id) == connections.end()) {
- ldout(cct, 0) << "ERROR: targets configuration reference non-existent connection_id=" << tc.connection_id << dendl;
- return -EINVAL;
- }
-
- Target t;
- t.connection_id = tc.connection_id;
- t.path = tc.target_path;
-
- auto& sb = tc.source_bucket;
-
- string s;
-
- t.prefix = (!sb.empty() && sb[sb.size() - 1] == '*');
+ auto new_root_conf = config;
- if (t.prefix) {
- sb = sb.substr(0, sb.size() - 1);
- }
+ r = init_target(cct, new_root_conf, &root_profile); /* the root profile config */
+ if (r < 0) {
+ return r;
+ }
- if (explicit_targets.find(sb) != explicit_targets.end()) {
- ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl;
+ for (auto target_conf : config["profiles"].array()) {
+ int r = init_target(cct, target_conf, nullptr);
+ if (r < 0) {
+ return r;
}
-
- explicit_targets[sb] = t;
}
JSONFormatter jf(true);
dump_conf(cct, jf);
-
stringstream ss;
jf.flush(ss);
}
void update_config(RGWDataSyncEnv *sync_env, const string& sid) {
- expand_target(sync_env, sid, default_conf.target_path, &default_conf.target_path);
- ldout(sync_env->cct, 20) << "updated target: (default) -> " << default_conf.target_path << dendl;
- for (auto& t : explicit_targets) {
- expand_target(sync_env, sid, t.second.path, &t.second.path);
- ldout(sync_env->cct, 20) << "updated target: " << t.first << " -> " << t.second.path << dendl;
+ expand_target(sync_env, sid, root_profile->target_path, &root_profile->target_path);
+ ldout(sync_env->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl;
+ for (auto& t : explicit_profiles) {
+ expand_target(sync_env, sid, t.second->target_path, &t.second->target_path);
+ ldout(sync_env->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl;
}
}
void dump_conf(CephContext *cct, JSONFormatter& jf) const {
Formatter::ObjectSection config(jf, "config");
- default_conf.dump_conf(cct, jf);
+ root_profile->dump_conf(cct, jf);
jf.open_array_section("connections");
for (auto c : connections) {
- c.second.dump_conf(cct, jf);
+ c.second->dump_conf(cct, jf);
}
jf.close_section();
- s3.dump_conf(cct, jf);
+ acl_profiles.dump_conf(cct, jf);
{ // targets
- Formatter::ArraySection as(jf, "targets");
- for (auto& t : explicit_targets) {
- Formatter::ObjectSection target_section(jf, "target");
+ Formatter::ArraySection as(jf, "profiles");
+ for (auto& t : explicit_profiles) {
+ Formatter::ObjectSection target_section(jf, "profile");
encode_json("name", t.first, &jf);
- t.second.dump_conf(cct, jf);
+ t.second->dump_conf(cct, jf);
}
}
}
- string get_path(const RGWBucketInfo& bucket_info,
- const rgw_obj_key& obj) const {
+ string get_path(std::shared_ptr<AWSSyncConfig_Profile>& profile,
+ const RGWBucketInfo& bucket_info,
+ const rgw_obj_key& obj) {
string bucket_str;
string owner;
if (!bucket_info.owner.tenant.empty()) {
owner += bucket_info.owner.id;
}
bucket_str += bucket_info.bucket.name;
- const Target *target{nullptr};
- const string *path{nullptr};
- if (find_target(bucket_info.bucket, &target)) {
- path = &target->path;
- }
- if (!path || path->empty()) {
- path = &default_conf.target_path;
- }
+
+ const string& path = profile->target_path;
string new_path;
- apply_meta_param(*path, "bucket", bucket_str, &new_path);
+ apply_meta_param(path, "bucket", bucket_str, &new_path);
apply_meta_param(new_path, "owner", owner, &new_path);
new_path += string("/") + get_key_oid(obj);
return new_path;
}
- void get_target(const RGWBucketInfo& bucket_info,
+ void get_target(std::shared_ptr<AWSSyncConfig_Profile>& profile,
+ const RGWBucketInfo& bucket_info,
const rgw_obj_key& obj,
string *bucket_name,
- string *obj_name) const {
- string path = get_path(bucket_info, obj);
+ string *obj_name) {
+ string path = get_path(profile, bucket_info, obj);
size_t pos = path.find('/');
*bucket_name = path.substr(0, pos);
*obj_name = path.substr(pos + 1);
}
+
+ void init_conns(RGWDataSyncEnv *sync_env, const string& id) {
+ update_config(sync_env, id);
+
+ auto& root_conf = root_profile->conn_conf;
+
+ root_profile->conn.reset(new S3RESTConn(sync_env->cct,
+ sync_env->store,
+ id,
+ { root_conf->endpoint },
+ root_conf->key,
+ root_conf->host_style));
+
+ for (auto i : explicit_profiles) {
+ auto& c = i.second;
+
+ c->conn.reset(new S3RESTConn(sync_env->cct,
+ sync_env->store,
+ id,
+ { c->conn_conf->endpoint },
+ c->conn_conf->key,
+ c->conn_conf->host_style));
+ }
+ }
};
+
struct AWSSyncInstanceEnv {
AWSSyncConfig conf;
string id;
- std::unique_ptr<RGWRESTConn> default_conn;
- struct Connection {
- AWSSyncConfig_Connection conf;
- std::unique_ptr<RGWRESTConn> conn;
- };
-
- map<string, Connection> connections;
-
- AWSSyncInstanceEnv(const AWSSyncConfig& _conf) : conf(_conf) {}
+ AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {}
void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) {
char buf[32];
snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id);
id = buf;
- conf.update_config(sync_env, id);
-
- auto& conn = conf.default_conf.conn;
- if (conn) {
- default_conn.reset(new S3RESTConn(sync_env->cct,
- sync_env->store,
- id,
- { conn->endpoint },
- conn->key,
- conn->host_style));
- }
-
- for (auto i : conf.connections) {
- auto& c = i.second;
-
- auto& dest = connections[c.connection_id];
- dest.conn.reset(new S3RESTConn(sync_env->cct,
- sync_env->store,
- id,
- { c.endpoint },
- c.key,
- c.host_style));
- dest.conf = c;
-
- }
+ conf.init_conns(sync_env, id);
}
- int get_conn(RGWDataSyncEnv *sync_env, const rgw_bucket& bucket, const AWSSyncConfig_Connection **conn_conf, RGWRESTConn **connection) const {
- const AWSSyncConfig::Target *target;
-
- if (!conf.find_target(bucket, &target) ||
- target->connection_id.empty()) {
- ldout(sync_env->cct, 20) << "Couldn't find configured target connection for bucket " << bucket.name << ", using default connection" << dendl;
-
- *conn_conf = conf.default_conf.conn.get();
- *connection = default_conn.get();
- return 0;
- }
-
- ldout(sync_env->cct, 20) << "Found configured target connection for bucket " << bucket.name << ", using connection id=" << target->connection_id << dendl;
-
- auto iter = connections.find(target->connection_id);
- if (iter == connections.end()) {
- ldout(sync_env->cct, 0) << "ERROR: connection " << target->connection_id << " is not configured" << dendl;
- return -EINVAL;
- }
- *connection = iter->second.conn.get();
- *conn_conf = &iter->second.conf;
- return 0;
+ void get_profile(const rgw_bucket& bucket, std::shared_ptr<AWSSyncConfig_Profile> *ptarget) {
+ conf.find_profile(bucket, ptarget);
+ assert(ptarget);
}
};
class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF
{
RGWDataSyncEnv *sync_env;
- const AWSSyncConfig_Connection *conf;
- RGWRESTConn *conn;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
rgw_obj dest_obj;
string etag;
public:
RGWCoroutinesEnv *_env,
RGWCoroutine *_caller,
RGWDataSyncEnv *_sync_env,
- const AWSSyncConfig_Connection *_conf,
- RGWRESTConn* _conn,
+ std::shared_ptr<AWSSyncConfig_Profile>& _target,
rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager),
- sync_env(_sync_env), conf(_conf), conn(_conn), dest_obj(_dest_obj) {
+ sync_env(_sync_env), target(_target), dest_obj(_dest_obj) {
}
int init() {
rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() },
{ "partNumber", buf },
{ nullptr, nullptr } };
- conn->put_obj_send_init(dest_obj, params, &out_req);
+ target->conn->put_obj_send_init(dest_obj, params, &out_req);
} else {
- conn->put_obj_send_init(dest_obj, nullptr, &out_req);
+ target->conn->put_obj_send_init(dest_obj, nullptr, &out_req);
}
set_req(out_req);
map<int, vector<string> > access_map;
- for (auto& grant : acl.get_grant_map()) {
- auto& orig_grantee = grant.first;
- auto& perm = grant.second;
-
- string grantee;
+ if (target->acls) {
+ for (auto& grant : acl.get_grant_map()) {
+ auto& orig_grantee = grant.first;
+ auto& perm = grant.second;
- auto iter = conf->acl_mappings.find(orig_grantee);
- if (iter == conf->acl_mappings.end()) {
- ldout(sync_env->cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
- continue;
- }
+ string grantee;
- grantee = iter->second.dest_id;
+ const auto& am = target->acls->acl_mappings;
- string type;
-
- switch (iter->second.type) {
- case ACL_TYPE_CANON_USER:
- type = "id";
- break;
- case ACL_TYPE_EMAIL_USER:
- type = "emailAddress";
- break;
- case ACL_TYPE_GROUP:
- type = "uri";
- break;
- default:
+ auto iter = am.find(orig_grantee);
+ if (iter == am.end()) {
+ ldout(sync_env->cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl;
continue;
- }
+ }
- string tv = type + "=" + grantee;
+ grantee = iter->second.dest_id;
+
+ string type;
+
+ switch (iter->second.type) {
+ case ACL_TYPE_CANON_USER:
+ type = "id";
+ break;
+ case ACL_TYPE_EMAIL_USER:
+ type = "emailAddress";
+ break;
+ case ACL_TYPE_GROUP:
+ type = "uri";
+ break;
+ default:
+ continue;
+ }
- int flags = perm.get_permission().get_permissions();
- if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
- access_map[flags].push_back(tv);
- continue;
- }
+ string tv = type + "=" + grantee;
+
+ int flags = perm.get_permission().get_permissions();
+ if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) {
+ access_map[flags].push_back(tv);
+ continue;
+ }
- for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
- if (flags & i) {
- access_map[i].push_back(tv);
+ for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) {
+ if (flags & i) {
+ access_map[i].push_back(tv);
+ }
}
}
}
RGWAccessControlPolicy policy;
- r->send_ready(conn->get_key(), new_attrs, policy, false);
+ r->send_ready(target->conn->get_key(), new_attrs, policy, false);
}
void handle_headers(const map<string, string>& headers) {
class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWRESTConn *source_conn;
- const AWSSyncConfig_Connection *dest_conf;
- RGWRESTConn *dest_conn;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
rgw_obj src_obj;
rgw_obj dest_obj;
RGWRESTConn *_source_conn,
const rgw_obj& _src_obj,
const rgw_sync_aws_src_obj_properties& _src_properties,
- const AWSSyncConfig_Connection *_dest_conf,
- RGWRESTConn *_dest_conn,
+ std::shared_ptr<AWSSyncConfig_Profile> _target,
const rgw_obj& _dest_obj) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
source_conn(_source_conn),
- dest_conf(_dest_conf),
- dest_conn(_dest_conn),
+ target(_target),
src_obj(_src_obj),
dest_obj(_dest_obj),
src_properties(_src_properties) {}
src_properties));
/* init output */
- out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conf, dest_conn,
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, target,
dest_obj));
yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf));
class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
RGWRESTConn *source_conn;
- const AWSSyncConfig_Connection *dest_conf;
- RGWRESTConn *dest_conn;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
rgw_obj src_obj;
rgw_obj dest_obj;
RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncEnv *_sync_env,
RGWRESTConn *_source_conn,
const rgw_obj& _src_obj,
- const AWSSyncConfig_Connection *_dest_conf,
- RGWRESTConn *_dest_conn,
+ std::shared_ptr<AWSSyncConfig_Profile>& _target,
const rgw_obj& _dest_obj,
const rgw_sync_aws_src_obj_properties& _src_properties,
const string& _upload_id,
string *_petag) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
source_conn(_source_conn),
- dest_conf(_dest_conf),
- dest_conn(_dest_conn),
+ target(_target),
src_obj(_src_obj),
dest_obj(_dest_obj),
src_properties(_src_properties),
in_crf->set_range(part_info.ofs, part_info.size);
/* init output */
- out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conf, dest_conn,
+ out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, target,
dest_obj));
out_crf->set_multipart(upload_id, part_info.part_num, part_info.size);
class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- const AWSSyncConfig& conf;
+ AWSSyncConfig& conf;
RGWRESTConn *source_conn;
- const AWSSyncConfig_Connection *dest_conf;
- RGWRESTConn *dest_conn;
+ std::shared_ptr<AWSSyncConfig_Profile> target;
rgw_obj src_obj;
rgw_obj dest_obj;
public:
RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env,
- const AWSSyncConfig& _conf,
+ AWSSyncConfig& _conf,
RGWRESTConn *_source_conn,
const rgw_obj& _src_obj,
- const AWSSyncConfig_Connection *_dest_conf,
- RGWRESTConn *_dest_conn,
+ std::shared_ptr<AWSSyncConfig_Profile>& _target,
const rgw_obj& _dest_obj,
uint64_t _obj_size,
const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
conf(_conf),
source_conn(_source_conn),
- dest_conf(_dest_conf),
- dest_conn(_dest_conn),
+ target(_target),
src_obj(_src_obj),
dest_obj(_dest_obj),
obj_size(_obj_size),
if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size ||
status.src_properties.etag != src_properties.etag) {
- yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id));
retcode = -ENOENT;
}
}
if (retcode == -ENOENT) {
- yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, &status.upload_id));
+ yield call(new RGWAWSInitMultipartCR(sync_env, target->conn.get(), dest_obj, status.obj_size, &status.upload_id));
if (retcode < 0) {
return set_cr_error(retcode);
}
call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env,
source_conn, src_obj,
- dest_conf,
- dest_conn, dest_obj,
+ target,
+ dest_obj,
status.src_properties,
status.upload_id,
cur_part_info,
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
ret_err = retcode;
- yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id));
return set_cr_error(ret_err);
}
ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl;
}
- yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, status.upload_id, status.parts));
+ yield call(new RGWAWSCompleteMultipartCR(sync_env, target->conn.get(), dest_obj, status.upload_id, status.parts));
if (retcode < 0) {
ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl;
ret_err = retcode;
- yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id));
+ yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id));
return set_cr_error(ret_err);
}
// maybe use Fetch Remote Obj instead?
class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR {
- const AWSSyncInstanceEnv& instance;
+ AWSSyncInstanceEnv& instance;
RGWRESTConn *source_conn{nullptr};
- RGWRESTConn *dest_conn{nullptr};
- const AWSSyncConfig_Connection *dest_conf{nullptr};
+ std::shared_ptr<AWSSyncConfig_Profile> target;
bufferlist res;
unordered_map <string, bool> bucket_created;
string target_bucket_name;
RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info,
rgw_obj_key& _key,
- const AWSSyncInstanceEnv& _instance) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
+ AWSSyncInstanceEnv& _instance) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key),
instance(_instance)
{}
return set_cr_error(-EINVAL);
}
- instance.conf.get_target(bucket_info, key, &target_bucket_name, &target_obj_name);
-
- ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conf, &dest_conn);
- if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to get dest connection for bucket " << bucket_info.bucket << dendl;
- return set_cr_error(ret);
- }
+ instance.get_profile(bucket_info.bucket, &target);
+ instance.conf.get_target(target, bucket_info, key, &target_bucket_name, &target_obj_name);
if (bucket_created.find(target_bucket_name) == bucket_created.end()){
yield {
ldout(sync_env->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl;
bufferlist bl;
- call(new RGWPutRawRESTResourceCR <bufferlist> (sync_env->cct, dest_conn,
+ call(new RGWPutRawRESTResourceCR <bufferlist> (sync_env->cct, target->conn.get(),
sync_env->http_manager,
target_bucket_name, nullptr, bl, &out_bl));
}
if (size < instance.conf.s3.multipart_sync_threshold) {
call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj,
src_properties,
- dest_conf,
- dest_conn, dest_obj));
+ target,
+ dest_obj));
} else {
call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj,
- dest_conf, dest_conn,
- dest_obj, size, src_properties));
+ target, dest_obj, size, src_properties));
}
}
if (retcode < 0) {
};
class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
- const AWSSyncInstanceEnv& instance;
+ AWSSyncInstanceEnv& instance;
public:
RGWAWSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- const AWSSyncInstanceEnv& _instance) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ AWSSyncInstanceEnv& _instance) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
instance(_instance) {
}
class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env{nullptr};
- RGWRESTConn *dest_conn{nullptr};
- const AWSSyncConfig_Connection *dest_conf{nullptr};
+ std::shared_ptr<AWSSyncConfig_Profile> target;
RGWBucketInfo bucket_info;
rgw_obj_key key;
ceph::real_time mtime;
- const AWSSyncInstanceEnv& instance;
+ AWSSyncInstanceEnv& instance;
int ret{0};
public:
RGWAWSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
- const AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
bucket_info(_bucket_info), key(_key),
mtime(_mtime), instance(_instance) {}
int operate() override {
ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone
<< " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
- string path = instance.conf.get_path(bucket_info, key);
+ instance.get_profile(bucket_info.bucket, &target);
+ string path = instance.conf.get_path(target, bucket_info, key);
ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl;
- ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conf, &dest_conn);
- if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to get dest connection for bucket " << bucket_info.bucket << dendl;
- return set_cr_error(ret);
- }
- call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn,
+ call(new RGWDeleteRESTResourceCR(sync_env->cct, target->conn.get(),
sync_env->http_manager,
path, nullptr /* params */));
}
CephContext *cct;
AWSSyncInstanceEnv instance;
public:
- RGWAWSDataSyncModule(CephContext *_cct, const AWSSyncConfig& _conf) :
+ RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf) :
cct(_cct),
instance(_conf) {
}
class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance {
RGWAWSDataSyncModule data_handler;
public:
- RGWAWSSyncModuleInstance(CephContext *cct, const AWSSyncConfig& _conf) : data_handler(cct, _conf) {}
+ RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf) : data_handler(cct, _conf) {}
RGWDataSyncModule *get_data_handler() override {
return &data_handler;
}