RGWRados *store;
RGWHTTPManager *http_manager;
RGWAsyncRadosProcessor *async_rados;
+ RGWRESTConn *conn;
RGWRESTReadResource *http_op;
RGWDataChangesLogInfo *shard_info;
public:
- RGWReadRemoteDataLogShardInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWReadRemoteDataLogShardInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn,
int _shard_id, RGWDataChangesLogInfo *_shard_info) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
+ conn(_conn),
http_op(NULL),
shard_id(_shard_id),
shard_info(_shard_info) {
}
int operate() {
- RGWRESTConn *conn = store->rest_master_conn;
reenter(this) {
yield {
char buf[16];
RGWRados *store;
RGWHTTPManager *http_manager;
RGWAsyncRadosProcessor *async_rados;
+ RGWRESTConn *conn;
RGWRESTReadResource *http_op;
read_remote_data_log_response response;
public:
- RGWReadRemoteDataLogShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+ RGWReadRemoteDataLogShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, RGWRESTConn *_conn,
int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated) : RGWCoroutine(_store->ctx()), store(_store),
http_manager(_mgr),
async_rados(_async_rados),
+ conn(_conn),
http_op(NULL),
shard_id(_shard_id),
pmarker(_pmarker),
}
/* fetch current position in logs */
yield {
+ RGWRESTConn *conn = store->get_zone_conn_by_id(source_zone);
+ if (!conn) {
+ ldout(cct, 0) << "ERROR: connection to zone " << source_zone << " does not exist!" << dendl;
+ return set_cr_error(-EIO);
+ }
for (int i = 0; i < (int)status.num_shards; i++) {
- spawn(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, i, &shards_info[i]), true);
+ spawn(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, i, &shards_info[i]), true);
}
}
while (collect(&ret)) {
delete async_rados;
}
-int RGWRemoteDataLog::list_shards(int num_shards)
-{
- for (int i = 0; i < (int)num_shards; i++) {
- int ret = list_shard(i);
- if (ret < 0) {
- ldout(store->ctx(), 10) << "failed to list shard: ret=" << ret << dendl;
- }
- }
-
- return 0;
-}
-
-int RGWRemoteDataLog::list_shard(int shard_id)
-{
- conn = store->rest_master_conn;
-
- char buf[32];
- snprintf(buf, sizeof(buf), "%d", shard_id);
-
- rgw_http_param_pair pairs[] = { { "type", "data" },
- { "id", buf },
- { NULL, NULL } };
-
- rgw_datalog_shard_data data;
- int ret = conn->get_json_resource("/admin/log", pairs, data);
- if (ret < 0) {
- ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog data" << dendl;
- return ret;
- }
-
- ldout(store->ctx(), 20) << "remote datalog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
-
- vector<rgw_datalog_entry>::iterator iter;
- for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
- rgw_datalog_entry& entry = *iter;
- ldout(store->ctx(), 20) << "entry: key=" << entry.key << dendl;
- }
-
- return 0;
-}
-
int RGWRemoteDataLog::get_shard_info(int shard_id)
{
- conn = store->rest_master_conn;
-
char buf[32];
snprintf(buf, sizeof(buf), "%d", shard_id);
}
yield {
- int ret = call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, shard_id, &shard_info));
+ int ret = call(new RGWReadRemoteDataLogShardInfoCR(store, http_manager, async_rados, conn, shard_id, &shard_info));
if (ret < 0) {
ldout(store->ctx(), 0) << "ERROR: failed to call RGWReadRemoteDataLogShardInfoCR() ret=" << ret << dendl;
return set_cr_error(ret);
#define INCREMENTAL_MAX_ENTRIES 100
ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " datalog_marker=" << datalog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (datalog_marker > sync_marker.marker) {
- yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, shard_id, &sync_marker.marker, &log_entries, &truncated));
+ yield call(new RGWReadRemoteDataLogShardCR(store, http_manager, async_rados, conn, shard_id, &sync_marker.marker, &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
if (!marker_tracker->index_key_to_marker(log_iter->log_id, log_iter->entry.key)) {