};
class RGWReadDataSyncStatusCoroutine : public RGWSimpleRadosReadCR<rgw_data_sync_info> {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
- RGWObjectCtx& obj_ctx;
+ RGWDataSyncEnv *sync_env;
- string source_zone;
+ RGWObjectCtx& obj_ctx;
rgw_data_sync_status *sync_status;
public:
- RGWReadDataSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
- RGWObjectCtx& _obj_ctx, const string& _source_zone,
- rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_async_rados, _store, _obj_ctx,
- _store->get_zone_params().log_pool,
- RGWDataSyncStatusManager::sync_status_oid(_source_zone),
+ RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, RGWObjectCtx& _obj_ctx,
+ rgw_data_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store, _obj_ctx,
+ _sync_env->store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::sync_status_oid(_sync_env->source_zone),
&_status->sync_info),
- async_rados(_async_rados), store(_store),
- obj_ctx(_obj_ctx), source_zone(_source_zone),
+ sync_env(_sync_env),
+ obj_ctx(_obj_ctx),
sync_status(_status) {}
int handle_data(rgw_data_sync_info& data);
}
map<uint32_t, rgw_data_sync_marker>& markers = sync_status->sync_markers;
+ RGWRados *store = sync_env->store;
for (int i = 0; i < (int)data.num_shards; i++) {
- spawn(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
- RGWDataSyncStatusManager::shard_obj_name(source_zone, i), &markers[i]), true);
+ spawn(new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), &markers[i]), true);
}
return 0;
}
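Every coroutine touched by this patch trades its former (store, http_manager, async_rados, conn, source_zone) parameter bundle for a single RGWDataSyncEnv pointer. For orientation, here is a minimal sketch of the environment these call sites assume; the member set is inferred from the sync_env-> dereferences and the sync_env.init(...) calls in this patch (cct, store, conn, async_rados, http_manager, error_logger, source_zone), so treat the exact types, declaration order, and the error logger's type name as assumptions rather than the literal declaration in rgw_data_sync.h.

// Sketch only: fields and init() arguments mirror what this patch dereferences
// and passes; the real definition may differ in detail.
#include <string>

class CephContext;
class RGWRados;
class RGWRESTConn;
class RGWAsyncRadosProcessor;
class RGWHTTPManager;
class RGWSyncErrorLogger;   // assumed type of the error_logger slot

struct RGWDataSyncEnv {
  CephContext *cct{nullptr};
  RGWRados *store{nullptr};
  RGWRESTConn *conn{nullptr};
  RGWAsyncRadosProcessor *async_rados{nullptr};
  RGWHTTPManager *http_manager{nullptr};
  RGWSyncErrorLogger *error_logger{nullptr};
  std::string source_zone;

  void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
            RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
            RGWSyncErrorLogger *_error_logger, const std::string& _source_zone) {
    cct = _cct;
    store = _store;
    conn = _conn;
    async_rados = _async_rados;
    http_manager = _http_manager;
    error_logger = _error_logger;
    source_zone = _source_zone;
  }
};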
class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
+ RGWDataSyncEnv *sync_env;
RGWRESTReadResource *http_op;
RGWDataChangesLogInfo *shard_info;
public:
- RGWReadRemoteDataLogShardInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn,
- int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
+ RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
+ int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
http_op(NULL),
shard_id(_shard_id),
shard_info(_shard_info) {
string p = "/admin/log/";
- http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager);
+ http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
http_op->set_user_info((void *)stack);
int ret = http_op->aio_read();
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
return set_cr_error(ret);
}
};
class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
+ RGWDataSyncEnv *sync_env;
RGWRESTReadResource *http_op;
read_remote_data_log_response response;
public:
- RGWReadRemoteDataLogShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn,
- int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
+ RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
+ int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
http_op(NULL),
shard_id(_shard_id),
pmarker(_pmarker),
string p = "/admin/log/";
- http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager);
+ http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);
http_op->set_user_info((void *)stack);
int ret = http_op->aio_read();
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
return set_cr_error(ret);
}
};
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
- RGWAsyncRadosProcessor *async_rados;
+ RGWDataSyncEnv *sync_env;
+
RGWRados *store;
- RGWHTTPManager *http_manager;
RGWObjectCtx& obj_ctx;
- string source_zone;
string sync_status_oid;
rgw_data_sync_info status;
map<int, RGWDataChangesLogInfo> shards_info;
public:
- RGWInitDataSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr,
- RGWObjectCtx& _obj_ctx, const string& _source_zone, uint32_t _num_shards) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
- http_manager(_http_mgr),
- obj_ctx(_obj_ctx), source_zone(_source_zone) {
+ RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
+ RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env), store(sync_env->store),
+ obj_ctx(_obj_ctx) {
lock_name = "sync_lock";
status.num_shards = _num_shards;
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
string cookie = buf;
- sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(source_zone);
+ sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
}
int operate() {
reenter(this) {
yield {
uint32_t lock_duration = 30;
- call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+ call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie, lock_duration));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
}
}
yield {
- call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+ call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_status_oid, status));
}
yield { /* take lock again, we just recreated the object */
uint32_t lock_duration = 30;
- call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+ call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie, lock_duration));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
}
/* fetch current position in logs */
yield {
- RGWRESTConn *conn = store->get_zone_conn_by_id(source_zone);
+ RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
if (!conn) {
- ldout(cct, 0) << "ERROR: connection to zone " << source_zone << " does not exist!" << dendl;
+ ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
return set_cr_error(-EIO);
}
for (int i = 0; i < (int)status.num_shards; i++) {
- spawn(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, i, &shards_info[i]), true);
+ spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
}
}
while (collect(&ret)) {
RGWDataChangesLogInfo& info = shards_info[i];
marker.next_step_marker = info.marker;
marker.timestamp = info.last_update;
- spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
- RGWDataSyncStatusManager::shard_obj_name(source_zone, i), marker), true);
+ spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i), marker), true);
}
}
yield {
status.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
- call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+ call(new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_status_oid, status));
}
yield { /* unlock */
- call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+ call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie));
}
while (collect(&ret)) {
rgw_http_param_pair pairs[] = { { "type", "data" },
{ NULL, NULL } };
- int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
+ int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
return ret;
return 0;
}
- source_zone = _source_zone;
- conn = _conn;
+ sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, NULL /* error_logger */, _source_zone);
int ret = http_manager.set_threaded();
if (ret < 0) {
{ NULL, NULL } };
RGWDataChangesLogInfo info;
- int ret = conn->get_json_resource("/admin/log", pairs, info);
+ int ret = sync_env.conn->get_json_resource("/admin/log", pairs, info);
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
return ret;
int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
RGWObjectCtx obj_ctx(store, NULL);
- int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, sync_status));
+ int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, sync_status));
if (r == -ENOENT) {
r = 0;
}
int RGWRemoteDataLog::init_sync_status(int num_shards)
{
RGWObjectCtx obj_ctx(store, NULL);
- return run(new RGWInitDataSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, source_zone, num_shards));
+ return run(new RGWInitDataSyncStatusCoroutine(&sync_env, obj_ctx, num_shards));
}
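With init_sync_status() and read_sync_status() reduced to the one-liners above, the caller-side wiring follows one pattern throughout: the owning object (RGWRemoteDataLog here, RGWRemoteBucketLog further down) keeps an RGWDataSyncEnv member, fills it once through sync_env.init(...), and passes &sync_env to every coroutine it runs. A condensed sketch of that pattern, with signatures assumed from the surrounding hunks and error handling elided:

// Condensed sketch, not the literal member functions: the env is populated
// once, then shared by address with each coroutine.
int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn)
{
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager,
                NULL /* error_logger */, _source_zone);
  return http_manager.set_threaded();  // real code also logs on failure
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  RGWObjectCtx obj_ctx(store, NULL);
  int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, sync_status));
  return (r == -ENOENT) ? 0 : r;  // a missing status object is not an error
}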
static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
};
class RGWListBucketIndexesCR : public RGWCoroutine {
+ RGWDataSyncEnv *sync_env;
+
RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
- string source_zone;
rgw_data_sync_status *sync_status;
int num_shards;
bool failed;
public:
- RGWListBucketIndexesCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn,
- const string& _source_zone,
- rgw_data_sync_status *_sync_status) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn), source_zone(_source_zone),
- sync_status(_sync_status),
+ RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
+ rgw_data_sync_status *_sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ store(sync_env->store), sync_status(_sync_status),
req_ret(0), entries_index(NULL), i(0), failed(false) {
- oid_prefix = datalog_sync_full_sync_index_prefix + "." + source_zone;
+ oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
path = "/admin/metadata/bucket.instance";
num_shards = sync_status->sync_info.num_shards;
}
int operate() {
reenter(this) {
- entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards,
+ entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
store->get_zone_params().log_pool,
oid_prefix);
yield {
string entrypoint = string("/admin/metadata/bucket.instance");
#warning need a better scaling solution here, requires streaming output
- call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
+ call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
entrypoint, NULL, &result));
}
if (get_ret_status() < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
return set_state(RGWCoroutine_Error);
}
for (iter = result.begin(); iter != result.end(); ++iter) {
- ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
+ ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
key = *iter;
rgw_http_param_pair pairs[] = { { "key", key.c_str() },
{ NULL, NULL } };
- call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, http_manager, path, pairs, &meta_info));
+ call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
}
num_shards = meta_info.data.get_bucket_info().num_shards;
int shard_id = (int)iter->first;
rgw_data_sync_marker& marker = iter->second;
marker.total_entries = entries_index->get_total_entries(shard_id);
- spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
- RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id), marker), true);
+ spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id), marker), true);
}
}
int ret;
#define DATA_SYNC_UPDATE_MARKER_WINDOW 1
class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
- RGWRados *store;
- RGWAsyncRadosProcessor *async_rados;
+ RGWDataSyncEnv *sync_env;
string marker_oid;
rgw_data_sync_marker sync_marker;
}
public:
- RGWDataSyncShardMarkerTrack(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
const string& _marker_oid,
const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
- store(_store),
- async_rados(_async_rados),
+ sync_env(_sync_env),
marker_oid(_marker_oid),
sync_marker(_marker) {}
sync_marker.marker = new_marker;
sync_marker.pos = index_pos;
- ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
- return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+ RGWRados *store = sync_env->store;
+
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
marker_oid, sync_marker);
}
};
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
- RGWRados *store;
- string source_zone;
+ RGWDataSyncEnv *sync_env;
string bucket_name;
string bucket_id;
RGWBucketInfo bucket_info;
int shard_id;
rgw_bucket_shard_sync_info sync_status;
- RGWMetaSyncEnv sync_env;
+ RGWMetaSyncEnv meta_sync_env;
public:
- RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, RGWRados *_store,
- const string& _source_zone,
- const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()),
- http_manager(_mgr), async_rados(_async_rados), conn(_conn),
- store(_store),
- source_zone(_source_zone),
+ RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env,
+ const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
bucket_name(_bucket_name),
bucket_id(_bucket_id), shard_id(_shard_id) {}
}
class RGWDataSyncSingleEntryCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
-
- RGWRESTConn *conn;
- string source_zone;
+ RGWDataSyncEnv *sync_env;
string raw_key;
string entry_marker;
RGWDataSyncShardMarkerTrack *marker_tracker;
public:
- RGWDataSyncSingleEntryCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, const string& _source_zone,
- const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn), source_zone(_source_zone),
+ RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
+ const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
raw_key(_raw_key), entry_marker(_entry_marker),
sync_status(0),
marker_tracker(_marker_tracker) {
- set_description() << "data sync single entry (source_zone=" << source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
+ set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" <<_raw_key << " entry=" << entry_marker;
}
int operate() {
do {
yield {
int shard_id;
- int ret = parse_bucket_shard(store->ctx(), raw_key, &bucket_name, &bucket_instance, &shard_id);
+ int ret = parse_bucket_shard(sync_env->cct, raw_key, &bucket_name, &bucket_instance, &shard_id);
if (ret < 0) {
return set_cr_error(-EIO);
}
marker_tracker->reset_need_retry(raw_key);
- call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_instance, shard_id));
+ call(new RGWRunBucketSyncCoroutine(sync_env, bucket_name, bucket_instance, shard_id));
}
} while (marker_tracker->need_retry(raw_key));
#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
class RGWDataSyncShardCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
+ RGWDataSyncEnv *sync_env;
rgw_bucket pool;
- string source_zone;
uint32_t shard_id;
rgw_data_sync_marker sync_marker;
RGWContinuousLeaseCR *lease_cr;
string status_oid;
public:
- RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
- uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
+ RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
+ rgw_bucket& _pool,
+ uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
pool(_pool),
- source_zone(_source_zone),
shard_id(_shard_id),
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
lease_cr(NULL) {
- set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id;
- status_oid = RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id);
+ set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
+ status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
}
~RGWDataSyncShardCR() {
lease_cr->abort();
lease_cr->put();
}
- lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid,
+ RGWRados *store = sync_env->store;
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
set_sleeping(true);
yield;
}
- oid = full_data_sync_index_shard_oid(source_zone, shard_id);
- set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
- status_oid,
- sync_marker));
+ oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
+ set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
total_entries = sync_marker.pos;
do {
- yield call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+ yield call(new RGWRadosGetOmapKeysCR(sync_env->store, pool, oid, sync_marker.marker, &entries, max_entries));
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
iter = entries.begin();
for (; iter != entries.end(); ++iter) {
- ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
total_entries++;
if (!marker_tracker->start(iter->first, total_entries, utime_t())) {
- ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
} else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
+ yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
if (retcode < 0) {
lease_cr->go_down();
drain_all();
sync_marker.state = rgw_data_sync_marker::IncrementalSync;
sync_marker.marker = sync_marker.next_step_marker;
sync_marker.next_step_marker.clear();
- call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+ call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
status_oid, sync_marker));
}
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
lease_cr->go_down();
return set_cr_error(retcode);
}
yield;
}
set_status("lease acquired");
- set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
- status_oid,
- sync_marker));
+ set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
do {
current_modified.clear();
inc_lock.Lock();
/* process out of band updates */
for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
yield {
- ldout(store->ctx(), 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
- spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, *modified_iter, string(), marker_tracker), false);
+ ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker), false);
}
}
- yield call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, shard_id, &shard_info));
+ yield call(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &shard_info));
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to fetch remote data log info: ret=" << retcode << dendl;
lease_cr->go_down();
drain_all();
return set_cr_error(retcode);
}
datalog_marker = shard_info.marker;
#define INCREMENTAL_MAX_ENTRIES 100
- ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (datalog_marker > sync_marker.marker) {
spawned_keys.clear();
- yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, conn, shard_id, &sync_marker.marker, &log_entries, &truncated));
+ yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
- ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
- ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
continue;
}
if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
- ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
} else {
/*
* don't spawn the same key more than once. We can do that as long as we don't yield
*/
if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
spawned_keys.insert(log_iter->entry.key);
- spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
+ spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
if (retcode < 0) {
lease_cr->go_down();
drain_all();
int ret;
while (collect(&ret)) {
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we should have reported this error */
#warning deal with error
}
}
}
}
- ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (datalog_marker == sync_marker.marker) {
#define INCREMENTAL_INTERVAL 20
yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
};
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
+ RGWDataSyncEnv *sync_env;
rgw_bucket pool;
- string source_zone;
uint32_t shard_id;
rgw_data_sync_marker sync_marker;
public:
- RGWDataSyncShardControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
- uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
+ RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_bucket& _pool,
+ uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct),
+ sync_env(_sync_env),
pool(_pool),
- source_zone(_source_zone),
shard_id(_shard_id),
sync_marker(_marker) {
}
RGWCoroutine *alloc_cr() {
- return new RGWDataSyncShardCR(store, http_manager, async_rados, conn, pool, source_zone, shard_id, sync_marker, backoff_ptr());
+ return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
}
void append_modified_shards(set<string>& keys) {
};
class RGWDataSyncCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
- string source_zone;
+ RGWDataSyncEnv *sync_env;
uint32_t num_shards;
RGWObjectCtx obj_ctx;
bool *reset_backoff;
public:
- RGWDataSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, const string& _source_zone, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
- source_zone(_source_zone),
+ RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
num_shards(_num_shards),
- obj_ctx(store),
+ obj_ctx(sync_env->store),
marker_tracker(NULL),
shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
reset_backoff(_reset_backoff) {
reenter(this) {
/* read sync status */
- yield call(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
+ yield call(new RGWReadDataSyncStatusCoroutine(sync_env, obj_ctx, &sync_status));
if (retcode == -ENOENT) {
sync_status.sync_info.num_shards = num_shards;
} else if (retcode < 0 && retcode != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
/* state: init status */
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
- ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
- yield call(new RGWInitDataSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, sync_status.sync_info.num_shards));
+ ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
+ yield call(new RGWInitDataSyncStatusCoroutine(sync_env, obj_ctx, sync_status.sync_info.num_shards));
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
sync_status.sync_info.num_shards = num_shards;
yield call(set_sync_info_cr());
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
/* state: building full sync maps */
- ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
- yield call(new RGWListBucketIndexesCR(store, http_manager, async_rados, conn, source_zone, &sync_status));
+ ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
+ yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
sync_status.sync_info.state = rgw_data_sync_info::StateSync;
/* update new state */
yield call(set_sync_info_cr());
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
case rgw_data_sync_info::StateSync:
for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
iter != sync_status.sync_markers.end(); ++iter) {
- RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(store, http_manager, async_rados,
- conn, store->get_zone_params().log_pool, source_zone,
+ RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
iter->first, iter->second);
shard_crs_lock.Lock();
shard_crs[iter->first] = cr;
}
RGWCoroutine *set_sync_info_cr() {
- return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(async_rados, store, store->get_zone_params().log_pool,
- RGWDataSyncStatusManager::sync_status_oid(source_zone),
+ RGWRados *store = sync_env->store;
+ return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
+ RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone),
sync_status.sync_info);
}
class RGWDataSyncControlCR : public RGWBackoffControlCR
{
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
- string source_zone;
+ RGWDataSyncEnv *sync_env;
uint32_t num_shards;
public:
- RGWDataSyncControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, const string& _source_zone, uint32_t _num_shards) : RGWBackoffControlCR(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
- source_zone(_source_zone), num_shards(_num_shards) {
+ RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct),
+ sync_env(_sync_env), num_shards(_num_shards) {
}
RGWCoroutine *alloc_cr() {
- return new RGWDataSyncCR(store, http_manager, async_rados, conn, source_zone, num_shards, backoff_ptr());
+ return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
}
void wakeup(int shard_id, set<string>& keys) {
{
RGWObjectCtx obj_ctx(store, NULL);
- int r = run(new RGWReadDataSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, &sync_status));
+ int r = run(new RGWReadDataSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
if (r < 0 && r != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << source_zone << " r=" << r << dendl;
+ ldout(store->ctx(), 0) << "ERROR: failed to read sync status from source_zone=" << sync_env.source_zone << " r=" << r << dendl;
return r;
}
lock.get_write();
- data_sync_cr = new RGWDataSyncControlCR(store, &http_manager, async_rados, conn, source_zone, num_shards);
+ data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
lock.unlock();
r = run(data_sync_cr);
if (r < 0) {
bucket_id = _bucket_id;
shard_id = _shard_id;
+ sync_env.init(store->ctx(), store, conn, async_rados, http_manager, error_logger, source_zone);
+
return 0;
}
};
class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
-
- RGWRESTConn *conn;
+ RGWDataSyncEnv *sync_env;
string bucket_name;
string bucket_id;
bucket_index_marker_info *info;
public:
- RGWReadRemoteBucketIndexLogInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn,
+ RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
const string& _bucket_name, const string& _bucket_id, int _shard_id,
- bucket_index_marker_info *_info) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
+ bucket_index_marker_info *_info) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
info(_info) {
instance_key = bucket_name + ":" + bucket_id;
{ NULL, NULL } };
string p = "/admin/log/";
- call(new RGWReadRESTResourceCR<bucket_index_marker_info>(store->ctx(), conn, http_manager, p, pairs, info));
+ call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
}
if (retcode < 0) {
return set_cr_error(retcode);
}
};
-class RGWReadBucketShardSyncStatusCR : public RGWSimpleRadosReadCR<rgw_bucket_shard_sync_info> {
- map<string, bufferlist> attrs;
-public:
- RGWReadBucketShardSyncStatusCR(RGWAsyncRadosProcessor *async_rados, RGWRados *store,
- RGWObjectCtx& obj_ctx, const string& source_zone,
- const string& bucket_name, const string bucket_id, int shard_id,
- rgw_bucket_shard_sync_info *status) : RGWSimpleRadosReadCR(async_rados, store, obj_ctx,
- store->get_zone_params().log_pool,
- RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id),
- status) {}
-
-};
-
-
class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
- RGWAsyncRadosProcessor *async_rados;
+ RGWDataSyncEnv *sync_env;
RGWRados *store;
- RGWHTTPManager *http_manager;
- string source_zone;
- RGWRESTConn *conn;
+
string bucket_name;
string bucket_id;
int shard_id;
bucket_index_marker_info info;
public:
- RGWInitBucketShardSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr,
- const string& _source_zone, RGWRESTConn *_conn,
- const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
- http_manager(_http_mgr),
- source_zone(_source_zone), conn(_conn),
+ RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
+ const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id) {
+ store = sync_env->store;
lock_name = "sync_lock";
#define COOKIE_LEN 16
gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
string cookie = buf;
- sync_status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
+ sync_status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
}
int operate() {
reenter(this) {
yield {
uint32_t lock_duration = 30;
- call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+ call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie, lock_duration));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
return set_cr_error(retcode);
}
}
- yield call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+ yield call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_status_oid, status));
yield { /* take lock again, we just recreated the object */
uint32_t lock_duration = 30;
- call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+ call(new RGWSimpleRadosLockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie, lock_duration));
if (retcode < 0) {
ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
}
}
/* fetch current position in logs */
- yield call(new RGWReadRemoteBucketIndexLogInfoCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, &info));
+ yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bucket_name, bucket_id, shard_id, &info));
if (retcode < 0 && retcode != -ENOENT) {
ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
return set_cr_error(retcode);
status.inc_marker.position = info.max_marker;
map<string, bufferlist> attrs;
status.encode_all_attrs(attrs);
- call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+ call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
sync_status_oid, attrs));
}
yield { /* unlock */
- call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+ call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
lock_name, cookie));
}
return set_cr_done();
RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
{
- return new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, source_zone,
- conn, bucket_name, bucket_id, shard_id);
+ return new RGWInitBucketShardSyncStatusCoroutine(&sync_env,
+ bucket_name, bucket_id, shard_id);
}
template <class T>
}
class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
+ RGWDataSyncEnv *sync_env;
RGWObjectCtx obj_ctx;
string oid;
rgw_bucket_shard_sync_info *status;
map<string, bufferlist> attrs;
public:
- RGWReadBucketSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
- const string& _source_zone,
+ RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
const string& _bucket_name, const string _bucket_id, int _shard_id,
- rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_store->ctx()),
- async_rados(_async_rados),
- store(_store),
- obj_ctx(_store),
- oid(RGWBucketSyncStatusManager::status_oid(_source_zone, _bucket_name, _bucket_id, _shard_id)),
+ rgw_bucket_shard_sync_info *_status) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
+ obj_ctx(sync_env->store),
+ oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, _bucket_name, _bucket_id, _shard_id)),
status(_status) {}
int operate();
};
int RGWReadBucketSyncStatusCoroutine::operate()
{
reenter(this) {
- yield call(new RGWSimpleRadosReadAttrsCR(async_rados, store, obj_ctx,
- store->get_zone_params().log_pool,
+ yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store, obj_ctx,
+ sync_env->store->get_zone_params().log_pool,
oid,
&attrs));
if (retcode == -ENOENT) {
return set_cr_done();
}
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to call fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
return set_cr_error(retcode);
}
- status->decode_from_attrs(store->ctx(), attrs);
+ status->decode_from_attrs(sync_env->cct, attrs);
return set_cr_done();
}
return 0;
}
RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
{
- return new RGWReadBucketSyncStatusCoroutine(async_rados, store, source_zone,
- bucket_name, bucket_id, shard_id, sync_status);
+ return new RGWReadBucketSyncStatusCoroutine(&sync_env, bucket_name, bucket_id, shard_id, sync_status);
}
RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
};
class RGWListBucketShardCR: public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
-
- RGWRESTConn *conn;
+ RGWDataSyncEnv *sync_env;
string bucket_name;
string bucket_id;
bucket_list_result *result;
public:
- RGWListBucketShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn,
+ RGWListBucketShardCR(RGWDataSyncEnv *_sync_env,
const string& _bucket_name, const string& _bucket_id, int _shard_id,
rgw_obj_key& _marker_position,
- bucket_list_result *_result) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
+ bucket_list_result *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
marker_position(_marker_position),
result(_result) {
{ NULL, NULL } };
string p = string("/") + bucket_name;
- call(new RGWReadRESTResourceCR<bucket_list_result>(store->ctx(), conn, http_manager, p, pairs, result));
+ call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
}
if (retcode < 0) {
return set_cr_error(retcode);
};
class RGWListBucketIndexLogCR: public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
-
- RGWRESTConn *conn;
+ RGWDataSyncEnv *sync_env;
string bucket_name;
string bucket_id;
list<rgw_bi_log_entry> *result;
public:
- RGWListBucketIndexLogCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn,
- const string& _bucket_name, const string& _bucket_id, int _shard_id,
- string& _marker,
- list<rgw_bi_log_entry> *_result) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- conn(_conn),
+ RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env,
+ const string& _bucket_name, const string& _bucket_id, int _shard_id,
+ string& _marker,
+ list<rgw_bi_log_entry> *_result) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
marker(_marker),
result(_result) {
{ "type", "bucket-index" },
{ NULL, NULL } };
- call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(store->ctx(), conn, http_manager, "/admin/log", pairs, result));
+ call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
}
if (retcode < 0) {
return set_cr_error(retcode);
#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
- RGWRados *store;
- RGWAsyncRadosProcessor *async_rados;
+ RGWDataSyncEnv *sync_env;
string marker_oid;
rgw_bucket_shard_full_sync_marker sync_marker;
public:
- RGWBucketFullSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
+ RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
const string& _marker_oid,
const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
- store(_store),
- async_rados(_async_rados),
+ sync_env(_sync_env),
marker_oid(_marker_oid),
sync_marker(_marker) {}
map<string, bufferlist> attrs;
sync_marker.encode_attr(attrs);
- ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
- return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+
+ ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+ return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
marker_oid, attrs);
}
};
class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
- RGWRados *store;
- RGWAsyncRadosProcessor *async_rados;
+ RGWDataSyncEnv *sync_env;
string marker_oid;
rgw_bucket_shard_inc_sync_marker sync_marker;
}
public:
- RGWBucketIncSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
+ RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
const string& _marker_oid,
const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
- store(_store),
- async_rados(_async_rados),
+ sync_env(_sync_env),
marker_oid(_marker_oid),
sync_marker(_marker) {}
map<string, bufferlist> attrs;
sync_marker.encode_attr(attrs);
- ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
- return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+
+ ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+ return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
marker_oid, attrs);
}
template <class T, class K>
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
- RGWRados *store;
- RGWAsyncRadosProcessor *async_rados;
+ RGWDataSyncEnv *sync_env;
- string source_zone;
RGWBucketInfo *bucket_info;
int shard_id;
public:
- RGWBucketSyncSingleEntryCR(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
- const string& _source_zone, RGWBucketInfo *_bucket_info, int _shard_id,
+ RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
+ RGWBucketInfo *_bucket_info, int _shard_id,
const rgw_obj_key& _key, bool _versioned, uint64_t _versioned_epoch,
utime_t& _timestamp,
const bucket_entry_owner& _owner,
RGWModifyOp _op, RGWPendingState _op_state,
- const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
- async_rados(_async_rados),
- source_zone(_source_zone),
+ const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
bucket_info(_bucket_info), shard_id(_shard_id),
key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
owner(_owner),
entry_marker(_entry_marker),
marker_tracker(_marker_tracker),
sync_status(0) {
- set_description() << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
- ldout(store->ctx(), 20) << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
+ set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
+ ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
set_status("init");
}
op == CLS_RGW_OP_LINK_OLH) {
if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
set_status("skipping entry");
- ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
+ ldout(sync_env->cct, 10) << "bucket skipping sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
goto done;
}
set_status("syncing obj");
- ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
- call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
+ ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+ call(new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info,
key, versioned_epoch,
true));
} else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
versioned = true;
}
- call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, &timestamp));
+ call(new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned, versioned_epoch, NULL, NULL, false, &timestamp));
} else if (op == CLS_RGW_OP_LINK_OLH_DM) {
set_status("creating delete marker");
- ldout(store->ctx(), 10) << "creating delete marker: obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
- call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, &timestamp));
+ ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+ call(new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, *bucket_info, key, versioned, versioned_epoch, &owner.id, &owner.display_name, true, &timestamp));
}
}
} while (marker_tracker->need_retry(key));
if (retcode < 0 && retcode != -ENOENT) {
set_status() << "failed to sync obj; retcode=" << retcode;
rgw_bucket& bucket = bucket_info->bucket;
- ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
sync_status = retcode;
}
done:
#define BUCKET_SYNC_SPAWN_WINDOW 20
class RGWBucketShardFullSyncCR : public RGWCoroutine {
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
- RGWRados *store;
- string source_zone;
+ RGWDataSyncEnv *sync_env;
string bucket_name;
string bucket_id;
int shard_id;
string status_oid;
public:
- RGWBucketShardFullSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, RGWRados *_store,
- const string& _source_zone,
+ RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env,
const string& _bucket_name, const string _bucket_id, int _shard_id,
- RGWBucketInfo *_bucket_info, rgw_bucket_shard_full_sync_marker& _full_marker) : RGWCoroutine(_store->ctx()),
- http_manager(_mgr), async_rados(_async_rados), conn(_conn),
- store(_store),
- source_zone(_source_zone),
+ RGWBucketInfo *_bucket_info, rgw_bucket_shard_full_sync_marker& _full_marker) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
bucket_name(_bucket_name),
bucket_id(_bucket_id), shard_id(_shard_id),
bucket_info(_bucket_info),
spawn_window(BUCKET_SYNC_SPAWN_WINDOW), entry(NULL),
op(CLS_RGW_OP_ADD),
total_entries(0), lease_cr(NULL) {
- status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
+ status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
}
~RGWBucketShardFullSyncCR() {
set_status("acquiring sync lock");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
- lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid,
+ RGWRados *store = sync_env->store;
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
}
set_status("lock acquired");
list_marker = full_marker.position;
- marker_tracker = new RGWBucketFullSyncShardMarkerTrack(store, async_rados,
- status_oid,
- full_marker);
+ marker_tracker = new RGWBucketFullSyncShardMarkerTrack(sync_env, status_oid, full_marker);
total_entries = full_marker.count;
do {
set_status("listing remote bucket");
- ldout(store->ctx(), 20) << __func__ << "(): listing bucket for full sync" << dendl;
- yield call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
- list_marker, &list_result));
+ ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
+ yield call(new RGWListBucketShardCR(sync_env, bucket_name, bucket_id, shard_id,
+ list_marker, &list_result));
if (retcode < 0 && retcode != -ENOENT) {
set_status("failed bucket listing, going down");
yield lease_cr->go_down();
}
entries_iter = list_result.entries.begin();
for (; entries_iter != list_result.entries.end(); ++entries_iter) {
- ldout(store->ctx(), 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl;
+ ldout(sync_env->cct, 20) << "[full sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << entries_iter->key << dendl;
entry = &(*entries_iter);
total_entries++;
list_marker = entries_iter->key;
if (!marker_tracker->start(entry->key, total_entries, utime_t())) {
- ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
} else {
op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
yield {
- spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+ spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(sync_env, bucket_info, shard_id,
entry->key,
false, /* versioned, only matters for object removal */
entry->versioned_epoch, entry->mtime,
yield wait_for_child();
while (collect(&ret)) {
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we should have reported this error */
#warning deal with error
}
sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
map<string, bufferlist> attrs;
sync_status.encode_state_attr(attrs);
- string oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
- call(new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
+ string oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
+ RGWRados *store = sync_env->store;
+ call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
oid, attrs));
}
yield lease_cr->go_down();
drain_all();
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id
+ ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket " << bucket_name << ":" << bucket_id << ":" << shard_id
<< " retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
}
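Everything these bucket sync coroutines used to receive as separate constructor arguments now comes from a single RGWDataSyncEnv. For reference, a minimal sketch of the fields the converted code dereferences (cct, store, conn, async_rados, http_manager, error_logger, source_zone); the field order, the init() helper, and any members beyond these are assumptions, since the actual definition is not part of this excerpt:

/* Sketch only: the members the converted coroutines rely on. Types come from
 * the rgw headers already included in this file. */
struct RGWDataSyncEnv {
  CephContext *cct;                    /* used for ldout() and RGWCoroutine(cct) */
  RGWRados *store;                     /* local store: zone params, log pool */
  RGWRESTConn *conn;                   /* connection to the source zone */
  RGWAsyncRadosProcessor *async_rados;
  RGWHTTPManager *http_manager;
  RGWSyncErrorLogger *error_logger;    /* forwarded to the meta sync env below */
  string source_zone;                  /* used to build per-shard status oids */

  /* assumed convenience initializer mirroring the member list above */
  void init(CephContext *_cct, RGWRados *_store, RGWRESTConn *_conn,
            RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager,
            RGWSyncErrorLogger *_error_logger, const string& _source_zone) {
    cct = _cct; store = _store; conn = _conn; async_rados = _async_rados;
    http_manager = _http_manager; error_logger = _error_logger;
    source_zone = _source_zone;
  }
};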
class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
- RGWRESTConn *conn;
- RGWRados *store;
- string source_zone;
+ RGWDataSyncEnv *sync_env;
string bucket_name;
string bucket_id;
int shard_id;
public:
- RGWBucketShardIncrementalSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- RGWRESTConn *_conn, RGWRados *_store,
- const string& _source_zone,
+ RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
const string& _bucket_name, const string _bucket_id, int _shard_id,
- RGWBucketInfo *_bucket_info, rgw_bucket_shard_inc_sync_marker& _inc_marker) : RGWCoroutine(_store->ctx()),
- http_manager(_mgr), async_rados(_async_rados), conn(_conn),
- store(_store),
- source_zone(_source_zone),
+ RGWBucketInfo *_bucket_info, rgw_bucket_shard_inc_sync_marker& _inc_marker) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
bucket_name(_bucket_name),
bucket_id(_bucket_id), shard_id(_shard_id),
bucket_info(_bucket_info),
inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL),
spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false),
lease_cr(NULL) {
- status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
+ status_oid = RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bucket_name, bucket_id, shard_id);
set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id;
set_status("init");
}
set_status("acquiring sync lock");
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
- lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, status_oid,
+ RGWRados *store = sync_env->store;
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, status_oid,
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
set_sleeping(true);
yield;
}
- marker_tracker = new RGWBucketIncSyncShardMarkerTrack(store, async_rados,
+ marker_tracker = new RGWBucketIncSyncShardMarkerTrack(sync_env,
status_oid,
inc_marker);
do {
- ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
+ ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
set_status() << "listing bilog; position=" << inc_marker.position;
- yield call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
+ yield call(new RGWListBucketIndexLogCR(sync_env, bucket_name, bucket_id, shard_id,
inc_marker.position, &list_result));
if (retcode < 0 && retcode != -ENOENT) {
/* wait for all operations to complete */
if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) {
set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry";
- ldout(store->ctx(), 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl;
+ ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl;
continue;
}
- ldout(store->ctx(), 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
+ ldout(sync_env->cct, 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
if (!ns.empty()) {
set_status() << "skipping entry in namespace: " << entries_iter->object;
- ldout(store->ctx(), 20) << "skipping entry in namespace: " << entries_iter->object << dendl;
+ ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entries_iter->object << dendl;
continue;
}
set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
if (entry->op == CLS_RGW_OP_CANCEL) {
set_status() << "canceled operation, skipping";
- ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
+ ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
continue;
}
if (entry->state != CLS_RGW_STATE_COMPLETE) {
set_status() << "non-complete operation, skipping";
- ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl;
+ ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl;
continue;
}
- ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
+ ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
updated_status = false;
while (!marker_tracker->can_do_op(key, entry->op)) {
if (!updated_status) {
set_status() << "can't do op, conflicting inflight operation";
updated_status = true;
}
- ldout(store->ctx(), 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
+ ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
yield wait_for_child();
}
if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) {
set_status() << "can't do op, sync already in progress for object";
- ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp);
continue;
}
// yield {
set_status() << "start object sync";
if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
- ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
} else {
uint64_t versioned_epoch = 0;
bucket_entry_owner owner(entry->owner, entry->owner_display_name);
if (entry->ver.pool < 0) {
versioned_epoch = entry->ver.epoch;
}
- spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+ spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(sync_env, bucket_info, shard_id,
key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op,
entry->state, entry->id, marker_tracker), false);
}
yield wait_for_child();
while (collect(&ret)) {
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: a sync operation returned error" << dendl;
/* we should have reported this error */
#warning deal with error
}
int RGWRunBucketSyncCoroutine::operate()
{
reenter(this) {
- yield call(new RGWReadBucketSyncStatusCoroutine(async_rados, store, source_zone, bucket_name, bucket_id, shard_id, &sync_status));
+ yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id, &sync_status));
if (retcode < 0 && retcode != -ENOENT) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl;
return set_cr_error(retcode);
}
- ldout(store->ctx(), 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl;
+ ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket " << bucket_name << ":" << bucket_id << ":" << shard_id << ": " << sync_status.state << dendl;
- yield call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info));
if (retcode == -ENOENT) {
/* bucket instance info has not been synced in yet, fetch it now */
yield {
- ldout(store->ctx(), 10) << "no local info for bucket " << bucket_name << ":" << bucket_id << ": fetching metadata" << dendl;
+ ldout(sync_env->cct, 10) << "no local info for bucket " << bucket_name << ":" << bucket_id << ": fetching metadata" << dendl;
string raw_key = string("bucket.instance:") + bucket_name + ":" + bucket_id;
- sync_env.init(cct, store, store->rest_master_conn, async_rados, http_manager);
+ meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
- call(new RGWMetaSyncSingleEntryCR(&sync_env, raw_key,
+ call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
string() /* no marker */,
MDLOG_STATUS_COMPLETE,
NULL /* no marker tracker */));
}
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_name << ":" << bucket_id << dendl;
return set_cr_error(retcode);
}
- yield call(new RGWGetBucketInstanceInfoCR(async_rados, store, bucket_name, bucket_id, &bucket_info));
+ yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bucket_name, bucket_id, &bucket_info));
}
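Note the rename in the hunk above: RGWRunBucketSyncCoroutine previously kept its metadata sync environment in a member called sync_env, which would now collide with the RGWDataSyncEnv pointer, so it becomes meta_sync_env. A sketch of the assumed member layout, inferred from the calls above rather than spelled out in this excerpt:

/* Sketch of the assumed members after the rename; only the two environment
 * members are shown in full. */
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;     /* data sync env handed in by the caller */
  RGWMetaSyncEnv meta_sync_env; /* local env for the one-off metadata fetch;
                                   renamed from the old 'sync_env' member so it
                                   no longer shadows the pointer above */
  /* bucket_name, bucket_id, shard_id, bucket_info, sync_status as before */
};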
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_name << " bucket_id=" << bucket_id << dendl;
return set_cr_error(retcode);
}
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
- call(new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, source_zone,
- conn, bucket_name, bucket_id, shard_id));
+ call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bucket_name, bucket_id, shard_id));
sync_status.state = rgw_bucket_shard_sync_info::StateFullSync;
}
}
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
- call(new RGWBucketShardFullSyncCR(http_manager, async_rados, conn, store,
- source_zone, bucket_name, bucket_id, shard_id,
+ call(new RGWBucketShardFullSyncCR(sync_env, bucket_name, bucket_id, shard_id,
&bucket_info, sync_status.full_marker));
sync_status.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
}
}
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: full sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
yield {
if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
- call(new RGWBucketShardIncrementalSyncCR(http_manager, async_rados, conn, store,
- source_zone, bucket_name, bucket_id, shard_id,
+ call(new RGWBucketShardIncrementalSyncCR(sync_env, bucket_name, bucket_id, shard_id,
&bucket_info, sync_status.inc_marker));
}
}
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: incremental sync on " << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << " failed, retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
{
- return new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, source_zone, bucket_name, bucket_id, shard_id);
+ return new RGWRunBucketSyncCoroutine(&sync_env, bucket_name, bucket_id, shard_id);
}
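run_sync_cr() now forwards only &sync_env, which assumes RGWRemoteBucketLog fills that member before any sync coroutine is spawned. A sketch of the kind of wiring this implies; the exact init() signature and where the error logger comes from are assumptions here, not part of the patch:

/* Assumed wiring in RGWRemoteBucketLog: populate the shared env once, then
 * every bucket sync coroutine borrows it instead of taking the old list of
 * store/conn/async_rados/http_manager/source_zone arguments. */
int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
                             const string& _bucket_name, const string& _bucket_id,
                             int _shard_id, RGWSyncErrorLogger *_error_logger)
{
  source_zone = _source_zone;
  conn = _conn;
  bucket_name = _bucket_name;
  bucket_id = _bucket_id;
  shard_id = _shard_id;

  sync_env.init(store->ctx(), store, conn, async_rados,
                http_manager, _error_logger, source_zone);
  return 0;
}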
int RGWBucketSyncStatusManager::init()