+#include <boost/utility/string_ref.hpp>
+
#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
}
};
-static string bucket_shard_str(const string& bucket_name, const string& bucket_id, int shard_id)
-{
- char shard_str[16];
- snprintf(shard_str, sizeof(shard_str), "%d", shard_id);
- return bucket_name + ":" + bucket_id + ":" + shard_str;
+// ostream wrappers to print buckets without copying strings
+struct bucket_str {
+ const rgw_bucket& b;
+ bucket_str(const rgw_bucket& b) : b(b) {}
+};
+std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
+ auto& b = rhs.b;
+ if (!b.tenant.empty()) {
+ out << b.tenant << '/';
+ }
+ out << b.name;
+ if (!b.bucket_id.empty()) {
+ out << ':' << b.bucket_id;
+ }
+ return out;
+}
+
+struct bucket_shard_str {
+ const rgw_bucket_shard& bs;
+ bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
+};
+std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
+ auto& bs = rhs.bs;
+ out << bucket_str{bs.bucket};
+ if (bs.shard_id >= 0) {
+ out << ':' << bs.shard_id;
+ }
+ return out;
}
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- string bucket_name;
- string bucket_id;
+ rgw_bucket_shard bs;
RGWBucketInfo bucket_info;
- int shard_id;
rgw_bucket_shard_sync_info sync_status;
RGWMetaSyncEnv meta_sync_env;
RGWDataSyncDebugLogger logger;
public:
- RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env,
- const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
- bucket_name(_bucket_name),
- bucket_id(_bucket_id), shard_id(_shard_id) {
-
- logger.init(sync_env, "Bucket", bucket_shard_str(bucket_name, bucket_id, shard_id));
+ RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs) {
+ logger.init(sync_env, "Bucket", bs.get_key());
}
int operate();
};
-static int parse_bucket_shard(CephContext *cct, const string& raw_key, string *bucket_name, string *bucket_instance, int *shard_id)
-{
- ssize_t pos = raw_key.find(':');
- *bucket_name = raw_key.substr(0, pos);
- *bucket_instance = raw_key.substr(pos + 1);
- pos = bucket_instance->find(':');
- *shard_id = -1;
- if (pos >= 0) {
- string err;
- string s = bucket_instance->substr(pos + 1);
- *shard_id = strict_strtol(s.c_str(), 10, &err);
- if (!err.empty()) {
- ldout(cct, 0) << "ERROR: failed to parse bucket instance key: " << *bucket_instance << dendl;
- return -EINVAL;
- }
-
- *bucket_instance = bucket_instance->substr(0, pos);
- }
- return 0;
-}
-
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
string raw_key;
string entry_marker;
- string bucket_name;
- string bucket_instance;
+ rgw_bucket_shard bs;
int sync_status;
reenter(this) {
do {
yield {
- int shard_id;
- int ret = parse_bucket_shard(sync_env->cct, raw_key, &bucket_name, &bucket_instance, &shard_id);
+ int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
+ &bs.bucket, &bs.shard_id);
if (ret < 0) {
return set_cr_error(-EIO);
}
if (marker_tracker) {
marker_tracker->reset_need_retry(raw_key);
}
- call(new RGWRunBucketSyncCoroutine(sync_env, bucket_name, bucket_instance, shard_id));
+ call(new RGWRunBucketSyncCoroutine(sync_env, bs));
}
} while (marker_tracker && marker_tracker->need_retry(raw_key));
sync_status = retcode;
if (sync_status < 0) {
- yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", bucket_name + ":" + bucket_instance,
+ yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
-sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
if (retcode < 0) {
ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
return string(buf);
}
-int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name,
- const string& _bucket_id, int _shard_id, RGWSyncErrorLogger *_error_logger)
+int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
+ const rgw_bucket& bucket, int shard_id,
+ RGWSyncErrorLogger *_error_logger)
{
conn = _conn;
source_zone = _source_zone;
- bucket_name = _bucket_name;
- bucket_id = _bucket_id;
- shard_id = _shard_id;
+ bs.bucket = bucket;
+ bs.shard_id = shard_id;
sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone);
class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
-
- string bucket_name;
- string bucket_id;
- int shard_id;
-
- string instance_key;
+ const string instance_key;
bucket_index_marker_info *info;
public:
RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
- const string& _bucket_name, const string& _bucket_id, int _shard_id,
- bucket_index_marker_info *_info) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
- info(_info) {
- instance_key = bucket_name + ":" + bucket_id;
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ":%d", shard_id);
- instance_key.append(buf);
- }
- }
+ const rgw_bucket_shard& bs,
+ bucket_index_marker_info *_info)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ instance_key(bs.get_key()), info(_info) {}
int operate() {
reenter(this) {
RGWDataSyncEnv *sync_env;
RGWRados *store;
- string bucket_name;
- string bucket_id;
- int shard_id;
-
+ rgw_bucket_shard bs;
string sync_status_oid;
string lock_name;
bucket_index_marker_info info;
public:
RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
- const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id) {
+ const rgw_bucket_shard& bs)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs) {
store = sync_env->store;
lock_name = "sync_lock";
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
string cookie = buf;
- sync_status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
+ sync_status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
}
int operate() {
}
}
/* fetch current position in logs */
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bucket_name, bucket_id, shard_id, &info));
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
if (retcode < 0 && retcode != -ENOENT) {
ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
return set_cr_error(retcode);
RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
- return new RGWInitBucketShardSyncStatusCoroutine(&sync_env,
- bucket_name, bucket_id, shard_id);
+ return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs);
}
template <class T>
map<string, bufferlist> attrs;
public:
RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
- const string& _bucket_name, const string _bucket_id, int _shard_id,
- rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
- oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _bucket_name, _bucket_id, _shard_id)),
- status(_status) {}
+ const rgw_bucket_shard& bs,
+ rgw_bucket_shard_sync_info *_status)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
+ status(_status) {}
int operate();
};
}
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
- return new RGWReadBucketSyncStatusCoroutine(&sync_env, bucket_name, bucket_id, shard_id, sync_status);
+ return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
}
RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
class RGWListBucketShardCR: public RGWCoroutine {
RGWDataSyncEnv *sync_env;
-
- string bucket_name;
- string bucket_id;
- int shard_id;
-
- string instance_key;
+ const rgw_bucket_shard& bs;
+ const string instance_key;
rgw_obj_key marker_position;
bucket_list_result *result;
public:
- RGWListBucketShardCR(RGWDataSyncEnv *_sync_env,
- const string& _bucket_name, const string& _bucket_id, int _shard_id,
- rgw_obj_key& _marker_position,
- bucket_list_result *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
- marker_position(_marker_position),
- result(_result) {
- instance_key = bucket_name + ":" + bucket_id;
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ":%d", shard_id);
- instance_key.append(buf);
- }
- }
+ RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
+ rgw_obj_key& _marker_position, bucket_list_result *_result)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
+ instance_key(bs.get_key()), marker_position(_marker_position),
+ result(_result) {}
int operate() {
reenter(this) {
{ "key-marker" , marker_position.name.c_str() },
{ "version-id-marker" , marker_position.instance.c_str() },
{ NULL, NULL } };
-
- string p = string("/") + bucket_name;
+ // don't include tenant in the url, it's already part of instance_key
+ string p = string("/") + bs.bucket.name;
call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
}
if (retcode < 0) {
class RGWListBucketIndexLogCR: public RGWCoroutine {
RGWDataSyncEnv *sync_env;
-
- string bucket_name;
- string bucket_id;
- int shard_id;
-
- string instance_key;
+ const string instance_key;
string marker;
list<rgw_bi_log_entry> *result;
public:
- RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env,
- const string& _bucket_name, const string& _bucket_id, int _shard_id,
- string& _marker,
- list<rgw_bi_log_entry> *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
- marker(_marker),
- result(_result) {
- instance_key = bucket_name + ":" + bucket_id;
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ":%d", shard_id);
- instance_key.append(buf);
- }
- }
+ RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
+ string& _marker, list<rgw_bi_log_entry> *_result)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ instance_key(bs.get_key()), marker(_marker), result(_result) {}
int operate() {
reenter(this) {
RGWDataSyncEnv *sync_env;
RGWBucketInfo *bucket_info;
- int shard_id;
+ const rgw_bucket_shard& bs;
rgw_obj_key key;
bool versioned;
public:
RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo *_bucket_info, int _shard_id,
+ RGWBucketInfo *_bucket_info,
+ const rgw_bucket_shard& bs,
const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
real_time& _timestamp,
const bucket_entry_owner& _owner,
RGWModifyOp _op, RGWPendingState _op_state,
const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket_info(_bucket_info), shard_id(_shard_id),
+ bucket_info(_bucket_info), bs(bs),
key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
owner(_owner),
timestamp(_timestamp), op(_op),
marker_tracker(_marker_tracker),
sync_status(0) {
stringstream ss;
- ss << bucket_shard_str(bucket_info->bucket.name, bucket_info->bucket.bucket_id, shard_id) << "/" << key << "[" << versioned_epoch << "]";
+ ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch << "]";
set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
set_status("init");
if (retcode < 0 && retcode != -ENOENT) {
set_status() << "failed to sync obj; retcode=" << retcode;
- rgw_bucket& bucket = bucket_info->bucket;
- ldout(sync_env->cct, 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key.name << dendl;
- error_ss << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key.name;
+ ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
+ << bucket_shard_str{bs} << "/" << key.name << dendl;
+ error_ss << bucket_shard_str{bs} << "/" << key.name;
sync_status = retcode;
}
if (!error_ss.str().empty()) {
class RGWBucketShardFullSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- string bucket_name;
- string bucket_id;
- int shard_id;
+ const rgw_bucket_shard& bs;
RGWBucketInfo *bucket_info;
bucket_list_result list_result;
list<bucket_list_entry>::iterator entries_iter;
RGWDataSyncDebugLogger logger;
public:
- RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env,
- const string& _bucket_name, const string _bucket_id, int _shard_id,
+ RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
RGWBucketInfo *_bucket_info, rgw_bucket_shard_full_sync_marker& _full_marker) : RGWCoroutine(_sync_env->cct),
sync_env(_sync_env),
- bucket_name(_bucket_name),
- bucket_id(_bucket_id), shard_id(_shard_id),
+ bs(bs),
bucket_info(_bucket_info),
full_marker(_full_marker), marker_tracker(NULL),
spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL),
op(CLS_RGW_OP_ADD),
total_entries(0), lease_cr(NULL) {
- status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
- logger.init(sync_env, "BucketFull", bucket_shard_str(bucket_name, bucket_id, shard_id));
+ status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
+ logger.init(sync_env, "BucketFull", bs.get_key());
}
~RGWBucketShardFullSyncCR() {
do {
set_status("listing remote bucket");
ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
- yield call(new RGWListBucketShardCR(sync_env, bucket_name, bucket_id, shard_id,
- list_marker, &list_result));
+ yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
+ &list_result));
if (retcode < 0 && retcode != -ENOENT) {
set_status("failed bucket listing, going down");
yield lease_cr->go_down();
}
entries_iter = list_result.entries.begin();
for (; entries_iter != list_result.entries.end(); ++entries_iter) {
- ldout(sync_env->cct, 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl;
+ ldout(sync_env->cct, 20) << "[full sync] syncing object: "
+ << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
entry = &(*entries_iter);
total_entries++;
list_marker = entries_iter->key;
op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
yield {
- spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(sync_env, bucket_info, shard_id,
+ spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(sync_env, bucket_info, bs,
entry->key,
false, /* versioned, only matters for object removal */
entry->versioned_epoch, entry->mtime,
sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
map<string, bufferlist> attrs;
sync_status.encode_state_attr(attrs);
- string oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
+ string oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs);
RGWRados *store = sync_env->store;
call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
oid, attrs));
yield lease_cr->go_down();
drain_all();
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id
- << " retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
+ << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
return set_cr_done();
class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
- string bucket_name;
- string bucket_id;
- int shard_id;
+ const rgw_bucket_shard& bs;
RGWBucketInfo *bucket_info;
list<rgw_bi_log_entry> list_result;
list<rgw_bi_log_entry>::iterator entries_iter;
map<string, pair<real_time, RGWModifyOp> > squash_map;
rgw_bucket_shard_inc_sync_marker inc_marker;
rgw_obj_key key;
- rgw_bi_log_entry *entry;
- RGWBucketIncSyncShardMarkerTrack *marker_tracker;
- int spawn_window;
- bool updated_status;
- RGWContinuousLeaseCR *lease_cr;
- string status_oid;
+ rgw_bi_log_entry *entry{nullptr};
+ RGWBucketIncSyncShardMarkerTrack *marker_tracker{nullptr};
+ const int spawn_window{BUCKET_SYNC_SPAWN_WINDOW};
+ bool updated_status{false};
+ RGWContinuousLeaseCR *lease_cr{nullptr};
+ const string status_oid;
string name;
string instance;
public:
RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
- const string& _bucket_name, const string _bucket_id, int _shard_id,
- RGWBucketInfo *_bucket_info, rgw_bucket_shard_inc_sync_marker& _inc_marker) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
- bucket_name(_bucket_name),
- bucket_id(_bucket_id), shard_id(_shard_id),
- bucket_info(_bucket_info),
- inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL),
- spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false),
- lease_cr(NULL) {
- status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
- set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id;
+ const rgw_bucket_shard& bs,
+ RGWBucketInfo *_bucket_info,
+ rgw_bucket_shard_inc_sync_marker& _inc_marker)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
+ bucket_info(_bucket_info), inc_marker(_inc_marker),
+ status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
+ set_description() << "bucket shard incremental sync bucket="
+ << bucket_shard_str{bs};
set_status("init");
- logger.init(sync_env, "BucketInc", bucket_shard_str(bucket_name, bucket_id, shard_id));
+ logger.init(sync_env, "BucketInc", bs.get_key());
}
~RGWBucketShardIncrementalSyncCR() {
do {
ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
set_status() << "listing bilog; position=" << inc_marker.position;
- yield call(new RGWListBucketIndexLogCR(sync_env, bucket_name, bucket_id, shard_id,
- inc_marker.position, &list_result));
+ yield call(new RGWListBucketIndexLogCR(sync_env, bs, inc_marker.position,
+ &list_result));
if (retcode < 0 && retcode != -ENOENT) {
/* wait for all operations to complete */
drain_all_but(1);
set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
if (entry->op == CLS_RGW_OP_CANCEL) {
set_status() << "canceled operation, skipping";
- ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
+ ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
+ << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
if (entry->state != CLS_RGW_STATE_COMPLETE) {
set_status() << "non-complete operation, skipping";
- ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl;
+ ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
+ << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
if (make_pair<>(entry->timestamp, entry->op) != squash_map[entry->object]) {
set_status() << "squashed operation, skipping";
- ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": squashed operation" << dendl;
+ ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
+ << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
/* not updating high marker though */
continue;
}
- ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
+ ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
+ << bucket_shard_str{bs} << "/" << key << dendl;
updated_status = false;
while (!marker_tracker->can_do_op(key)) {
if (!updated_status) {
versioned_epoch = entry->ver.epoch;
}
ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
- spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(sync_env, bucket_info, shard_id,
+ spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(sync_env, bucket_info, bs,
key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op,
entry->state, cur_id, marker_tracker), false);
}
int RGWRunBucketSyncCoroutine::operate()
{
reenter(this) {
- yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id, &sync_status));
+ yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
if (retcode < 0 && retcode != -ENOENT) {
- ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
+ << bucket_shard_str{bs} << dendl;
return set_cr_error(retcode);
}
- ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl;
+ ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
+ << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
if (retcode == -ENOENT) {
/* bucket instance info has not been synced in yet, fetch it now */
yield {
- ldout(sync_env->cct, 10) << "no local info for bucket " << bucket_name << ":" << bucket_id << ": fetching metadata" << dendl;
- string raw_key = string("bucket.instance:") + bucket_name + ":" + bucket_id;
+ ldout(sync_env->cct, 10) << "no local info for bucket "
+ << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
+ string raw_key = string("bucket.instance:") + bs.bucket.get_key();
meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
NULL /* no marker tracker */));
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
return set_cr_error(retcode);
}
- yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
return set_cr_error(retcode);
}
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
- call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id));
+ call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs));
sync_status.state = rgw_bucket_shard_sync_info::StateFullSync;
}
}
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
+ << " failed, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
- call(new RGWBucketShardFullSyncCR(sync_env, bucket_name, bucket_id, shard_id,
- &bucket_info, sync_status.full_marker));
+ call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
+ sync_status.full_marker));
sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
}
}
if (retcode < 0) {
- ldout(sync_env->cct, 5) << "full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
+ << " failed, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
- call(new RGWBucketShardIncrementalSyncCR(sync_env, bucket_name, bucket_id, shard_id,
- &bucket_info, sync_status.inc_marker));
+ call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
+ sync_status.inc_marker));
}
}
if (retcode < 0) {
- ldout(sync_env->cct, 5) << "incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
+ << " failed, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
{
- return new RGWRunBucketSyncCoroutine(&sync_env, bucket_name, bucket_id, shard_id);
+ return new RGWRunBucketSyncCoroutine(&sync_env, bs);
}
int RGWBucketSyncStatusManager::init()
}
- string key = bucket_name + ":" + bucket_id;
+ const string key = bucket.get_key();
rgw_http_param_pair pairs[] = { { "key", key.c_str() },
{ NULL, NULL } };
for (int i = 0; i < effective_num_shards; i++) {
RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
- ret = l->init(source_zone, conn, bucket_name, bucket_id, (num_shards ? i : -1), error_logger);
+ ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
return ret;
int ret = cr_mgr.run(stacks);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
+ << bucket_str{bucket} << dendl;
return ret;
}
int ret = cr_mgr.run(stacks);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
+ << bucket_str{bucket} << dendl;
return ret;
}
return 0;
}
-string RGWBucketSyncStatusManager::status_oid(const string& source_zone, const string& bucket_name, const string& bucket_id, int shard_id)
+string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
+ const rgw_bucket_shard& bs)
{
- string oid = bucket_status_oid_prefix + "." + source_zone + ":" + bucket_name + ":" + bucket_id;
- if (shard_id >= 0) {
- char buf[16];
- snprintf(buf, sizeof(buf), ":%d", shard_id);
- oid.append(buf);
- }
- return oid;
+ return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
}