From 43ee30813f61bf30bbbb802c17f4938a56ac8a9a Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 2 Feb 2018 15:51:49 -0800 Subject: [PATCH] rgw: rework cloud sync configuration Change the configuration structure to separate between connections and acl mappings, and other changes. Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync_module_aws.cc | 769 +++++++++++++++++++-------------- 1 file changed, 442 insertions(+), 327 deletions(-) diff --git a/src/rgw/rgw_sync_module_aws.cc b/src/rgw/rgw_sync_module_aws.cc index caa5bf4789a6e..b5f503b9d526a 100644 --- a/src/rgw/rgw_sync_module_aws.cc +++ b/src/rgw/rgw_sync_module_aws.cc @@ -42,39 +42,60 @@ static string obj_to_aws_path(const rgw_obj& obj) json configuration definition: { - default = { + "connection": { + "access_key": , + "secret": , + "endpoint": , + "host_style": , + }, + "acls": [ { "type": , + "source_id": , + "dest_id": } ... ], # optional, acl mappings, no mappings if does not exist + "target_path": , # override default + + + # anything below here is for non trivial configuration + # can be used in conjuction with the above + + "default": { "connection": { "access_key": , "secret": , "endpoint": , "host_style" , - "acl_mappings": [ # list of source uids and how they map into destination uids in the dest objects acls - { - "type" : , # optional, default is id - "source_id": , - "dest_id": - } ... }, + "acls": [ # list of source uids and how they map into destination uids in the dest objects acls + { + "type" : , # optional, default is id + "source_id": , + "dest_id": + } ... ] "target_path": "rgwx-${sid}/${bucket}" # how a bucket name is mapped to destination path, # final object name will be target_path + "/" + obj }, "connections": [ { - "connection_id": , + "id": , "access_key": , "secret": , "endpoint": , - "acl_mappings": [ # optional, overrides default - { - "source_id": , - "dest_id": - } ... ] } ... ], - "targets": [ + "acl_profiles": [ { - "source_bucket": , # can specify either specific bucket name (foo), or prefix (foo*) - "target_path": , # (override default) - "connection_id": # optional, if empty references default connection + "id": , # acl mappings + "acls": [ { + "type": , + "source_id": , + "dest_id": + } ... ] + } + ], + "profiles": [ + { + "source_bucket": , # can specify either specific bucket name (foo), or prefix (foo*) + "target_path": , # (override default) + "connection_id": , # optional, if empty references default connection + "acls_id": , # optional, if empty references default mappings } ... ], } @@ -93,47 +114,123 @@ owner: bucket owner */ -struct AWSSyncConfig_Connection { - string connection_id; - string endpoint; - RGWAccessKey key; - HostStyle host_style{PathStyle}; +struct ACLMapping { + ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER}; + string source_id; + string dest_id; - struct ACLMapping { - ACLGranteeTypeEnum type{ACL_TYPE_CANON_USER}; - string source_id; - string dest_id; + ACLMapping() = default; - ACLMapping(ACLGranteeTypeEnum t, - const string& s, - const string& d) : type(t), - source_id(s), - dest_id(d) {} + ACLMapping(ACLGranteeTypeEnum t, + const string& s, + const string& d) : type(t), + source_id(s), + dest_id(d) {} - void dump_conf(CephContext *cct, JSONFormatter& jf) const { - Formatter::ObjectSection os(jf, "acl_mapping"); - string s; - switch (type) { - case ACL_TYPE_EMAIL_USER: - s = "email"; - break; - case ACL_TYPE_GROUP: - s = "uri"; - break; - default: - s = "id"; - break; - } - encode_json("type", s, &jf); - encode_json("source_id", source_id, &jf); - encode_json("dest_id", dest_id, &jf); + void init(const JSONFormattable& config) { + string t = config["type"]; + + if (t == "email") { + type = ACL_TYPE_EMAIL_USER; + } else if (t == "uri") { + type = ACL_TYPE_GROUP; + } else { + type = ACL_TYPE_CANON_USER; } - }; + source_id = config["source_id"]; + dest_id = config["dest_id"]; + } + + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ObjectSection os(jf, "acl_mapping"); + string s; + switch (type) { + case ACL_TYPE_EMAIL_USER: + s = "email"; + break; + case ACL_TYPE_GROUP: + s = "uri"; + break; + default: + s = "id"; + break; + } + encode_json("type", s, &jf); + encode_json("source_id", source_id, &jf); + encode_json("dest_id", dest_id, &jf); + } +}; + +struct ACLMappings { map acl_mappings; void init(const JSONFormattable& config) { - connection_id = config["connectionn_id"]; + for (auto c : config.array()) { + ACLMapping m; + m.init(c); + + acl_mappings.emplace(std::make_pair(m.source_id, m)); + } + } + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ArraySection os(jf, "acls"); + + for (auto& i : acl_mappings) { + i.second.dump_conf(cct, jf); + } + } +}; + +struct AWSSyncConfig_ACLProfiles { + map > acl_profiles; + + void init(const JSONFormattable& config) { + for (auto c : config.array()) { + const string& profile_id = c["id"]; + + std::shared_ptr ap{new ACLMappings}; + ap->init(c["acls"]); + + acl_profiles[profile_id] = ap; + } + } + void dump_conf(CephContext *cct, JSONFormatter& jf) const { + Formatter::ArraySection section(jf, "acl_profiles"); + + for (auto& p : acl_profiles) { + Formatter::ObjectSection section(jf, "profile"); + encode_json("id", p.first, &jf); + p.second->dump_conf(cct, jf); + } + } + + bool find(const string& profile_id, ACLMappings *result) const { + auto iter = acl_profiles.find(profile_id); + if (iter == acl_profiles.end()) { + return false; + } + *result = *iter->second; + return true; + } +}; + +struct AWSSyncConfig_Connection { + string connection_id; + string endpoint; + RGWAccessKey key; + HostStyle host_style{PathStyle}; + + bool has_endpoint{false}; + bool has_key{false}; + bool has_host_style{false}; + + void init(const JSONFormattable& config) { + has_endpoint = config.exists("endpoint"); + has_key = config.exists("access_key") || config.exists("secret"); + has_host_style = config.exists("host_style"); + + connection_id = config["id"]; endpoint = config["endpoint"]; key = RGWAccessKey(config["access_key"], config["secret"]); @@ -143,24 +240,10 @@ struct AWSSyncConfig_Connection { } else { host_style = VirtualStyle; } - - for (auto c : config["acl_mappings"].array()) { - const string& source_id = c["source_id"]; - const string& type_str = c["type"]; - ACLGranteeTypeEnum type; - if (type_str == "email") { - type = ACL_TYPE_EMAIL_USER; - } else if (type_str == "uri") { - type = ACL_TYPE_GROUP; - } else { - type = ACL_TYPE_CANON_USER; - } - acl_mappings.emplace(std::make_pair(source_id, ACLMapping(type, source_id, c["dest_id"]))); - } } void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ObjectSection section(jf, "connection"); - encode_json("connection_id", connection_id, &jf); + encode_json("id", connection_id, &jf); encode_json("endpoint", endpoint, &jf); string s = (host_style == PathStyle ? "path" : "virtual"); encode_json("host_style", s, &jf); @@ -171,13 +254,6 @@ struct AWSSyncConfig_Connection { string secret = (key.key.empty() ? "" : "******"); encode_json("secret", secret, &jf); } - - { - Formatter::ArraySection os(jf, "acl_mappings"); - for (auto& m : acl_mappings) { - m.second.dump_conf(cct, jf); - } - } } }; @@ -224,37 +300,58 @@ struct AWSSyncConfig_S3 { } }; -struct AWSSyncConfig_Default { - std::shared_ptr conn; +struct AWSSyncConfig_Profile { + string source_bucket; + bool prefix{false}; string target_path; + string connection_id; + string acls_id; + + std::shared_ptr conn_conf; + std::shared_ptr acls; + + std::shared_ptr conn; void init(const JSONFormattable& config) { - conn = make_shared(); - conn->init(config["connection"]); + source_bucket = config["source_bucket"]; + + prefix = (!source_bucket.empty() && source_bucket[source_bucket.size() - 1] == '*'); + + if (prefix) { + source_bucket = source_bucket.substr(0, source_bucket.size() - 1); + } + target_path = config["target_path"]; - if (target_path.empty()) { - target_path = default_target_path; + connection_id = config["connection_id"]; + acls_id = config["acls_id"]; + + if (config.exists("connection")) { + conn_conf = make_shared(); + conn_conf->init(config["connection"]); } - } - void dump_conf(CephContext *cct, JSONFormatter& jf) const { - Formatter::ObjectSection section(jf, "default"); - if (conn) { - conn->dump_conf(cct, jf); + if (config.exists("acls")) { + acls = make_shared(); + acls->init(config["acls"]); } - encode_json("target_path", target_path, &jf); } -}; -struct AWSSyncConfig_Target { - string source_bucket; - string target_path; - string connection_id; - - void init(const JSONFormattable& config) { - source_bucket = config["source_bucket"]; - target_path = config["target_path"]; - connection_id = config["connection_id"]; + void dump_conf(CephContext *cct, JSONFormatter& jf, const char *section = "config") const { + Formatter::ObjectSection config(jf, section); + string sb{source_bucket}; + if (prefix) { + sb.append("*"); + } + encode_json("source_bucket", sb, &jf); + encode_json("target_path", target_path, &jf); + encode_json("connection_id", connection_id, &jf); + encode_json("acls_id", acls_id, &jf); + if (conn_conf.get()) { + conn_conf->dump_conf(cct, jf); + } + if (acls.get()) { + acls->dump_conf(cct, jf); + } } }; @@ -280,31 +377,105 @@ static void apply_meta_param(const string& src, const string& param, const strin struct AWSSyncConfig { - AWSSyncConfig_Default default_conf; + AWSSyncConfig_Profile default_profile; + std::shared_ptr root_profile; - map connections; + map > connections; + AWSSyncConfig_ACLProfiles acl_profiles; + + map > explicit_profiles; AWSSyncConfig_S3 s3; - struct Target { - string path; - string connection_id; - bool prefix{false}; + int init_profile(CephContext *cct, const JSONFormattable& profile_conf, AWSSyncConfig_Profile& profile, + bool connection_must_exist) { + if (!profile.connection_id.empty()) { + if (profile.conn_conf) { + ldout(cct, 0) << "ERROR: ambiguous profile connection configuration, connection_id=" << profile.connection_id << dendl; + return -EINVAL; + } + if (connections.find(profile.connection_id) == connections.end()) { + ldout(cct, 0) << "ERROR: profile configuration reference non-existent connection_id=" << profile.connection_id << dendl; + return -EINVAL; + } + profile.conn_conf = connections[profile.connection_id]; + } else if (!profile.conn_conf) { + profile.connection_id = default_profile.connection_id; + auto i = connections.find(profile.connection_id); + if (i != connections.end()) { + profile.conn_conf = i->second; + } + } + + if (connection_must_exist && !profile.conn_conf) { + ldout(cct, 0) << "ERROR: remote connection undefined for sync profile" << dendl; + return -EINVAL; + } + + if (profile.conn_conf && default_profile.conn_conf) { + if (!profile.conn_conf->has_endpoint) { + profile.conn_conf->endpoint = default_profile.conn_conf->endpoint; + } + if (!profile.conn_conf->has_host_style) { + profile.conn_conf->host_style = default_profile.conn_conf->host_style; + } + if (!profile.conn_conf->has_key) { + profile.conn_conf->key = default_profile.conn_conf->key; + } + } + + ACLMappings acl_mappings; - void dump_conf(CephContext *cct, JSONFormatter& jf) const { - Formatter::ObjectSection section(jf, "target"); - encode_json("path", path, &jf); - encode_json("prefix", prefix, &jf); - encode_json("connection_id", connection_id, &jf); + if (!profile.acls_id.empty()) { + if (!acl_profiles.find(profile.acls_id, &acl_mappings)) { + ldout(cct, 0) << "ERROR: profile configuration reference non-existent acls id=" << profile.acls_id << dendl; + return -EINVAL; + } + profile.acls = acl_profiles.acl_profiles[profile.acls_id]; + } else if (!profile.acls) { + if (default_profile.acls) { + profile.acls = default_profile.acls; + profile.acls_id = default_profile.acls_id; + } } - }; - map explicit_targets; + if (profile.target_path.empty()) { + profile.target_path = default_profile.target_path; + } + if (profile.target_path.empty()) { + profile.target_path = default_target_path; + } + + return 0; + } + + int init_target(CephContext *cct, const JSONFormattable& profile_conf, std::shared_ptr *ptarget) { + std::shared_ptr profile; + profile.reset(new AWSSyncConfig_Profile); + profile->init(profile_conf); + + int ret = init_profile(cct, profile_conf, *profile, true); + if (ret < 0) { + return ret; + } + + auto& sb = profile->source_bucket; + + if (explicit_profiles.find(sb) != explicit_profiles.end()) { + ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl; + } + + explicit_profiles[sb] = profile; + if (ptarget) { + *ptarget = profile; + } + return 0; + } - bool find_target(const rgw_bucket bucket, const Target **result) const { + bool do_find_profile(const rgw_bucket bucket, std::shared_ptr *result) { const string& name = bucket.name; - auto iter = explicit_targets.upper_bound(name); - if (iter == explicit_targets.begin()) { + auto iter = explicit_profiles.upper_bound(name); + if (iter == explicit_profiles.begin()) { return false; } @@ -316,7 +487,7 @@ struct AWSSyncConfig { return false; } - const Target *target = &iter->second; + std::shared_ptr& target = iter->second; if (!target->prefix && name.size() != iter->first.size()) { @@ -327,61 +498,54 @@ struct AWSSyncConfig { return true; } + void find_profile(const rgw_bucket bucket, std::shared_ptr *result) { + if (!do_find_profile(bucket, result)) { + *result = root_profile; + } + } + AWSSyncConfig() {} int init(CephContext *cct, const JSONFormattable& config) { + auto& default_conf = config["default"]; + if (config.exists("default")) { - default_conf.init(config["default"]); + default_profile.init(default_conf); + init_profile(cct, default_conf, default_profile, false); } - auto& default_conn = config["default"]["connection"]; - for (auto conn : config["connections"].array()) { auto new_conn = conn; - new_conn.derive_from(default_conn); - connections[new_conn["connection_id"]].init(new_conn); + std::shared_ptr c{new AWSSyncConfig_Connection}; + c->init(new_conn); + + connections[new_conn["id"]] = c; } + acl_profiles.init(config["acl_profiles"]); + int r = s3.init(cct, config["s3"]); if (r < 0) { return r; } - for (auto target_conf : config["targets"].array()) { - AWSSyncConfig_Target tc; - tc.init(target_conf); - - if (!tc.connection_id.empty() && - connections.find(tc.connection_id) == connections.end()) { - ldout(cct, 0) << "ERROR: targets configuration reference non-existent connection_id=" << tc.connection_id << dendl; - return -EINVAL; - } - - Target t; - t.connection_id = tc.connection_id; - t.path = tc.target_path; - - auto& sb = tc.source_bucket; - - string s; - - t.prefix = (!sb.empty() && sb[sb.size() - 1] == '*'); + auto new_root_conf = config; - if (t.prefix) { - sb = sb.substr(0, sb.size() - 1); - } + r = init_target(cct, new_root_conf, &root_profile); /* the root profile config */ + if (r < 0) { + return r; + } - if (explicit_targets.find(sb) != explicit_targets.end()) { - ldout(cct, 0) << "WARNING: duplicate target configuration in sync module" << dendl; + for (auto target_conf : config["profiles"].array()) { + int r = init_target(cct, target_conf, nullptr); + if (r < 0) { + return r; } - - explicit_targets[sb] = t; } JSONFormatter jf(true); dump_conf(cct, jf); - stringstream ss; jf.flush(ss); @@ -403,37 +567,38 @@ struct AWSSyncConfig { } void update_config(RGWDataSyncEnv *sync_env, const string& sid) { - expand_target(sync_env, sid, default_conf.target_path, &default_conf.target_path); - ldout(sync_env->cct, 20) << "updated target: (default) -> " << default_conf.target_path << dendl; - for (auto& t : explicit_targets) { - expand_target(sync_env, sid, t.second.path, &t.second.path); - ldout(sync_env->cct, 20) << "updated target: " << t.first << " -> " << t.second.path << dendl; + expand_target(sync_env, sid, root_profile->target_path, &root_profile->target_path); + ldout(sync_env->cct, 20) << "updated target: (root) -> " << root_profile->target_path << dendl; + for (auto& t : explicit_profiles) { + expand_target(sync_env, sid, t.second->target_path, &t.second->target_path); + ldout(sync_env->cct, 20) << "updated target: " << t.first << " -> " << t.second->target_path << dendl; } } void dump_conf(CephContext *cct, JSONFormatter& jf) const { Formatter::ObjectSection config(jf, "config"); - default_conf.dump_conf(cct, jf); + root_profile->dump_conf(cct, jf); jf.open_array_section("connections"); for (auto c : connections) { - c.second.dump_conf(cct, jf); + c.second->dump_conf(cct, jf); } jf.close_section(); - s3.dump_conf(cct, jf); + acl_profiles.dump_conf(cct, jf); { // targets - Formatter::ArraySection as(jf, "targets"); - for (auto& t : explicit_targets) { - Formatter::ObjectSection target_section(jf, "target"); + Formatter::ArraySection as(jf, "profiles"); + for (auto& t : explicit_profiles) { + Formatter::ObjectSection target_section(jf, "profile"); encode_json("name", t.first, &jf); - t.second.dump_conf(cct, jf); + t.second->dump_conf(cct, jf); } } } - string get_path(const RGWBucketInfo& bucket_info, - const rgw_obj_key& obj) const { + string get_path(std::shared_ptr& profile, + const RGWBucketInfo& bucket_info, + const rgw_obj_key& obj) { string bucket_str; string owner; if (!bucket_info.owner.tenant.empty()) { @@ -441,17 +606,11 @@ struct AWSSyncConfig { owner += bucket_info.owner.id; } bucket_str += bucket_info.bucket.name; - const Target *target{nullptr}; - const string *path{nullptr}; - if (find_target(bucket_info.bucket, &target)) { - path = &target->path; - } - if (!path || path->empty()) { - path = &default_conf.target_path; - } + + const string& path = profile->target_path; string new_path; - apply_meta_param(*path, "bucket", bucket_str, &new_path); + apply_meta_param(path, "bucket", bucket_str, &new_path); apply_meta_param(new_path, "owner", owner, &new_path); new_path += string("/") + get_key_oid(obj); @@ -459,86 +618,61 @@ struct AWSSyncConfig { return new_path; } - void get_target(const RGWBucketInfo& bucket_info, + void get_target(std::shared_ptr& profile, + const RGWBucketInfo& bucket_info, const rgw_obj_key& obj, string *bucket_name, - string *obj_name) const { - string path = get_path(bucket_info, obj); + string *obj_name) { + string path = get_path(profile, bucket_info, obj); size_t pos = path.find('/'); *bucket_name = path.substr(0, pos); *obj_name = path.substr(pos + 1); } + + void init_conns(RGWDataSyncEnv *sync_env, const string& id) { + update_config(sync_env, id); + + auto& root_conf = root_profile->conn_conf; + + root_profile->conn.reset(new S3RESTConn(sync_env->cct, + sync_env->store, + id, + { root_conf->endpoint }, + root_conf->key, + root_conf->host_style)); + + for (auto i : explicit_profiles) { + auto& c = i.second; + + c->conn.reset(new S3RESTConn(sync_env->cct, + sync_env->store, + id, + { c->conn_conf->endpoint }, + c->conn_conf->key, + c->conn_conf->host_style)); + } + } }; + struct AWSSyncInstanceEnv { AWSSyncConfig conf; string id; - std::unique_ptr default_conn; - struct Connection { - AWSSyncConfig_Connection conf; - std::unique_ptr conn; - }; - - map connections; - - AWSSyncInstanceEnv(const AWSSyncConfig& _conf) : conf(_conf) {} + AWSSyncInstanceEnv(AWSSyncConfig& _conf) : conf(_conf) {} void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) { char buf[32]; snprintf(buf, sizeof(buf), "%llx", (unsigned long long)instance_id); id = buf; - conf.update_config(sync_env, id); - - auto& conn = conf.default_conf.conn; - if (conn) { - default_conn.reset(new S3RESTConn(sync_env->cct, - sync_env->store, - id, - { conn->endpoint }, - conn->key, - conn->host_style)); - } - - for (auto i : conf.connections) { - auto& c = i.second; - - auto& dest = connections[c.connection_id]; - dest.conn.reset(new S3RESTConn(sync_env->cct, - sync_env->store, - id, - { c.endpoint }, - c.key, - c.host_style)); - dest.conf = c; - - } + conf.init_conns(sync_env, id); } - int get_conn(RGWDataSyncEnv *sync_env, const rgw_bucket& bucket, const AWSSyncConfig_Connection **conn_conf, RGWRESTConn **connection) const { - const AWSSyncConfig::Target *target; - - if (!conf.find_target(bucket, &target) || - target->connection_id.empty()) { - ldout(sync_env->cct, 20) << "Couldn't find configured target connection for bucket " << bucket.name << ", using default connection" << dendl; - - *conn_conf = conf.default_conf.conn.get(); - *connection = default_conn.get(); - return 0; - } - - ldout(sync_env->cct, 20) << "Found configured target connection for bucket " << bucket.name << ", using connection id=" << target->connection_id << dendl; - - auto iter = connections.find(target->connection_id); - if (iter == connections.end()) { - ldout(sync_env->cct, 0) << "ERROR: connection " << target->connection_id << " is not configured" << dendl; - return -EINVAL; - } - *connection = iter->second.conn.get(); - *conn_conf = &iter->second.conf; - return 0; + void get_profile(const rgw_bucket& bucket, std::shared_ptr *ptarget) { + conf.find_profile(bucket, ptarget); + assert(ptarget); } }; @@ -643,8 +777,7 @@ public: class RGWAWSStreamPutCRF : public RGWStreamWriteHTTPResourceCRF { RGWDataSyncEnv *sync_env; - const AWSSyncConfig_Connection *conf; - RGWRESTConn *conn; + std::shared_ptr target; rgw_obj dest_obj; string etag; public: @@ -652,10 +785,9 @@ public: RGWCoroutinesEnv *_env, RGWCoroutine *_caller, RGWDataSyncEnv *_sync_env, - const AWSSyncConfig_Connection *_conf, - RGWRESTConn* _conn, + std::shared_ptr& _target, rgw_obj& _dest_obj) : RGWStreamWriteHTTPResourceCRF(_cct, _env, _caller, _sync_env->http_manager), - sync_env(_sync_env), conf(_conf), conn(_conn), dest_obj(_dest_obj) { + sync_env(_sync_env), target(_target), dest_obj(_dest_obj) { } int init() { @@ -668,9 +800,9 @@ public: rgw_http_param_pair params[] = { { "uploadId", multipart.upload_id.c_str() }, { "partNumber", buf }, { nullptr, nullptr } }; - conn->put_obj_send_init(dest_obj, params, &out_req); + target->conn->put_obj_send_init(dest_obj, params, &out_req); } else { - conn->put_obj_send_init(dest_obj, nullptr, &out_req); + target->conn->put_obj_send_init(dest_obj, nullptr, &out_req); } set_req(out_req); @@ -687,47 +819,51 @@ public: map > access_map; - for (auto& grant : acl.get_grant_map()) { - auto& orig_grantee = grant.first; - auto& perm = grant.second; - - string grantee; + if (target->acls) { + for (auto& grant : acl.get_grant_map()) { + auto& orig_grantee = grant.first; + auto& perm = grant.second; - auto iter = conf->acl_mappings.find(orig_grantee); - if (iter == conf->acl_mappings.end()) { - ldout(sync_env->cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl; - continue; - } + string grantee; - grantee = iter->second.dest_id; + const auto& am = target->acls->acl_mappings; - string type; - - switch (iter->second.type) { - case ACL_TYPE_CANON_USER: - type = "id"; - break; - case ACL_TYPE_EMAIL_USER: - type = "emailAddress"; - break; - case ACL_TYPE_GROUP: - type = "uri"; - break; - default: + auto iter = am.find(orig_grantee); + if (iter == am.end()) { + ldout(sync_env->cct, 20) << "acl_mappings: Could not find " << orig_grantee << " .. ignoring" << dendl; continue; - } + } - string tv = type + "=" + grantee; + grantee = iter->second.dest_id; + + string type; + + switch (iter->second.type) { + case ACL_TYPE_CANON_USER: + type = "id"; + break; + case ACL_TYPE_EMAIL_USER: + type = "emailAddress"; + break; + case ACL_TYPE_GROUP: + type = "uri"; + break; + default: + continue; + } - int flags = perm.get_permission().get_permissions(); - if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) { - access_map[flags].push_back(tv); - continue; - } + string tv = type + "=" + grantee; + + int flags = perm.get_permission().get_permissions(); + if ((flags & RGW_PERM_FULL_CONTROL) == RGW_PERM_FULL_CONTROL) { + access_map[flags].push_back(tv); + continue; + } - for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) { - if (flags & i) { - access_map[i].push_back(tv); + for (int i = 1; i <= RGW_PERM_WRITE_ACP; i <<= 1) { + if (flags & i) { + access_map[i].push_back(tv); + } } } } @@ -773,7 +909,7 @@ public: RGWAccessControlPolicy policy; - r->send_ready(conn->get_key(), new_attrs, policy, false); + r->send_ready(target->conn->get_key(), new_attrs, policy, false); } void handle_headers(const map& headers) { @@ -797,8 +933,7 @@ public: class RGWAWSStreamObjToCloudPlainCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; RGWRESTConn *source_conn; - const AWSSyncConfig_Connection *dest_conf; - RGWRESTConn *dest_conn; + std::shared_ptr target; rgw_obj src_obj; rgw_obj dest_obj; @@ -812,13 +947,11 @@ public: RGWRESTConn *_source_conn, const rgw_obj& _src_obj, const rgw_sync_aws_src_obj_properties& _src_properties, - const AWSSyncConfig_Connection *_dest_conf, - RGWRESTConn *_dest_conn, + std::shared_ptr _target, const rgw_obj& _dest_obj) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), source_conn(_source_conn), - dest_conf(_dest_conf), - dest_conn(_dest_conn), + target(_target), src_obj(_src_obj), dest_obj(_dest_obj), src_properties(_src_properties) {} @@ -831,7 +964,7 @@ public: src_properties)); /* init output */ - out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conf, dest_conn, + out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, target, dest_obj)); yield call(new RGWStreamSpliceCR(cct, sync_env->http_manager, in_crf, out_crf)); @@ -849,8 +982,7 @@ public: class RGWAWSStreamObjToCloudMultipartPartCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; RGWRESTConn *source_conn; - const AWSSyncConfig_Connection *dest_conf; - RGWRESTConn *dest_conn; + std::shared_ptr target; rgw_obj src_obj; rgw_obj dest_obj; @@ -869,8 +1001,7 @@ public: RGWAWSStreamObjToCloudMultipartPartCR(RGWDataSyncEnv *_sync_env, RGWRESTConn *_source_conn, const rgw_obj& _src_obj, - const AWSSyncConfig_Connection *_dest_conf, - RGWRESTConn *_dest_conn, + std::shared_ptr& _target, const rgw_obj& _dest_obj, const rgw_sync_aws_src_obj_properties& _src_properties, const string& _upload_id, @@ -878,8 +1009,7 @@ public: string *_petag) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), source_conn(_source_conn), - dest_conf(_dest_conf), - dest_conn(_dest_conn), + target(_target), src_obj(_src_obj), dest_obj(_dest_obj), src_properties(_src_properties), @@ -897,7 +1027,7 @@ public: in_crf->set_range(part_info.ofs, part_info.size); /* init output */ - out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, dest_conf, dest_conn, + out_crf.reset(new RGWAWSStreamPutCRF(cct, get_env(), this, sync_env, target, dest_obj)); out_crf->set_multipart(upload_id, part_info.part_num, part_info.size); @@ -1196,10 +1326,9 @@ public: class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - const AWSSyncConfig& conf; + AWSSyncConfig& conf; RGWRESTConn *source_conn; - const AWSSyncConfig_Connection *dest_conf; - RGWRESTConn *dest_conn; + std::shared_ptr target; rgw_obj src_obj; rgw_obj dest_obj; @@ -1217,19 +1346,17 @@ class RGWAWSStreamObjToCloudMultipartCR : public RGWCoroutine { public: RGWAWSStreamObjToCloudMultipartCR(RGWDataSyncEnv *_sync_env, - const AWSSyncConfig& _conf, + AWSSyncConfig& _conf, RGWRESTConn *_source_conn, const rgw_obj& _src_obj, - const AWSSyncConfig_Connection *_dest_conf, - RGWRESTConn *_dest_conn, + std::shared_ptr& _target, const rgw_obj& _dest_obj, uint64_t _obj_size, const rgw_sync_aws_src_obj_properties& _src_properties) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), conf(_conf), source_conn(_source_conn), - dest_conf(_dest_conf), - dest_conn(_dest_conn), + target(_target), src_obj(_src_obj), dest_obj(_dest_obj), obj_size(_obj_size), @@ -1254,13 +1381,13 @@ public: if (status.src_properties.mtime != src_properties.mtime || status.obj_size != obj_size || status.src_properties.etag != src_properties.etag) { - yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id)); + yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id)); retcode = -ENOENT; } } if (retcode == -ENOENT) { - yield call(new RGWAWSInitMultipartCR(sync_env, dest_conn, dest_obj, status.obj_size, &status.upload_id)); + yield call(new RGWAWSInitMultipartCR(sync_env, target->conn.get(), dest_obj, status.obj_size, &status.upload_id)); if (retcode < 0) { return set_cr_error(retcode); } @@ -1287,8 +1414,8 @@ public: call(new RGWAWSStreamObjToCloudMultipartPartCR(sync_env, source_conn, src_obj, - dest_conf, - dest_conn, dest_obj, + target, + dest_obj, status.src_properties, status.upload_id, cur_part_info, @@ -1298,7 +1425,7 @@ public: if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: failed to sync obj=" << src_obj << ", sync via multipart upload, upload_id=" << status.upload_id << " part number " << status.cur_part << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; - yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id)); + yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } @@ -1310,11 +1437,11 @@ public: ldout(sync_env->cct, 20) << "sync of object=" << src_obj << " via multipart upload, finished sending part #" << status.cur_part << " etag=" << pcur_part_info->etag << dendl; } - yield call(new RGWAWSCompleteMultipartCR(sync_env, dest_conn, dest_obj, status.upload_id, status.parts)); + yield call(new RGWAWSCompleteMultipartCR(sync_env, target->conn.get(), dest_obj, status.upload_id, status.parts)); if (retcode < 0) { ldout(sync_env->cct, 0) << "ERROR: failed to complete multipart upload of obj=" << src_obj << " (error: " << cpp_strerror(-retcode) << ")" << dendl; ret_err = retcode; - yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, dest_conn, dest_obj, status_obj, status.upload_id)); + yield call(new RGWAWSStreamAbortMultipartUploadCR(sync_env, target->conn.get(), dest_obj, status_obj, status.upload_id)); return set_cr_error(ret_err); } @@ -1354,11 +1481,10 @@ int decode_attr(map& attrs, const char *attr_name, T *result // maybe use Fetch Remote Obj instead? class RGWAWSHandleRemoteObjCBCR: public RGWStatRemoteObjCBCR { - const AWSSyncInstanceEnv& instance; + AWSSyncInstanceEnv& instance; RGWRESTConn *source_conn{nullptr}; - RGWRESTConn *dest_conn{nullptr}; - const AWSSyncConfig_Connection *dest_conf{nullptr}; + std::shared_ptr target; bufferlist res; unordered_map bucket_created; string target_bucket_name; @@ -1383,7 +1509,7 @@ public: RGWAWSHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - const AWSSyncInstanceEnv& _instance) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), + AWSSyncInstanceEnv& _instance) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), instance(_instance) {} @@ -1415,19 +1541,14 @@ public: return set_cr_error(-EINVAL); } - instance.conf.get_target(bucket_info, key, &target_bucket_name, &target_obj_name); - - ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conf, &dest_conn); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to get dest connection for bucket " << bucket_info.bucket << dendl; - return set_cr_error(ret); - } + instance.get_profile(bucket_info.bucket, &target); + instance.conf.get_target(target, bucket_info, key, &target_bucket_name, &target_obj_name); if (bucket_created.find(target_bucket_name) == bucket_created.end()){ yield { ldout(sync_env->cct,0) << "AWS: creating bucket " << target_bucket_name << dendl; bufferlist bl; - call(new RGWPutRawRESTResourceCR (sync_env->cct, dest_conn, + call(new RGWPutRawRESTResourceCR (sync_env->cct, target->conn.get(), sync_env->http_manager, target_bucket_name, nullptr, bl, &out_bl)); } @@ -1479,12 +1600,11 @@ public: if (size < instance.conf.s3.multipart_sync_threshold) { call(new RGWAWSStreamObjToCloudPlainCR(sync_env, source_conn, src_obj, src_properties, - dest_conf, - dest_conn, dest_obj)); + target, + dest_obj)); } else { call(new RGWAWSStreamObjToCloudMultipartCR(sync_env, instance.conf, source_conn, src_obj, - dest_conf, dest_conn, - dest_obj, size, src_properties)); + target, dest_obj, size, src_properties)); } } if (retcode < 0) { @@ -1499,11 +1619,11 @@ public: }; class RGWAWSHandleRemoteObjCR : public RGWCallStatRemoteObjCR { - const AWSSyncInstanceEnv& instance; + AWSSyncInstanceEnv& instance; public: RGWAWSHandleRemoteObjCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, - const AWSSyncInstanceEnv& _instance) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), + AWSSyncInstanceEnv& _instance) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key), instance(_instance) { } @@ -1516,17 +1636,16 @@ public: class RGWAWSRemoveRemoteObjCBCR : public RGWCoroutine { RGWDataSyncEnv *sync_env{nullptr}; - RGWRESTConn *dest_conn{nullptr}; - const AWSSyncConfig_Connection *dest_conf{nullptr}; + std::shared_ptr target; RGWBucketInfo bucket_info; rgw_obj_key key; ceph::real_time mtime; - const AWSSyncInstanceEnv& instance; + AWSSyncInstanceEnv& instance; int ret{0}; public: RGWAWSRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env, RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime, - const AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + AWSSyncInstanceEnv& _instance) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bucket_info(_bucket_info), key(_key), mtime(_mtime), instance(_instance) {} int operate() override { @@ -1534,15 +1653,11 @@ public: ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl; yield { - string path = instance.conf.get_path(bucket_info, key); + instance.get_profile(bucket_info.bucket, &target); + string path = instance.conf.get_path(target, bucket_info, key); ldout(sync_env->cct, 0) << "AWS: removing aws object at" << path << dendl; - ret = instance.get_conn(sync_env, bucket_info.bucket, &dest_conf, &dest_conn); - if (ret < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to get dest connection for bucket " << bucket_info.bucket << dendl; - return set_cr_error(ret); - } - call(new RGWDeleteRESTResourceCR(sync_env->cct, dest_conn, + call(new RGWDeleteRESTResourceCR(sync_env->cct, target->conn.get(), sync_env->http_manager, path, nullptr /* params */)); } @@ -1561,7 +1676,7 @@ class RGWAWSDataSyncModule: public RGWDataSyncModule { CephContext *cct; AWSSyncInstanceEnv instance; public: - RGWAWSDataSyncModule(CephContext *_cct, const AWSSyncConfig& _conf) : + RGWAWSDataSyncModule(CephContext *_cct, AWSSyncConfig& _conf) : cct(_cct), instance(_conf) { } @@ -1594,7 +1709,7 @@ public: class RGWAWSSyncModuleInstance : public RGWSyncModuleInstance { RGWAWSDataSyncModule data_handler; public: - RGWAWSSyncModuleInstance(CephContext *cct, const AWSSyncConfig& _conf) : data_handler(cct, _conf) {} + RGWAWSSyncModuleInstance(CephContext *cct, AWSSyncConfig& _conf) : data_handler(cct, _conf) {} RGWDataSyncModule *get_data_handler() override { return &data_handler; } -- 2.39.5