From: Casey Bodley Date: Wed, 29 Jun 2016 15:58:55 +0000 (-0400) Subject: rgw: carry tenant id with data sync X-Git-Tag: v10.2.3~13^2~5 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=a0ffffa5aad423f363bf10a6b8dbaf19a47262a2;p=ceph.git rgw: carry tenant id with data sync use rgw_bucket_shard to track buckets, which includes tenant id Fixes: http://tracker.ceph.com/issues/16469 Signed-off-by: Casey Bodley (cherry picked from commit d945e2099525e39588b572e357b115df98c8cdca) --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 8e0187d37d09..6d9fc32b6cbc 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1522,10 +1522,10 @@ static int update_period(const string& realm_id, const string& realm_name, return 0; } -static int init_bucket_for_sync(const string& tenant, const string& bucket_name, string& bucket_id) +static int init_bucket_for_sync(const string& tenant, const string& bucket_name, + const string& bucket_id, rgw_bucket& bucket) { RGWBucketInfo bucket_info; - rgw_bucket bucket; int ret = init_bucket(tenant, bucket_name, bucket_id, bucket_info, bucket); if (ret == -ENOENT) { @@ -1533,8 +1533,6 @@ static int init_bucket_for_sync(const string& tenant, const string& bucket_name, cerr << "ERROR: bucket id specified" << std::endl; return EINVAL; } - } else { - bucket_id = bucket.bucket_id; } if (ret < 0) { cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl; @@ -5121,11 +5119,12 @@ next: cerr << "ERROR: bucket not specified" << std::endl; return EINVAL; } - int ret = init_bucket_for_sync(tenant, bucket_name, bucket_id); + rgw_bucket bucket; + int ret = init_bucket_for_sync(tenant, bucket_name, bucket_id, bucket); if (ret < 0) { return -ret; } - RGWBucketSyncStatusManager sync(store, source_zone, bucket_name, bucket_id); + RGWBucketSyncStatusManager sync(store, source_zone, bucket); ret = sync.init(); if (ret < 0) { @@ -5148,11 +5147,12 @@ next: cerr << "ERROR: bucket not specified" << std::endl; return EINVAL; } - int ret = init_bucket_for_sync(tenant, bucket_name, bucket_id); + rgw_bucket bucket; + int ret = init_bucket_for_sync(tenant, bucket_name, bucket_id, bucket); if (ret < 0) { return -ret; } - RGWBucketSyncStatusManager sync(store, source_zone, bucket_name, bucket_id); + RGWBucketSyncStatusManager sync(store, source_zone, bucket); ret = sync.init(); if (ret < 0) { @@ -5180,11 +5180,12 @@ next: cerr << "ERROR: bucket not specified" << std::endl; return EINVAL; } - int ret = init_bucket_for_sync(tenant, bucket_name, bucket_id); + rgw_bucket bucket; + int ret = init_bucket_for_sync(tenant, bucket_name, bucket_id, bucket); if (ret < 0) { return -ret; } - RGWBucketSyncStatusManager sync(store, source_zone, bucket_name, bucket_id); + RGWBucketSyncStatusManager sync(store, source_zone, bucket); ret = sync.init(); if (ret < 0) { diff --git a/src/rgw/rgw_cr_rados.h b/src/rgw/rgw_cr_rados.h index 70d9b7bcdc55..fe337bce840a 100644 --- a/src/rgw/rgw_cr_rados.h +++ b/src/rgw/rgw_cr_rados.h @@ -685,10 +685,10 @@ class RGWGetBucketInstanceInfoCR : public RGWSimpleCoroutine { public: RGWGetBucketInstanceInfoCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, - const string& _bucket_name, const string& _bucket_id, - RGWBucketInfo *_bucket_info) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), - bucket_name(_bucket_name), bucket_id(_bucket_id), - bucket_info(_bucket_info), req(NULL) {} + const rgw_bucket& bucket, RGWBucketInfo *_bucket_info) + : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), + bucket_name(bucket.name), bucket_id(bucket.bucket_id), + bucket_info(_bucket_info), req(NULL) {} ~RGWGetBucketInstanceInfoCR() { request_cleanup(); } diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 2cece4cf5082..f9804b64d7d2 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -1,3 +1,5 @@ +#include + #include "common/ceph_json.h" #include "common/RWLock.h" #include "common/RefCountedObj.h" @@ -797,66 +799,61 @@ public: } }; -static string bucket_shard_str(const string& bucket_name, const string& bucket_id, int shard_id) -{ - char shard_str[16]; - snprintf(shard_str, sizeof(shard_str), "%d", shard_id); - return bucket_name + ":" + bucket_id + ":" + shard_str; +// ostream wrappers to print buckets without copying strings +struct bucket_str { + const rgw_bucket& b; + bucket_str(const rgw_bucket& b) : b(b) {} +}; +std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) { + auto& b = rhs.b; + if (!b.tenant.empty()) { + out << b.tenant << '/'; + } + out << b.name; + if (!b.bucket_id.empty()) { + out << ':' << b.bucket_id; + } + return out; +} + +struct bucket_shard_str { + const rgw_bucket_shard& bs; + bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {} +}; +std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) { + auto& bs = rhs.bs; + out << bucket_str{bs.bucket}; + if (bs.shard_id >= 0) { + out << ':' << bs.shard_id; + } + return out; } class RGWRunBucketSyncCoroutine : public RGWCoroutine { RGWDataSyncEnv *sync_env; - string bucket_name; - string bucket_id; + rgw_bucket_shard bs; RGWBucketInfo bucket_info; - int shard_id; rgw_bucket_shard_sync_info sync_status; RGWMetaSyncEnv meta_sync_env; RGWDataSyncDebugLogger logger; public: - RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, - const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - bucket_name(_bucket_name), - bucket_id(_bucket_id), shard_id(_shard_id) { - - logger.init(sync_env, "Bucket", bucket_shard_str(bucket_name, bucket_id, shard_id)); + RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs) { + logger.init(sync_env, "Bucket", bs.get_key()); } int operate(); }; -static int parse_bucket_shard(CephContext *cct, const string& raw_key, string *bucket_name, string *bucket_instance, int *shard_id) -{ - ssize_t pos = raw_key.find(':'); - *bucket_name = raw_key.substr(0, pos); - *bucket_instance = raw_key.substr(pos + 1); - pos = bucket_instance->find(':'); - *shard_id = -1; - if (pos >= 0) { - string err; - string s = bucket_instance->substr(pos + 1); - *shard_id = strict_strtol(s.c_str(), 10, &err); - if (!err.empty()) { - ldout(cct, 0) << "ERROR: failed to parse bucket instance key: " << *bucket_instance << dendl; - return -EINVAL; - } - - *bucket_instance = bucket_instance->substr(0, pos); - } - return 0; -} - class RGWDataSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; string raw_key; string entry_marker; - string bucket_name; - string bucket_instance; + rgw_bucket_shard bs; int sync_status; @@ -885,22 +882,22 @@ public: reenter(this) { do { yield { - int shard_id; - int ret = parse_bucket_shard(sync_env->cct, raw_key, &bucket_name, &bucket_instance, &shard_id); + int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key, + &bs.bucket, &bs.shard_id); if (ret < 0) { return set_cr_error(-EIO); } if (marker_tracker) { marker_tracker->reset_need_retry(raw_key); } - call(new RGWRunBucketSyncCoroutine(sync_env, bucket_name, bucket_instance, shard_id)); + call(new RGWRunBucketSyncCoroutine(sync_env, bs)); } } while (marker_tracker && marker_tracker->need_retry(raw_key)); sync_status = retcode; if (sync_status < 0) { - yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", bucket_name + ":" + bucket_instance, + yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key, -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status))); if (retcode < 0) { ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl; @@ -1563,14 +1560,14 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s return string(buf); } -int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, - const string& _bucket_id, int _shard_id, RGWSyncErrorLogger *_error_logger) +int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, + const rgw_bucket& bucket, int shard_id, + RGWSyncErrorLogger *_error_logger) { conn = _conn; source_zone = _source_zone; - bucket_name = _bucket_name; - bucket_id = _bucket_id; - shard_id = _shard_id; + bs.bucket = bucket; + bs.shard_id = shard_id; sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone); @@ -1591,28 +1588,16 @@ struct bucket_index_marker_info { class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - - string bucket_name; - string bucket_id; - int shard_id; - - string instance_key; + const string instance_key; bucket_index_marker_info *info; public: RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env, - const string& _bucket_name, const string& _bucket_id, int _shard_id, - bucket_index_marker_info *_info) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), - info(_info) { - instance_key = bucket_name + ":" + bucket_id; - if (shard_id >= 0) { - char buf[16]; - snprintf(buf, sizeof(buf), ":%d", shard_id); - instance_key.append(buf); - } - } + const rgw_bucket_shard& bs, + bucket_index_marker_info *_info) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + instance_key(bs.get_key()), info(_info) {} int operate() { reenter(this) { @@ -1638,10 +1623,7 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { RGWDataSyncEnv *sync_env; RGWRados *store; - string bucket_name; - string bucket_id; - int shard_id; - + rgw_bucket_shard bs; string sync_status_oid; string lock_name; @@ -1651,8 +1633,8 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { bucket_index_marker_info info; public: RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, - const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id) { + const rgw_bucket_shard& bs) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs) { store = sync_env->store; lock_name = "sync_lock"; @@ -1662,7 +1644,7 @@ public: gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); string cookie = buf; - sync_status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id); + sync_status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs); } int operate() { @@ -1688,7 +1670,7 @@ public: } } /* fetch current position in logs */ - yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bucket_name, bucket_id, shard_id, &info)); + yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info)); if (retcode < 0 && retcode != -ENOENT) { ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; return set_cr_error(retcode); @@ -1713,8 +1695,7 @@ public: RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr() { - return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, - bucket_name, bucket_id, shard_id); + return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs); } template @@ -1771,11 +1752,11 @@ class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine { map attrs; public: RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, - const string& _bucket_name, const string _bucket_id, int _shard_id, - rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _bucket_name, _bucket_id, _shard_id)), - status(_status) {} + const rgw_bucket_shard& bs, + rgw_bucket_shard_sync_info *_status) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)), + status(_status) {} int operate(); }; @@ -1801,7 +1782,7 @@ int RGWReadBucketSyncStatusCoroutine::operate() } RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status) { - return new RGWReadBucketSyncStatusCoroutine(&sync_env, bucket_name, bucket_id, shard_id, sync_status); + return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status); } RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() { @@ -1888,31 +1869,18 @@ struct bucket_list_result { class RGWListBucketShardCR: public RGWCoroutine { RGWDataSyncEnv *sync_env; - - string bucket_name; - string bucket_id; - int shard_id; - - string instance_key; + const rgw_bucket_shard& bs; + const string instance_key; rgw_obj_key marker_position; bucket_list_result *result; public: - RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, - const string& _bucket_name, const string& _bucket_id, int _shard_id, - rgw_obj_key& _marker_position, - bucket_list_result *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), - marker_position(_marker_position), - result(_result) { - instance_key = bucket_name + ":" + bucket_id; - if (shard_id >= 0) { - char buf[16]; - snprintf(buf, sizeof(buf), ":%d", shard_id); - instance_key.append(buf); - } - } + RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, + rgw_obj_key& _marker_position, bucket_list_result *_result) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), + instance_key(bs.get_key()), marker_position(_marker_position), + result(_result) {} int operate() { reenter(this) { @@ -1924,8 +1892,8 @@ public: { "key-marker" , marker_position.name.c_str() }, { "version-id-marker" , marker_position.instance.c_str() }, { NULL, NULL } }; - - string p = string("/") + bucket_name; + // don't include tenant in the url, it's already part of instance_key + string p = string("/") + bs.bucket.name; call(new RGWReadRESTResourceCR(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result)); } if (retcode < 0) { @@ -1939,31 +1907,16 @@ public: class RGWListBucketIndexLogCR: public RGWCoroutine { RGWDataSyncEnv *sync_env; - - string bucket_name; - string bucket_id; - int shard_id; - - string instance_key; + const string instance_key; string marker; list *result; public: - RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, - const string& _bucket_name, const string& _bucket_id, int _shard_id, - string& _marker, - list *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), - marker(_marker), - result(_result) { - instance_key = bucket_name + ":" + bucket_id; - if (shard_id >= 0) { - char buf[16]; - snprintf(buf, sizeof(buf), ":%d", shard_id); - instance_key.append(buf); - } - } + RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, + string& _marker, list *_result) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), + instance_key(bs.get_key()), marker(_marker), result(_result) {} int operate() { reenter(this) { @@ -2085,7 +2038,7 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; RGWBucketInfo *bucket_info; - int shard_id; + const rgw_bucket_shard& bs; rgw_obj_key key; bool versioned; @@ -2106,14 +2059,15 @@ class RGWBucketSyncSingleEntryCR : public RGWCoroutine { public: RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env, - RGWBucketInfo *_bucket_info, int _shard_id, + RGWBucketInfo *_bucket_info, + const rgw_bucket_shard& bs, const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch, real_time& _timestamp, const bucket_entry_owner& _owner, RGWModifyOp _op, RGWPendingState _op_state, const T& _entry_marker, RGWSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_info(_bucket_info), shard_id(_shard_id), + bucket_info(_bucket_info), bs(bs), key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch), owner(_owner), timestamp(_timestamp), op(_op), @@ -2122,7 +2076,7 @@ public: marker_tracker(_marker_tracker), sync_status(0) { stringstream ss; - ss << bucket_shard_str(bucket_info->bucket.name, bucket_info->bucket.bucket_id, shard_id) << "/" << key << "[" << versioned_epoch << "]"; + ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]"; set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state; ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl; set_status("init"); @@ -2186,9 +2140,9 @@ public: if (retcode < 0 && retcode != -ENOENT) { set_status() << "failed to sync obj; retcode=" << retcode; - rgw_bucket& bucket = bucket_info->bucket; - ldout(sync_env->cct, 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key.name << dendl; - error_ss << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key.name; + ldout(sync_env->cct, 0) << "ERROR: failed to sync object: " + << bucket_shard_str{bs} << "/" << key.name << dendl; + error_ss << bucket_shard_str{bs} << "/" << key.name; sync_status = retcode; } if (!error_ss.str().empty()) { @@ -2214,9 +2168,7 @@ done: class RGWBucketShardFullSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - string bucket_name; - string bucket_id; - int shard_id; + const rgw_bucket_shard& bs; RGWBucketInfo *bucket_info; bucket_list_result list_result; list::iterator entries_iter; @@ -2235,19 +2187,17 @@ class RGWBucketShardFullSyncCR : public RGWCoroutine { RGWDataSyncDebugLogger logger; public: - RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, - const string& _bucket_name, const string _bucket_id, int _shard_id, + RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs, RGWBucketInfo *_bucket_info, rgw_bucket_shard_full_sync_marker& _full_marker) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), - bucket_name(_bucket_name), - bucket_id(_bucket_id), shard_id(_shard_id), + bs(bs), bucket_info(_bucket_info), full_marker(_full_marker), marker_tracker(NULL), spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL), op(CLS_RGW_OP_ADD), total_entries(0), lease_cr(NULL) { - status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id); - logger.init(sync_env, "BucketFull", bucket_shard_str(bucket_name, bucket_id, shard_id)); + status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs); + logger.init(sync_env, "BucketFull", bs.get_key()); } ~RGWBucketShardFullSyncCR() { @@ -2291,8 +2241,8 @@ int RGWBucketShardFullSyncCR::operate() do { set_status("listing remote bucket"); ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl; - yield call(new RGWListBucketShardCR(sync_env, bucket_name, bucket_id, shard_id, - list_marker, &list_result)); + yield call(new RGWListBucketShardCR(sync_env, bs, list_marker, + &list_result)); if (retcode < 0 && retcode != -ENOENT) { set_status("failed bucket listing, going down"); yield lease_cr->go_down(); @@ -2301,7 +2251,8 @@ int RGWBucketShardFullSyncCR::operate() } entries_iter = list_result.entries.begin(); for (; entries_iter != list_result.entries.end(); ++entries_iter) { - ldout(sync_env->cct, 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl; + ldout(sync_env->cct, 20) << "[full sync] syncing object: " + << bucket_shard_str{bs} << "/" << entries_iter->key << dendl; entry = &(*entries_iter); total_entries++; list_marker = entries_iter->key; @@ -2311,7 +2262,7 @@ int RGWBucketShardFullSyncCR::operate() op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH); yield { - spawn(new RGWBucketSyncSingleEntryCR(sync_env, bucket_info, shard_id, + spawn(new RGWBucketSyncSingleEntryCR(sync_env, bucket_info, bs, entry->key, false, /* versioned, only matters for object removal */ entry->versioned_epoch, entry->mtime, @@ -2338,7 +2289,7 @@ int RGWBucketShardFullSyncCR::operate() sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; map attrs; sync_status.encode_state_attr(attrs); - string oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id); + string oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs); RGWRados *store = sync_env->store; call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool, oid, attrs)); @@ -2346,8 +2297,8 @@ int RGWBucketShardFullSyncCR::operate() yield lease_cr->go_down(); drain_all(); if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id - << " retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket " + << bucket_shard_str{bs} << " retcode=" << retcode << dendl; return set_cr_error(retcode); } return set_cr_done(); @@ -2357,21 +2308,19 @@ int RGWBucketShardFullSyncCR::operate() class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { RGWDataSyncEnv *sync_env; - string bucket_name; - string bucket_id; - int shard_id; + const rgw_bucket_shard& bs; RGWBucketInfo *bucket_info; list list_result; list::iterator entries_iter; map > squash_map; rgw_bucket_shard_inc_sync_marker inc_marker; rgw_obj_key key; - rgw_bi_log_entry *entry; - RGWBucketIncSyncShardMarkerTrack *marker_tracker; - int spawn_window; - bool updated_status; - RGWContinuousLeaseCR *lease_cr; - string status_oid; + rgw_bi_log_entry *entry{nullptr}; + RGWBucketIncSyncShardMarkerTrack *marker_tracker{nullptr}; + const int spawn_window{BUCKET_SYNC_SPAWN_WINDOW}; + bool updated_status{false}; + RGWContinuousLeaseCR *lease_cr{nullptr}; + const string status_oid; string name; string instance; @@ -2383,19 +2332,16 @@ class RGWBucketShardIncrementalSyncCR : public RGWCoroutine { public: RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env, - const string& _bucket_name, const string _bucket_id, int _shard_id, - RGWBucketInfo *_bucket_info, rgw_bucket_shard_inc_sync_marker& _inc_marker) : RGWCoroutine(_sync_env->cct), - sync_env(_sync_env), - bucket_name(_bucket_name), - bucket_id(_bucket_id), shard_id(_shard_id), - bucket_info(_bucket_info), - inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL), - spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false), - lease_cr(NULL) { - status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id); - set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id; + const rgw_bucket_shard& bs, + RGWBucketInfo *_bucket_info, + rgw_bucket_shard_inc_sync_marker& _inc_marker) + : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs), + bucket_info(_bucket_info), inc_marker(_inc_marker), + status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) { + set_description() << "bucket shard incremental sync bucket=" + << bucket_shard_str{bs}; set_status("init"); - logger.init(sync_env, "BucketInc", bucket_shard_str(bucket_name, bucket_id, shard_id)); + logger.init(sync_env, "BucketInc", bs.get_key()); } ~RGWBucketShardIncrementalSyncCR() { @@ -2437,8 +2383,8 @@ int RGWBucketShardIncrementalSyncCR::operate() do { ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << dendl; set_status() << "listing bilog; position=" << inc_marker.position; - yield call(new RGWListBucketIndexLogCR(sync_env, bucket_name, bucket_id, shard_id, - inc_marker.position, &list_result)); + yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position, + &list_result)); if (retcode < 0 && retcode != -ENOENT) { /* wait for all operations to complete */ drain_all_but(1); @@ -2492,23 +2438,27 @@ int RGWBucketShardIncrementalSyncCR::operate() set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op; if (entry->op == CLS_RGW_OP_CANCEL) { set_status() << "canceled operation, skipping"; - ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl; + ldout(sync_env->cct, 20) << "[inc sync] skipping object: " + << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl; marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } if (entry->state != CLS_RGW_STATE_COMPLETE) { set_status() << "non-complete operation, skipping"; - ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl; + ldout(sync_env->cct, 20) << "[inc sync] skipping object: " + << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl; marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp); continue; } if (make_pair<>(entry->timestamp, entry->op) != squash_map[entry->object]) { set_status() << "squashed operation, skipping"; - ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": squashed operation" << dendl; + ldout(sync_env->cct, 20) << "[inc sync] skipping object: " + << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl; /* not updating high marker though */ continue; } - ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl; + ldout(sync_env->cct, 20) << "[inc sync] syncing object: " + << bucket_shard_str{bs} << "/" << key << dendl; updated_status = false; while (!marker_tracker->can_do_op(key)) { if (!updated_status) { @@ -2541,7 +2491,7 @@ int RGWBucketShardIncrementalSyncCR::operate() versioned_epoch = entry->ver.epoch; } ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl; - spawn(new RGWBucketSyncSingleEntryCR(sync_env, bucket_info, shard_id, + spawn(new RGWBucketSyncSingleEntryCR(sync_env, bucket_info, bs, key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op, entry->state, cur_id, marker_tracker), false); } @@ -2584,20 +2534,23 @@ int RGWBucketShardIncrementalSyncCR::operate() int RGWRunBucketSyncCoroutine::operate() { reenter(this) { - yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id, &sync_status)); + yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status)); if (retcode < 0 && retcode != -ENOENT) { - ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket=" + << bucket_shard_str{bs} << dendl; return set_cr_error(retcode); } - ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl; + ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket " + << bucket_shard_str{bs} << ": " << sync_status.state << dendl; - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info)); + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info)); if (retcode == -ENOENT) { /* bucket instance info has not been synced in yet, fetch it now */ yield { - ldout(sync_env->cct, 10) << "no local info for bucket " << bucket_name << ":" << bucket_id << ": fetching metadata" << dendl; - string raw_key = string("bucket.instance:") + bucket_name + ":" + bucket_id; + ldout(sync_env->cct, 10) << "no local info for bucket " + << bucket_str{bs.bucket} << ": fetching metadata" << dendl; + string raw_key = string("bucket.instance:") + bs.bucket.get_key(); meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger); @@ -2607,48 +2560,51 @@ int RGWRunBucketSyncCoroutine::operate() NULL /* no marker tracker */)); } if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl; return set_cr_error(retcode); } - yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info)); + yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info)); } if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl; + ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl; return set_cr_error(retcode); } yield { if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateInit) { - call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id)); + call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs)); sync_status.state = rgw_bucket_shard_sync_info::StateFullSync; } } if (retcode < 0) { - ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs} + << " failed, retcode=" << retcode << dendl; return set_cr_error(retcode); } yield { if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { - call(new RGWBucketShardFullSyncCR(sync_env, bucket_name, bucket_id, shard_id, - &bucket_info, sync_status.full_marker)); + call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info, + sync_status.full_marker)); sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync; } } if (retcode < 0) { - ldout(sync_env->cct, 5) << "full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs} + << " failed, retcode=" << retcode << dendl; return set_cr_error(retcode); } yield { if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) { - call(new RGWBucketShardIncrementalSyncCR(sync_env, bucket_name, bucket_id, shard_id, - &bucket_info, sync_status.inc_marker)); + call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info, + sync_status.inc_marker)); } } if (retcode < 0) { - ldout(sync_env->cct, 5) << "incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl; + ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs} + << " failed, retcode=" << retcode << dendl; return set_cr_error(retcode); } @@ -2660,7 +2616,7 @@ int RGWRunBucketSyncCoroutine::operate() RGWCoroutine *RGWRemoteBucketLog::run_sync_cr() { - return new RGWRunBucketSyncCoroutine(&sync_env, bucket_name, bucket_id, shard_id); + return new RGWRunBucketSyncCoroutine(&sync_env, bs); } int RGWBucketSyncStatusManager::init() @@ -2681,7 +2637,7 @@ int RGWBucketSyncStatusManager::init() } - string key = bucket_name + ":" + bucket_id; + const string key = bucket.get_key(); rgw_http_param_pair pairs[] = { { "key", key.c_str() }, { NULL, NULL } }; @@ -2704,7 +2660,7 @@ int RGWBucketSyncStatusManager::init() for (int i = 0; i < effective_num_shards; i++) { RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager); - ret = l->init(source_zone, conn, bucket_name, bucket_id, (num_shards ? i : -1), error_logger); + ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger); if (ret < 0) { ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl; return ret; @@ -2744,7 +2700,8 @@ int RGWBucketSyncStatusManager::read_sync_status() int ret = cr_mgr.run(stacks); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " + << bucket_str{bucket} << dendl; return ret; } @@ -2765,21 +2722,17 @@ int RGWBucketSyncStatusManager::run() int ret = cr_mgr.run(stacks); if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << dendl; + ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " + << bucket_str{bucket} << dendl; return ret; } return 0; } -string RGWBucketSyncStatusManager::status_oid(const string& source_zone, const string& bucket_name, const string& bucket_id, int shard_id) +string RGWBucketSyncStatusManager::status_oid(const string& source_zone, + const rgw_bucket_shard& bs) { - string oid = bucket_status_oid_prefix + "." + source_zone + ":" + bucket_name + ":" + bucket_id; - if (shard_id >= 0) { - char buf[16]; - snprintf(buf, sizeof(buf), ":%d", shard_id); - oid.append(buf); - } - return oid; + return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key(); } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 33b723ac3783..f3fc2f2a2836 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -394,11 +394,9 @@ WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info) class RGWRemoteBucketLog : public RGWCoroutinesManager { RGWRados *store; - RGWRESTConn *conn; + RGWRESTConn *conn{nullptr}; string source_zone; - string bucket_name; - string bucket_id; - int shard_id; + rgw_bucket_shard bs; RGWBucketSyncStatusManager *status_manager; RGWAsyncRadosProcessor *async_rados; @@ -406,16 +404,16 @@ class RGWRemoteBucketLog : public RGWCoroutinesManager { RGWDataSyncEnv sync_env; - RGWBucketSyncCR *sync_cr; + RGWBucketSyncCR *sync_cr{nullptr}; public: RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm, RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx(), _store->get_cr_registry()), store(_store), - conn(NULL), shard_id(0), - status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager), - sync_cr(NULL) {} + status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager) {} - int init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, const string& _bucket_id, int _shard_id, RGWSyncErrorLogger *_error_logger); + int init(const string& _source_zone, RGWRESTConn *_conn, + const rgw_bucket& bucket, int shard_id, + RGWSyncErrorLogger *_error_logger); void finish(); RGWCoroutine *read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status); @@ -438,8 +436,7 @@ class RGWBucketSyncStatusManager { RGWRESTConn *conn; RGWSyncErrorLogger *error_logger; - string bucket_name; - string bucket_id; + rgw_bucket bucket; map source_logs; @@ -454,13 +451,13 @@ class RGWBucketSyncStatusManager { public: RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone, - const string& _bucket_name, const string& _bucket_id) : store(_store), + const rgw_bucket& bucket) : store(_store), cr_mgr(_store->ctx(), _store->get_cr_registry()), async_rados(NULL), http_manager(store->ctx(), cr_mgr.get_completion_mgr()), source_zone(_source_zone), conn(NULL), error_logger(NULL), - bucket_name(_bucket_name), bucket_id(_bucket_id), + bucket(bucket), num_shards(0) {} ~RGWBucketSyncStatusManager(); @@ -469,7 +466,7 @@ public: map& get_sync_status() { return sync_status; } int init_sync_status(); - static string status_oid(const string& source_zone, const string& bucket_name, const string& bucket_id, int shard_id); + static string status_oid(const string& source_zone, const rgw_bucket_shard& bs); int read_sync_status(); int run();