From: Yehuda Sadeh Date: Thu, 10 Mar 2016 00:01:51 +0000 (-0800) Subject: rgw_admin: report oldest data not synced X-Git-Tag: v10.1.0~140^2~3 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=453bc556b6ad709aa8c35939290e2f313e91e1bc;p=ceph.git rgw_admin: report oldest data not synced in radosgw-admin sync status Signed-off-by: Yehuda Sadeh --- diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 3e933a647b85..bce9f5859afd 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -1832,6 +1832,30 @@ static void get_data_sync_status(const string& source_zone, list& status status.push_back("data is caught up with master"); } else { push_ss(ss, status, tab) << "data is behind on " << total_behind << " shards"; + + map master_pos; + ret = sync.read_source_log_shards_next(shards_behind, &master_pos); + if (ret < 0) { + derr << "ERROR: failed to fetch next positions (" << cpp_strerror(-ret) << ")" << dendl; + } else { + utime_t oldest; + for (auto iter : master_pos) { + rgw_datalog_shard_data& shard_data = iter.second; + + if (!shard_data.entries.empty()) { + rgw_datalog_entry& entry = shard_data.entries.front(); + if (oldest.is_zero()) { + oldest = entry.timestamp; + } else if (!entry.timestamp.is_zero() && entry.timestamp < oldest) { + oldest = entry.timestamp; + } + } + } + + if (!oldest.is_zero()) { + push_ss(ss, status, tab) << "oldest change not applied: " << oldest; + } + } } flush_ss(ss, status); diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index e922e629bf8e..314cd682c99c 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -30,21 +30,6 @@ void rgw_datalog_info::decode_json(JSONObj *obj) { JSONDecoder::decode_json("num_objects", num_shards, obj); } -struct rgw_datalog_entry { - string key; - utime_t timestamp; - - void decode_json(JSONObj *obj); -}; - -struct rgw_datalog_shard_data { - string marker; - bool truncated; - vector entries; - - void decode_json(JSONObj *obj); -}; - void rgw_datalog_entry::decode_json(JSONObj *obj) { JSONDecoder::decode_json("key", key, obj); @@ -261,6 +246,99 @@ bool RGWReadRemoteDataLogInfoCR::spawn_next() { return true; } +class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine { + RGWDataSyncEnv *sync_env; + RGWRESTReadResource *http_op; + + int shard_id; + string marker; + uint32_t max_entries; + rgw_datalog_shard_data *result; + +public: + RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id, + const string& _marker, uint32_t _max_entries, + rgw_datalog_shard_data *_result) + : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL), + shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {} + + int send_request() { + RGWRESTConn *conn = sync_env->conn; + RGWRados *store = sync_env->store; + + char buf[32]; + snprintf(buf, sizeof(buf), "%d", shard_id); + + char max_entries_buf[32]; + snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries); + + const char *marker_key = (marker.empty() ? "" : "marker"); + + rgw_http_param_pair pairs[] = { { "type", "data" }, + { "id", buf }, + { "max-entries", max_entries_buf }, + { marker_key, marker.c_str() }, + { NULL, NULL } }; + + string p = "/admin/log/"; + + http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager); + http_op->set_user_info((void *)stack); + + int ret = http_op->aio_read(); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl; + log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl; + http_op->put(); + return ret; + } + + return 0; + } + + int request_complete() { + int ret = http_op->wait(result); + if (ret < 0 && ret != -ENOENT) { + ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl; + return ret; + } + return 0; + } +}; + +class RGWListRemoteDataLogCR : public RGWShardCollectCR { + RGWDataSyncEnv *sync_env; + + map shards; + int max_entries_per_shard; + map *result; + + map::iterator iter; +#define READ_DATALOG_MAX_CONCURRENT 10 + +public: + RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env, + map& _shards, + int _max_entries_per_shard, + map *_result) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT), + sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard), + result(_result) { + shards.swap(_shards); + iter = shards.begin(); + } + bool spawn_next(); +}; + +bool RGWListRemoteDataLogCR::spawn_next() { + if (iter == shards.end()) { + return false; + } + + spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false); + ++iter; + return true; +} + class RGWInitDataSyncStatusCoroutine : public RGWCoroutine { RGWDataSyncEnv *sync_env; @@ -391,6 +469,15 @@ int RGWRemoteDataLog::read_source_log_shards_info(map shard_markers, map *result) +{ + if (store->is_meta_master()) { + return 0; + } + + return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result)); +} + int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger) { if (initialized) { diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index ee5d268c0880..ac3abfb9454f 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -138,6 +138,21 @@ struct rgw_data_sync_status { }; WRITE_CLASS_ENCODER(rgw_data_sync_status) +struct rgw_datalog_entry { + string key; + utime_t timestamp; + + void decode_json(JSONObj *obj); +}; + +struct rgw_datalog_shard_data { + string marker; + bool truncated; + vector entries; + + void decode_json(JSONObj *obj); +}; + class RGWAsyncRadosProcessor; class RGWDataSyncStatusManager; class RGWDataSyncControlCR; @@ -193,6 +208,7 @@ public: int read_log_info(rgw_datalog_info *log_info); int read_source_log_shards_info(map *shards_info); + int read_source_log_shards_next(map shard_markers, map *result); int get_shard_info(int shard_id); int read_sync_status(rgw_data_sync_status *sync_status); int init_sync_status(int num_shards); @@ -244,6 +260,9 @@ public: int read_source_log_shards_info(map *shards_info) { return source_log.read_source_log_shards_info(shards_info); } + int read_source_log_shards_next(map shard_markers, map *result) { + return source_log.read_source_log_shards_next(shard_markers, result); + } int run() { return source_log.run_sync(num_shards, sync_status); }