status.push_back("data is caught up with master");
} else {
push_ss(ss, status, tab) << "data is behind on " << total_behind << " shards";
+
+ map<int, rgw_datalog_shard_data> master_pos;
+ ret = sync.read_source_log_shards_next(shards_behind, &master_pos);
+ if (ret < 0) {
+ derr << "ERROR: failed to fetch next positions (" << cpp_strerror(-ret) << ")" << dendl;
+ } else {
+ utime_t oldest;
+ for (auto iter : master_pos) {
+ rgw_datalog_shard_data& shard_data = iter.second;
+
+ if (!shard_data.entries.empty()) {
+ rgw_datalog_entry& entry = shard_data.entries.front();
+ if (oldest.is_zero()) {
+ oldest = entry.timestamp;
+ } else if (!entry.timestamp.is_zero() && entry.timestamp < oldest) {
+ oldest = entry.timestamp;
+ }
+ }
+ }
+
+ if (!oldest.is_zero()) {
+ push_ss(ss, status, tab) << "oldest change not applied: " << oldest;
+ }
+ }
}
flush_ss(ss, status);
JSONDecoder::decode_json("num_objects", num_shards, obj);
}
-struct rgw_datalog_entry {
- string key;
- utime_t timestamp;
-
- void decode_json(JSONObj *obj);
-};
-
-struct rgw_datalog_shard_data {
- string marker;
- bool truncated;
- vector<rgw_datalog_entry> entries;
-
- void decode_json(JSONObj *obj);
-};
-
/* Decode one remote datalog entry from the JSON returned by the source
 * zone's admin API.
 *
 * Fixes two defects in the original:
 *  - a stray 'return true;' in a void function (ill-formed C++);
 *  - the "timestamp" field was never decoded even though the struct
 *    declares it and the sync-status report reads entry.timestamp to
 *    find the oldest unapplied change.
 */
void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  JSONDecoder::decode_json("timestamp", timestamp, obj);
}
+class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
+ RGWDataSyncEnv *sync_env;
+ RGWRESTReadResource *http_op;
+
+ int shard_id;
+ string marker;
+ uint32_t max_entries;
+ rgw_datalog_shard_data *result;
+
+public:
+ RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
+ const string& _marker, uint32_t _max_entries,
+ rgw_datalog_shard_data *_result)
+ : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
+ shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
+
+ int send_request() {
+ RGWRESTConn *conn = sync_env->conn;
+ RGWRados *store = sync_env->store;
+
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+
+ char max_entries_buf[32];
+ snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
+
+ const char *marker_key = (marker.empty() ? "" : "marker");
+
+ rgw_http_param_pair pairs[] = { { "type", "data" },
+ { "id", buf },
+ { "max-entries", max_entries_buf },
+ { marker_key, marker.c_str() },
+ { NULL, NULL } };
+
+ string p = "/admin/log/";
+
+ http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
+ http_op->set_user_info((void *)stack);
+
+ int ret = http_op->aio_read();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
+ log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+ http_op->put();
+ return ret;
+ }
+
+ return 0;
+ }
+
+ int request_complete() {
+ int ret = http_op->wait(result);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
+ return ret;
+ }
+ return 0;
+ }
+};
+
/* Collect coroutine that fans out one RGWListRemoteDataLogShardCR per
 * requested shard, keeping at most READ_DATALOG_MAX_CONCURRENT children
 * in flight, and gathers the per-shard listings into *result. */
class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;   /* shard id -> marker to resume from (taken over from the caller) */
  int max_entries_per_shard; /* listing cap handed to each shard coroutine */
  map<int, rgw_datalog_shard_data> *result; /* caller-owned; one slot per shard id */

  map<int, string>::iterator iter; /* next shard to dispatch in spawn_next() */
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
                                                                      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
                                                                      result(_result) {
    /* NOTE: swap() empties the caller's map — cheap ownership transfer,
     * so _shards must not be reused after construction */
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next(); /* RGWShardCollectCR hook: spawns one child per shard */
};
+
+bool RGWListRemoteDataLogCR::spawn_next() {
+ if (iter == shards.end()) {
+ return false;
+ }
+
+ spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
+ ++iter;
+ return true;
+}
+
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}
+int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
+{
+ if (store->is_meta_master()) {
+ return 0;
+ }
+
+ return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
+}
+
int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
{
if (initialized) {
};
WRITE_CLASS_ENCODER(rgw_data_sync_status)
/* One entry of a remote zone's data changes log, decoded from the JSON
 * produced by the source zone's admin API. */
struct rgw_datalog_entry {
  string key;        /* change key as reported by the source */
  utime_t timestamp; /* when the change was logged on the source */

  void decode_json(JSONObj *obj);
};
+
+struct rgw_datalog_shard_data {
+ string marker;
+ bool truncated;
+ vector<rgw_datalog_entry> entries;
+
+ void decode_json(JSONObj *obj);
+};
+
class RGWAsyncRadosProcessor;
class RGWDataSyncStatusManager;
class RGWDataSyncControlCR;
int read_log_info(rgw_datalog_info *log_info);
int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
+ int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result);
int get_shard_info(int shard_id);
int read_sync_status(rgw_data_sync_status *sync_status);
int init_sync_status(int num_shards);
int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
return source_log.read_source_log_shards_info(shards_info);
}
  /* Thin pass-through to RGWRemoteDataLog::read_source_log_shards_next():
   * fetches the next remote datalog entries starting at the given
   * per-shard markers. */
  int read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result) {
    return source_log.read_source_log_shards_next(shard_markers, result);
  }
int run() { return source_log.run_sync(num_shards, sync_status); }