return r;
}
- global_status_obj = rgw_obj(store->get_zone_params().log_pool, mdlog_sync_status_oid);
+ global_status_obj = rgw_obj(store->get_zone_params().log_pool, status_oid());
r = master_log.init();
if (r < 0) {
return 0;
}
+string RGWMetaSyncStatusManager::status_oid()
+{
+ return mdlog_sync_status_oid;
+}
+
string RGWMetaSyncStatusManager::shard_obj_name(int shard_id)
{
char buf[mdlog_sync_status_shard_prefix.size() + 16];
};
class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
+ RGWMetaSyncEnv *sync_env;
int shard_id;
string marker;
string *pmarker;
RGWAsyncReadMDLogEntries *req;
public:
- RGWReadMDLogEntriesCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env,
int _shard_id, string*_marker, int _max_entries,
- list<cls_log_entry> *_entries, bool *_truncated) : RGWSimpleCoroutine(_store->ctx()),
- async_rados(_async_rados),
- store(_store),
+ list<cls_log_entry> *_entries, bool *_truncated) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
entries(_entries), truncated(_truncated) {
}
int send_request() {
marker = *pmarker;
req = new RGWAsyncReadMDLogEntries(stack->create_completion_notifier(),
- store, shard_id, &marker, max_entries, entries, truncated);
- async_rados->queue(req);
+ sync_env->store, shard_id, &marker, max_entries, entries, truncated);
+ sync_env->async_rados->queue(req);
return 0;
}
};
class RGWInitSyncStatusCoroutine : public RGWCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
- RGWHTTPManager *http_manager;
+ RGWMetaSyncEnv *sync_env;
RGWObjectCtx& obj_ctx;
rgw_meta_sync_info status;
map<int, RGWMetadataLogInfo> shards_info;
RGWContinuousLeaseCR *lease_cr;
public:
- RGWInitSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr,
- RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
- http_manager(_http_mgr),
+ RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
+ RGWObjectCtx& _obj_ctx, uint32_t _num_shards) : RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
obj_ctx(_obj_ctx), lease_cr(NULL) {
status.num_shards = _num_shards;
}
yield {
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
- lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid,
+ RGWRados *store = sync_env->store;
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid,
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
yield;
}
yield {
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
mdlog_sync_status_oid, status));
}
/* fetch current position in logs */
yield {
for (int i = 0; i < (int)status.num_shards; i++) {
- spawn(new RGWReadRemoteMDLogShardInfoCR(store, http_manager, async_rados, i, &shards_info[i]), false);
+ spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env->store, sync_env->http_manager, sync_env->async_rados, i, &shards_info[i]), false);
}
}
for (int i = 0; i < (int)status.num_shards; i++) {
rgw_meta_sync_marker marker;
marker.next_step_marker = shards_info[i].marker;
- spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+ spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
RGWMetaSyncStatusManager::shard_obj_name(i), marker), true);
}
}
yield {
status.state = rgw_meta_sync_info::StateBuildingFullSyncMaps;
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados, store, store->get_zone_params().log_pool,
mdlog_sync_status_oid, status));
}
yield lease_cr->go_down();
};
class RGWReadSyncStatusCoroutine : public RGWSimpleRadosReadCR<rgw_meta_sync_info> {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
+ RGWMetaSyncEnv *sync_env;
RGWObjectCtx& obj_ctx;
rgw_meta_sync_status *sync_status;
public:
- RGWReadSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWReadSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
RGWObjectCtx& _obj_ctx,
- rgw_meta_sync_status *_status) : RGWSimpleRadosReadCR(_async_rados, _store, _obj_ctx,
- _store->get_zone_params().log_pool,
+ rgw_meta_sync_status *_status) : RGWSimpleRadosReadCR(_sync_env->async_rados, _sync_env->store, _obj_ctx,
+ _sync_env->store->get_zone_params().log_pool,
mdlog_sync_status_oid,
&_status->sync_info),
- async_rados(_async_rados), store(_store),
+ sync_env(_sync_env),
obj_ctx(_obj_ctx),
sync_status(_status) {}
return retcode;
}
+ RGWRados *store = sync_env->store;
map<uint32_t, rgw_meta_sync_marker>& markers = sync_status->sync_markers;
for (int i = 0; i < (int)data.num_shards; i++) {
- spawn(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ spawn(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
RGWMetaSyncStatusManager::shard_obj_name(i), &markers[i]), true);
}
return 0;
}
class RGWFetchAllMetaCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
+ RGWMetaSyncEnv *sync_env;
int num_shards;
map<uint32_t, rgw_meta_sync_marker>& markers;
public:
- RGWFetchAllMetaCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, int _num_shards,
- map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
+ RGWFetchAllMetaCR(RGWMetaSyncEnv *_sync_env, int _num_shards,
+ map<uint32_t, rgw_meta_sync_marker>& _markers) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
num_shards(_num_shards),
req_ret(0), entries_index(NULL), lease_cr(NULL), lost_lock(false), markers(_markers) {
}
}
int operate() {
- RGWRESTConn *conn = store->rest_master_conn;
+ RGWRESTConn *conn = sync_env->conn;
reenter(this) {
yield {
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
- lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool, mdlog_sync_status_oid,
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, sync_env->store, sync_env->store->get_zone_params().log_pool, mdlog_sync_status_oid,
lock_name, lock_duration, this);
lease_cr->get();
spawn(lease_cr, false);
set_sleeping(true);
yield;
}
- entries_index = new RGWShardedOmapCRManager(async_rados, store, this, num_shards,
- store->get_zone_params().log_pool, mdlog_sync_full_sync_index_prefix);
+ entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, sync_env->store, this, num_shards,
+ sync_env->store->get_zone_params().log_pool,
+ mdlog_sync_full_sync_index_prefix);
yield {
- call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
+ call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
"/admin/metadata", NULL, §ions));
}
if (get_ret_status() < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata sections" << dendl;
+ ldout(cct, 0) << "ERROR: failed to fetch metadata sections" << dendl;
return set_state(RGWCoroutine_Error);
}
rearrange_sections();
yield {
string entrypoint = string("/admin/metadata/") + *sections_iter;
#warning need a better scaling solution here, requires streaming output
- call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
+ call(new RGWReadRESTResourceCR<list<string> >(cct, conn, sync_env->http_manager,
entrypoint, NULL, &result));
}
yield {
if (get_ret_status() < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata section: " << *sections_iter << dendl;
+ ldout(cct, 0) << "ERROR: failed to fetch metadata section: " << *sections_iter << dendl;
return set_state(RGWCoroutine_Error);
}
for (list<string>::iterator iter = result.begin(); iter != result.end(); ++iter) {
lost_lock = true;
break;
}
- ldout(store->ctx(), 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
+ ldout(cct, 20) << "list metadata: section=" << *sections_iter << " key=" << *iter << dendl;
string s = *sections_iter + ":" + *iter;
entries_index->append(s);
#warning error handling of shards
int shard_id = (int)iter->first;
rgw_meta_sync_marker& marker = iter->second;
marker.total_entries = entries_index->get_total_entries(shard_id);
- spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ spawn(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, sync_env->store, sync_env->store->get_zone_params().log_pool,
RGWMetaSyncStatusManager::shard_obj_name(shard_id), marker), true);
}
}
class RGWReadRemoteMetadataCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
+ RGWMetaSyncEnv *sync_env;
RGWRESTReadResource *http_op;
bufferlist *pbl;
public:
- RGWReadRemoteMetadataCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- const string& _section, const string& _key, bufferlist *_pbl) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
+ RGWReadRemoteMetadataCR(RGWMetaSyncEnv *_sync_env,
+ const string& _section, const string& _key, bufferlist *_pbl) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
http_op(NULL),
section(_section),
key(_key),
}
int operate() {
- RGWRESTConn *conn = store->rest_master_conn;
+ RGWRESTConn *conn = sync_env->conn;
reenter(this) {
yield {
rgw_http_param_pair pairs[] = { { "key" , key.c_str()},
string p = string("/admin/metadata/") + section + "/" + key;
- http_op = new RGWRESTReadResource(conn, p, pairs, NULL, http_manager);
+ http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
http_op->set_user_info((void *)stack);
int ret = http_op->aio_read();
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog data" << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
http_op->put();
return set_cr_error(ret);
class RGWMetaStoreEntryCR : public RGWSimpleCoroutine {
- RGWAsyncRadosProcessor *async_rados;
- RGWRados *store;
+ RGWMetaSyncEnv *sync_env;
string raw_key;
bufferlist bl;
RGWAsyncMetaStoreEntry *req;
public:
- RGWMetaStoreEntryCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store,
+ RGWMetaStoreEntryCR(RGWMetaSyncEnv *_sync_env,
const string& _raw_key,
- bufferlist& _bl) : RGWSimpleCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+ bufferlist& _bl) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
raw_key(_raw_key), bl(_bl), req(NULL) {
}
int send_request() {
req = new RGWAsyncMetaStoreEntry(stack->create_completion_notifier(),
- store, raw_key, bl);
- async_rados->queue(req);
+ sync_env->store, raw_key, bl);
+ sync_env->async_rados->queue(req);
return 0;
}
#define META_SYNC_UPDATE_MARKER_WINDOW 10
class RGWMetaSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
- RGWRados *store;
- RGWAsyncRadosProcessor *async_rados;
+ RGWMetaSyncEnv *sync_env;
string marker_oid;
rgw_meta_sync_marker sync_marker;
public:
- RGWMetaSyncShardMarkerTrack(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWMetaSyncShardMarkerTrack(RGWMetaSyncEnv *_sync_env,
const string& _marker_oid,
const rgw_meta_sync_marker& _marker) : RGWSyncShardMarkerTrack(META_SYNC_UPDATE_MARKER_WINDOW),
- store(_store),
- async_rados(_async_rados),
+ sync_env(_sync_env),
marker_oid(_marker_oid),
sync_marker(_marker) {}
RGWCoroutine *store_marker(const string& new_marker) {
sync_marker.marker = new_marker;
- ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
- return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+ RGWRados *store = sync_env->store;
+ return new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
marker_oid, sync_marker);
}
};
class RGWMetaSyncSingleEntryCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
+ RGWMetaSyncEnv *sync_env;
string raw_key;
string entry_marker;
RGWMetaSyncShardMarkerTrack *marker_tracker;
public:
- RGWMetaSyncSingleEntryCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
- const string& _raw_key, const string& _entry_marker, RGWMetaSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
+ RGWMetaSyncSingleEntryCR(RGWMetaSyncEnv *_sync_env,
+ const string& _raw_key, const string& _entry_marker, RGWMetaSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_sync_env->cct),
+ sync_env(_sync_env),
raw_key(_raw_key), entry_marker(_entry_marker),
pos(0), sync_status(0),
marker_tracker(_marker_tracker) {
pos = raw_key.find(':');
section = raw_key.substr(0, pos);
key = raw_key.substr(pos + 1);
- sync_status = call(new RGWReadRemoteMetadataCR(store, http_manager, async_rados, section, key, &md_bl));
+ sync_status = call(new RGWReadRemoteMetadataCR(sync_env, section, key, &md_bl));
}
if (sync_status < 0) {
return set_cr_error(sync_status);
}
- yield call(new RGWMetaStoreEntryCR(async_rados, store, raw_key, md_bl));
+ yield call(new RGWMetaStoreEntryCR(sync_env, raw_key, md_bl));
sync_status = retcode;
yield {
/* update marker */
int ret = call(marker_tracker->finish(entry_marker));
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
return set_cr_error(sync_status);
}
}
};
class RGWCloneMetaLogCoroutine : public RGWCoroutine {
- RGWRados *store;
+ RGWMetaSyncEnv *sync_env;
RGWMetadataLog *mdlog;
- RGWHTTPManager *http_manager;
int shard_id;
string marker;
rgw_mdlog_shard_data data;
public:
- RGWCloneMetaLogCoroutine(RGWRados *_store, RGWHTTPManager *_mgr,
- int _id, const string& _marker, string *_new_marker) : RGWCoroutine(_store->ctx()), store(_store),
- mdlog(store->meta_mgr->get_log()),
- http_manager(_mgr), shard_id(_id),
+ RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env,
+ int _id, const string& _marker, string *_new_marker) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ mdlog(sync_env->store->meta_mgr->get_log()),
+ shard_id(_id),
marker(_marker), truncated(false), new_marker(_new_marker),
max_entries(CLONE_MAX_ENTRIES),
http_op(NULL), md_op_notifier(NULL),
};
class RGWMetaSyncShardCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
+ RGWMetaSyncEnv *sync_env;
rgw_bucket pool;
bool *reset_backoff;
public:
- RGWMetaSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env,
rgw_bucket& _pool,
uint32_t _shard_id, rgw_meta_sync_marker& _marker,
- bool *_reset_backoff) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
+ bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
pool(_pool),
shard_id(_shard_id),
sync_marker(_marker),
case rgw_meta_sync_marker::FullSync:
r = full_sync();
if (r < 0) {
- ldout(store->ctx(), 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
+ ldout(sync_env->cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
return set_cr_error(r);
}
return 0;
case rgw_meta_sync_marker::IncrementalSync:
r = incremental_sync();
if (r < 0) {
- ldout(store->ctx(), 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
+ ldout(sync_env->cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
return set_cr_error(r);
}
return 0;
if (lease_cr) {
lease_cr->put();
}
- lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
RGWMetaSyncStatusManager::shard_obj_name(shard_id),
lock_name, lock_duration, this);
lease_cr->get();
*reset_backoff = true;
/* prepare marker tracker */
- set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados,
+ set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
RGWMetaSyncStatusManager::shard_obj_name(shard_id),
sync_marker));
/* sync! */
lost_lock = true;
break;
}
- yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+ yield return call(new RGWRadosGetOmapKeysCR(sync_env->store, pool, oid, sync_marker.marker, &entries, max_entries));
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
return retcode;
}
iter = entries.begin();
for (; iter != entries.end(); ++iter) {
- ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
marker_tracker->start(iter->first);
// fetch remote and write locally
- yield spawn(new RGWMetaSyncSingleEntryCR(store, http_manager, async_rados, iter->first, iter->first, marker_tracker), false);
+ yield spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
if (retcode < 0) {
return retcode;
}
sync_marker.state = rgw_meta_sync_marker::IncrementalSync;
sync_marker.marker = sync_marker.next_step_marker;
sync_marker.next_step_marker.clear();
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker));
}
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
return retcode;
}
}
yield {
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
- lease_cr = new RGWContinuousLeaseCR(async_rados, store, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
RGWMetaSyncStatusManager::shard_obj_name(shard_id),
lock_name, lock_duration, this);
lease_cr->get();
*reset_backoff = true;
}
mdlog_marker = sync_marker.marker;
- set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados,
+ set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
RGWMetaSyncStatusManager::shard_obj_name(shard_id),
sync_marker));
/* inc sync */
break;
}
#define INCREMENTAL_MAX_ENTRIES 100
- ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (mdlog_marker <= sync_marker.marker) {
/* we're at the tip, try to bring more entries */
- ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
- yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, mdlog_marker, &mdlog_marker));
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
+ yield call(new RGWCloneMetaLogCoroutine(sync_env, shard_id, mdlog_marker, &mdlog_marker));
}
- ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (mdlog_marker > sync_marker.marker) {
- yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+ yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
- ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
marker_tracker->start(log_iter->id);
raw_key = log_iter->section + ":" + log_iter->name;
- yield spawn(new RGWMetaSyncSingleEntryCR(store, http_manager, async_rados, raw_key, log_iter->id, marker_tracker), false);
+ yield spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
if (retcode < 0) {
return retcode;
}
}
}
- ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (mdlog_marker == sync_marker.marker) {
#define INCREMENTAL_INTERVAL 20
yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
}
};
-class RGWMetaSyncShardControlCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
+class RGWMetaSyncShardControlCR : public RGWCoroutine
+{
+ RGWMetaSyncEnv *sync_env;
rgw_bucket pool;
bool reset_backoff;
public:
- RGWMetaSyncShardControlCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env,
rgw_bucket& _pool,
- uint32_t _shard_id, rgw_meta_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
+ uint32_t _shard_id, rgw_meta_sync_marker& _marker) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
pool(_pool),
shard_id(_shard_id),
- sync_marker(_marker), obj_ctx(store), reset_backoff(false) {
+ sync_marker(_marker), obj_ctx(sync_env->store), reset_backoff(false) {
}
int operate() {
reenter(this) {
while (true) {
yield {
- call(new RGWMetaSyncShardCR(store, http_manager, async_rados, pool, shard_id, sync_marker, &reset_backoff));
+ call(new RGWMetaSyncShardCR(sync_env, pool, shard_id, sync_marker, &reset_backoff));
}
if (retcode < 0 && retcode != -EBUSY) {
- ldout(store->ctx(), 0) << "ERROR: RGWMetaSyncShardCR() returned " << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: RGWMetaSyncShardCR() returned " << retcode << dendl;
return set_cr_error(retcode);
}
if (reset_backoff) {
}
yield backoff.backoff(this);
yield {
- call(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ RGWRados *store = sync_env->store;
+ call(new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
RGWMetaSyncStatusManager::shard_obj_name(shard_id), &sync_marker));
}
if (retcode < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to read sync state for metadata shard id=" << shard_id << " retcode=" << retcode << dendl;
+ ldout(sync_env->cct, 0) << "ERROR: failed to read sync state for metadata shard id=" << shard_id << " retcode=" << retcode << dendl;
return set_cr_error(retcode);
}
}
};
class RGWMetaSyncCR : public RGWCoroutine {
- RGWRados *store;
- RGWHTTPManager *http_manager;
- RGWAsyncRadosProcessor *async_rados;
+ RGWMetaSyncEnv *sync_env;
rgw_meta_sync_status sync_status;
public:
- RGWMetaSyncCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, rgw_meta_sync_status& _sync_status) : RGWCoroutine(_store->ctx()), store(_store),
- http_manager(_mgr),
- async_rados(_async_rados),
+ RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, rgw_meta_sync_status& _sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
sync_status(_sync_status) {
}
uint32_t shard_id = iter->first;
rgw_meta_sync_marker marker;
- RGWMetaSyncShardControlCR *shard_cr = new RGWMetaSyncShardControlCR(store, http_manager, async_rados, store->get_zone_params().log_pool,
+ RGWMetaSyncShardControlCR *shard_cr = new RGWMetaSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
shard_id,
sync_status.sync_markers[shard_id]);
int RGWRemoteMetaLog::clone_shards(int num_shards, vector<string>& clone_markers)
{
+ RGWMetaSyncEnv sync_env;
+
+ init_sync_env(&sync_env);
+
list<RGWCoroutinesStack *> stacks;
for (int i = 0; i < (int)num_shards; i++) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
- int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i], NULL));
+ int r = stack->call(new RGWCloneMetaLogCoroutine(&sync_env, i, clone_markers[i], NULL));
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: stack->call() returned r=" << r << dendl;
return r;
int RGWRemoteMetaLog::fetch(int num_shards, vector<string>& clone_markers)
{
+ RGWMetaSyncEnv sync_env;
+
+ init_sync_env(&sync_env);
+
list<RGWCoroutinesStack *> stacks;
for (int i = 0; i < (int)num_shards; i++) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
- int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i], NULL));
+ int r = stack->call(new RGWCloneMetaLogCoroutine(&sync_env, i, clone_markers[i], NULL));
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: stack->call() returned r=" << r << dendl;
return r;
return 0;
}
+ RGWMetaSyncEnv sync_env;
+
+ init_sync_env(&sync_env);
+
RGWObjectCtx obj_ctx(store, NULL);
- return run(new RGWReadSyncStatusCoroutine(async_rados, store, obj_ctx, sync_status));
+ return run(new RGWReadSyncStatusCoroutine(&sync_env, obj_ctx, sync_status));
}
int RGWRemoteMetaLog::init_sync_status(int num_shards)
return 0;
}
+ RGWMetaSyncEnv sync_env;
+
+ init_sync_env(&sync_env);
+
RGWObjectCtx obj_ctx(store, NULL);
- return run(new RGWInitSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, num_shards));
+ return run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx, num_shards));
}
int RGWRemoteMetaLog::set_sync_info(const rgw_meta_sync_info& sync_info)
{
return run(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(async_rados, store, store->get_zone_params().log_pool,
- mdlog_sync_status_oid, sync_info));
+ status_manager->status_oid(), sync_info));
}
int RGWRemoteMetaLog::run_sync(int num_shards, rgw_meta_sync_status& sync_status)
return 0;
}
+ RGWMetaSyncEnv sync_env;
+
+ init_sync_env(&sync_env);
+
RGWObjectCtx obj_ctx(store, NULL);
int r;
do {
- r = run(new RGWReadSyncStatusCoroutine(async_rados, store, obj_ctx, &sync_status));
+ r = run(new RGWReadSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
if (r < 0 && r != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
return r;
if (sync_status.sync_info.state == rgw_meta_sync_info::StateInit) {
ldout(store->ctx(), 20) << __func__ << "(): init" << dendl;
- r = run(new RGWInitSyncStatusCoroutine(async_rados, store, &http_manager, obj_ctx, num_shards));
+ r = run(new RGWInitSyncStatusCoroutine(&sync_env, obj_ctx, num_shards));
if (r == -EBUSY) {
backoff.backoff_sleep();
continue;
} while (sync_status.sync_info.state == rgw_meta_sync_info::StateInit);
do {
- r = run(new RGWReadSyncStatusCoroutine(async_rados, store, obj_ctx, &sync_status));
+ r = run(new RGWReadSyncStatusCoroutine(&sync_env, obj_ctx, &sync_status));
if (r < 0 && r != -ENOENT) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status r=" << r << dendl;
return r;
switch ((rgw_meta_sync_info::SyncState)sync_status.sync_info.state) {
case rgw_meta_sync_info::StateBuildingFullSyncMaps:
ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl;
- r = run(new RGWFetchAllMetaCR(store, &http_manager, async_rados, num_shards, sync_status.sync_markers));
+ r = run(new RGWFetchAllMetaCR(&sync_env, num_shards, sync_status.sync_markers));
if (r == -EBUSY) {
backoff.backoff_sleep();
continue;
/* fall through */
case rgw_meta_sync_info::StateSync:
ldout(store->ctx(), 20) << __func__ << "(): sync" << dendl;
- meta_sync_cr = new RGWMetaSyncCR(store, &http_manager, async_rados, sync_status);
+ meta_sync_cr = new RGWMetaSyncCR(&sync_env, sync_status);
r = run(meta_sync_cr);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to fetch all metadata keys" << dendl;
reenter(this) {
do {
yield {
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
+ ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": init request" << dendl;
return state_init();
}
yield {
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
+ ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status" << dendl;
return state_read_shard_status();
}
yield {
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
+ ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": reading shard status complete" << dendl;
return state_read_shard_status_complete();
}
yield {
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
+ ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": sending rest request" << dendl;
return state_send_rest_request();
}
yield {
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
+ ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": receiving rest response" << dendl;
return state_receive_rest_response();
}
yield {
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
+ ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries" << dendl;
return state_store_mdlog_entries();
}
} while (truncated);
yield {
- ldout(store->ctx(), 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
+ ldout(cct, 20) << __func__ << ": shard_id=" << shard_id << ": storing mdlog entries complete" << dendl;
return state_store_mdlog_entries_complete();
}
}
{
int ret = mdlog->get_info_async(shard_id, &shard_info, stack->get_completion_mgr(), (void *)stack, &req_ret);
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
+ ldout(cct, 0) << "ERROR: mdlog->get_info_async() returned ret=" << ret << dendl;
return set_cr_error(ret);
}
int RGWCloneMetaLogCoroutine::state_read_shard_status_complete()
{
- ldout(store->ctx(), 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
+ ldout(cct, 20) << "shard_id=" << shard_id << " marker=" << shard_info.marker << " last_update=" << shard_info.last_update << dendl;
marker = shard_info.marker;
int RGWCloneMetaLogCoroutine::state_send_rest_request()
{
- RGWRESTConn *conn = store->rest_master_conn;
+ RGWRESTConn *conn = sync_env->conn;
char buf[32];
snprintf(buf, sizeof(buf), "%d", shard_id);
{ marker_key, marker.c_str() },
{ NULL, NULL } };
- http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, http_manager);
+ http_op = new RGWRESTReadResource(conn, "/admin/log", pairs, NULL, sync_env->http_manager);
http_op->set_user_info((void *)stack);
int ret = http_op->aio_read();
if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog data" << dendl;
+ ldout(cct, 0) << "ERROR: failed to fetch mdlog data" << dendl;
log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
http_op->put();
return ret;
int ret = http_op->wait(&data);
if (ret < 0) {
error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
- ldout(store->ctx(), 0) << "ERROR: failed to wait for op, ret=" << ret << dendl;
+ ldout(cct, 0) << "ERROR: failed to wait for op, ret=" << ret << dendl;
http_op->put();
return set_cr_error(ret);
}
http_op->put();
- ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
+ ldout(cct, 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
truncated = ((int)data.entries.size() == max_entries);
vector<rgw_mdlog_entry>::iterator iter;
for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
rgw_mdlog_entry& entry = *iter;
- ldout(store->ctx(), 20) << "entry: name=" << entry.name << dendl;
+ ldout(cct, 20) << "entry: name=" << entry.name << dendl;
cls_log_entry dest_entry;
dest_entry.id = entry.id;
RGWAioCompletionNotifier *cn = stack->create_completion_notifier();
+ RGWRados *store = sync_env->store;
int ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id, cn->completion());
if (ret < 0) {
cn->put();
- ldout(store->ctx(), 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
+ ldout(cct, 10) << "failed to store md log entries shard_id=" << shard_id << " ret=" << ret << dendl;
return set_cr_error(ret);
}
return io_block(0);