class RGWReadRemoteMDLogShardInfoCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
-
+ RGWMetaSyncEnv *env;
RGWRESTReadResource *http_op;
+ const std::string& period;
int shard_id;
RGWMetadataLogInfo *shard_info;
public:
- RGWReadRemoteMDLogShardInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- int _shard_id, RGWMetadataLogInfo *_shard_info) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
- http_op(NULL),
- shard_id(_shard_id),
- shard_info(_shard_info) {
- }
+ RGWReadRemoteMDLogShardInfoCR(RGWMetaSyncEnv *env, const std::string& period,
+ int _shard_id, RGWMetadataLogInfo *_shard_info)
+ : RGWCoroutine(env->store->ctx()), env(env), http_op(NULL),
+ period(period), shard_id(_shard_id), shard_info(_shard_info) {}
int operate() {
+ auto store = env->store;
RGWRESTConn *conn = store->rest_master_conn;
reenter(this) {
yield {
snprintf(buf, sizeof(buf), "%d", shard_id);
rgw_http_param_pair pairs[] = { { "type" , "metadata" },
{ "id", buf },
+ { "period", period.c_str() },
{ "info" , NULL },
{ NULL, NULL } };
string p = "/admin/log/";
- http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager);
+ http_op = new RGWRESTReadResource(conn, p, pairs, NULL,
+ env->http_manager);
http_op->set_user_info((void *)stack);
RGWContinuousLeaseCR *lease_cr;
public:
RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
- RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
- obj_ctx(_obj_ctx), lease_cr(NULL) {
- status.num_shards = _num_shards;
- }
+ RGWObjectCtx& _obj_ctx,
+ const rgw_meta_sync_info &status)
+ : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
+ obj_ctx(_obj_ctx), status(status), lease_cr(NULL) {}
~RGWInitSyncStatusCoroutine() {
if (lease_cr) {
set_status("fetching remote log position");
yield {
for (int i = 0; i < (int)status.num_shards; i++) {
- spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env->store, sync_env->http_manager, sync_env->async_rados, i, &shards_info[i]), false);
+ spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period,
+ i, &shards_info[i]), false);
}
}
RGWMetaSyncEnv *sync_env;
RGWMetadataLog *mdlog;
+ const std::string& period;
int shard_id;
string marker;
bool truncated;
rgw_mdlog_shard_data data;
public:
- RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env,
- int _id, const string& _marker, string *_new_marker) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- mdlog(sync_env->store->meta_mgr->get_log()),
- shard_id(_id),
- marker(_marker), truncated(false), new_marker(_new_marker),
- max_entries(CLONE_MAX_ENTRIES),
- http_op(NULL),
- req_ret(0) {
+ RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, const std::string& period,
+ int _id, const string& _marker, string *_new_marker)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ mdlog(sync_env->store->meta_mgr->get_log()), period(period),
+ shard_id(_id), marker(_marker), truncated(false), new_marker(_new_marker),
+ max_entries(CLONE_MAX_ENTRIES), http_op(NULL), req_ret(0) {
if (new_marker) {
*new_marker = marker;
}
rgw_bucket pool;
+ const std::string& period;
uint32_t shard_id;
rgw_meta_sync_marker sync_marker;
string marker;
public:
RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env,
- rgw_bucket& _pool,
+ rgw_bucket& _pool, const std::string& period,
uint32_t _shard_id, rgw_meta_sync_marker& _marker,
bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
pool(_pool),
+ period(period),
shard_id(_shard_id),
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock"),
if (mdlog_marker <= max_marker) {
/* we're at the tip, try to bring more entries */
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
- yield call(new RGWCloneMetaLogCoroutine(sync_env, shard_id, mdlog_marker, &mdlog_marker));
+ yield call(new RGWCloneMetaLogCoroutine(sync_env, period, shard_id,
+ mdlog_marker, &mdlog_marker));
}
if (retcode < 0) {
ldout(sync_env->cct, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode << dendl;
rgw_bucket pool;
+ const std::string& period;
uint32_t shard_id;
rgw_meta_sync_marker sync_marker;
public:
RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env,
- rgw_bucket& _pool,
- uint32_t _shard_id,
- rgw_meta_sync_marker& _marker) : RGWBackoffControlCR(_sync_env->cct), sync_env(_sync_env),
- pool(_pool),
- shard_id(_shard_id),
- sync_marker(_marker), obj_ctx(sync_env->store) {
- char buf[16];
- snprintf(buf, sizeof(buf), "%d", shard_id);
- }
+ rgw_bucket& _pool, const std::string& period,
+ uint32_t _shard_id, rgw_meta_sync_marker& _marker)
+ : RGWBackoffControlCR(_sync_env->cct), sync_env(_sync_env),
+ pool(_pool), period(period), shard_id(_shard_id),
+ sync_marker(_marker), obj_ctx(sync_env->store) {}
RGWCoroutine *alloc_cr() {
- return new RGWMetaSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
+ return new RGWMetaSyncShardCR(sync_env, pool, period, shard_id,
+ sync_marker, backoff_ptr());
}
RGWCoroutine *alloc_finisher_cr() {
public:
- RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, rgw_meta_sync_status& _sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const rgw_meta_sync_status& _sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
sync_status(_sync_status) {
}
RGWMetaSyncShardControlCR *shard_cr = new RGWMetaSyncShardControlCR(sync_env,
sync_env->store->get_zone_params().log_pool,
- shard_id,
+ sync_status.sync_info.period, shard_id,
sync_status.sync_markers[shard_id]);
RGWObjectCtx obj_ctx(store, NULL);
return run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx,
- sync_status.sync_info.num_shards));
+ sync_status.sync_info));
}
int RGWRemoteMetaLog::store_sync_info()
if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
r = run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx,
- sync_status.sync_info.num_shards));
+ sync_status.sync_info));
if (r == -EBUSY) {
backoff.backoff_sleep();
continue;
rgw_http_param_pair pairs[] = { { "type", "metadata" },
{ "id", buf },
+ { "period", period.c_str() },
{ "max-entries", max_entries_buf },
{ marker_key, marker.c_str() },
{ NULL, NULL } };