OPT_PERIOD_LIST,
OPT_PERIOD_UPDATE,
OPT_PERIOD_COMMIT,
+ OPT_SYNC_STATUS,
};
static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_cmd, bool *need_more)
return OPT_REPLICALOG_UPDATE;
if (strcmp(cmd, "delete") == 0)
return OPT_REPLICALOG_DELETE;
+ } else if (strcmp(prev_cmd, "sync") == 0) {
+ if (strcmp(cmd, "status") == 0)
+ return OPT_SYNC_STATUS;
}
return -EINVAL;
return 0;
}
+/* Gather human-readable metadata sync status lines into 'status'.
+ *
+ * Reports the overall sync state, per-shard full/incremental progress,
+ * and whether the local zone is caught up with the master's mdlog.  On
+ * any failure an explanatory line is appended and we return early, so
+ * callers always get at least one line of output. */
+static void get_md_sync_status(list<string>& status)
+{
+  RGWMetaSyncStatusManager sync(store, store->get_async_rados());
+
+  int ret = sync.init();
+  if (ret < 0) {
+    status.push_back(string("failed to retrieve sync info: sync.init() failed: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  ret = sync.read_sync_status();
+  if (ret < 0) {
+    status.push_back(string("failed to read sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  const rgw_meta_sync_status& sync_status = sync.get_sync_status();
+
+  string status_str;
+  switch (sync_status.sync_info.state) {
+    case rgw_meta_sync_info::StateInit:
+      status_str = "init";
+      break;
+    case rgw_meta_sync_info::StateBuildingFullSyncMaps:
+      status_str = "preparing for full sync";
+      break;
+    case rgw_meta_sync_info::StateSync:
+      status_str = "syncing";
+      break;
+    default:
+      status_str = "unknown";
+  }
+
+  status.push_back(status_str);
+
+  uint64_t full_total = 0;
+  uint64_t full_complete = 0;
+
+  int num_full = 0;
+  int num_inc = 0;
+  int total_shards = 0;
+
+  /* iterate by const reference: the markers carry strings, so copying
+   * each map entry per iteration is a needless allocation */
+  for (const auto& marker_iter : sync_status.sync_markers) {
+    full_total += marker_iter.second.total_entries;
+    total_shards++;
+    if (marker_iter.second.state == rgw_meta_sync_marker::SyncState::FullSync) {
+      num_full++;
+      full_complete += marker_iter.second.pos;
+    } else {
+      /* shard finished its full sync pass: count all its entries as done */
+      full_complete += marker_iter.second.total_entries;
+    }
+    if (marker_iter.second.state == rgw_meta_sync_marker::SyncState::IncrementalSync) {
+      num_inc++;
+    }
+  }
+
+  stringstream ss;
+  ss << "full sync: " << num_full << "/" << total_shards << " shards";
+  status.push_back(ss.str());
+  ss.str("");
+
+  if (num_full > 0) {
+    ss << "full sync: " << full_total - full_complete << " entries to sync";
+    status.push_back(ss.str());
+    ss.str("");
+  }
+
+  ss << "incremental sync: " << num_inc << "/" << total_shards << " shards";
+  status.push_back(ss.str());
+  ss.str("");
+
+  rgw_mdlog_info log_info;
+  ret = sync.read_log_info(&log_info);
+  if (ret < 0) {
+    status.push_back(string("failed to fetch local sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  map<int, RGWMetadataLogInfo> master_shards_info;
+  string master_period;
+
+  ret = sync.read_master_log_shards_info(&master_period, &master_shards_info);
+  if (ret < 0) {
+    status.push_back(string("failed to fetch master sync status: ") + cpp_strerror(-ret));
+    return;
+  }
+
+  vector<int> shards_behind;
+
+  if (sync_status.sync_info.period != master_period) {
+    status.push_back(string("master is on a different period: master_period=" + master_period + " local_period=" + sync_status.sync_info.period));
+  } else {
+    /* compare each local shard marker against the master's to find lag;
+     * const ref for the same reason as above */
+    for (const auto& local_iter : sync_status.sync_markers) {
+      int shard_id = local_iter.first;
+      auto iter = master_shards_info.find(shard_id);
+
+      if (iter == master_shards_info.end()) {
+        /* huh? */
+        derr << "ERROR: could not find remote sync shard status for shard_id=" << shard_id << dendl;
+        continue;
+      }
+      auto master_marker = iter->second.marker;
+      if (master_marker > local_iter.second.marker) {
+        shards_behind.push_back(shard_id);
+      }
+    }
+  }
+
+  /* shards that have not yet reached incremental sync count as behind too */
+  int total_behind = shards_behind.size() + (sync_status.sync_info.num_shards - num_inc);
+  if (total_behind == 0) {
+    status.push_back("metadata is caught up with master");
+  } else {
+    ss << "metadata is behind on " << total_behind << " shards";
+    status.push_back(ss.str());
+    ss.str("");
+  }
+
+}
+
+/* Print each entry of 'entries' on its own line, with 'header' shown in a
+ * right-aligned column of 'width' characters on the first line only. */
+static void tab_dump(const string& header, int width, const list<string>& entries)
+{
+  string s = header;
+
+  /* const ref: avoid copying every entry string */
+  for (const auto& e : entries) {
+    cout << std::setw(width) << s << std::setw(1) << " " << e << std::endl;
+    s.clear();  /* only the first line carries the header */
+  }
+}
+
+
+/* Print a human-readable sync status report for the current zone on stdout.
+ *
+ * 'formatter' is accepted for interface consistency with the other admin
+ * commands but is not currently used -- output is plain tabulated text. */
+static void sync_status(Formatter *formatter)
+{
+  RGWZoneGroup zonegroup = store->get_zonegroup();
+  RGWZone& zone = store->get_zone();
+
+  int width = 20;
+
+  cout << std::setw(width) << "zonegroup" << std::setw(1) << " " << zonegroup.get_id() << " (" << zonegroup.get_name() << ")" << std::endl;
+  cout << std::setw(width) << "zone" << std::setw(1) << " " << zone.id << " (" << zone.name << ")" << std::endl;
+
+  list<string> md_status;
+
+  if (zone.id == zonegroup.master_zone) {
+    /* the metadata master has nothing to sync from */
+    md_status.push_back("no sync (zone is master)");
+  } else {
+    get_md_sync_status(md_status);
+  }
+
+  /* use 'width' instead of repeating the literal 20, so the columns stay
+   * aligned if the width is ever changed */
+  tab_dump("metadata sync", width, md_status);
+}
+
int main(int argc, char **argv)
{
vector<const char*> args;
}
}
+ if (opt_cmd == OPT_SYNC_STATUS) {
+ sync_status(formatter);
+ }
+
if (opt_cmd == OPT_METADATA_SYNC_STATUS) {
RGWMetaSyncStatusManager sync(store, store->get_async_rados());
JSONDecoder::decode_json("entries", entries, obj);
};
+/* Coroutine that fans out a sequence of child coroutines while keeping at
+ * most 'max_concurrent' of them in flight.  Subclasses implement
+ * spawn_next() to launch one child and return false when nothing remains
+ * to spawn.  The first error (other than -ENOENT) reported by any child is
+ * remembered and returned once all children have completed. */
+class RGWShardCollectCR : public RGWCoroutine {
+  CephContext *cct;
+
+  int cur_shard;
+  int current_running;
+  int max_concurrent;
+  int status;
+
+public:
+  /* BUGFIX: 'cct' was left uninitialized even though it is read by the
+   * ldout() calls in operate(); 'cur_shard' was uninitialized as well. */
+  RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
+                                                              cct(_cct),
+                                                              cur_shard(0),
+                                                              current_running(0),
+                                                              max_concurrent(_max_concurrent),
+                                                              status(0) {}
+
+  /* launch the next child coroutine; return false when no more remain */
+  virtual bool spawn_next() = 0;
+
+  int operate() {
+    reenter(this) {
+      while (spawn_next()) {
+        current_running++;
+
+        /* throttle: wait for a child to finish before exceeding the cap */
+        while (current_running >= max_concurrent) {
+          int child_ret;
+          yield wait_for_child();
+          if (collect_next(&child_ret)) {
+            current_running--;
+            if (child_ret < 0 && child_ret != -ENOENT) {
+              ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
+              status = child_ret;
+            }
+          }
+        }
+      }
+      /* everything spawned; drain the children still in flight */
+      while (current_running > 0) {
+        int child_ret;
+        yield wait_for_child();
+        if (collect_next(&child_ret)) {
+          current_running--;
+          if (child_ret < 0 && child_ret != -ENOENT) {
+            ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
+            status = child_ret;
+          }
+        }
+      }
+      if (status < 0) {
+        return set_cr_error(status);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+
+};
+
+/* Collects RGWMetadataLogInfo for each shard of the remote (master) mdlog
+ * for a given period, with at most READ_MDLOG_MAX_CONCURRENT shard reads
+ * in flight at a time (concurrency handled by the RGWShardCollectCR base). */
+class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
+ RGWMetaSyncEnv *sync_env;
+ RGWMetadataLog *mdlog;
+
+ /* NOTE(review): held by reference -- the caller's period string must
+  * outlive this coroutine; confirm at the call sites */
+ const std::string& period;
+ int num_shards;
+ map<int, RGWMetadataLogInfo> *mdlog_info;  /* out: filled keyed by shard id */
+
+ int shard_id;  /* next shard to spawn; advanced by spawn_next() */
+#define READ_MDLOG_MAX_CONCURRENT 10
+
+public:
+ RGWReadRemoteMDLogInfoCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
+ const std::string& period, int _num_shards,
+ map<int, RGWMetadataLogInfo> *_mdlog_info) : RGWShardCollectCR(_sync_env->cct, READ_MDLOG_MAX_CONCURRENT),
+ sync_env(_sync_env), mdlog(mdlog),
+ period(period), num_shards(_num_shards),
+ mdlog_info(_mdlog_info), shard_id(0) {}
+ /* spawns one RGWReadRemoteMDLogShardInfoCR per shard; defined below */
+ bool spawn_next();
+};
+
RGWRemoteMetaLog::~RGWRemoteMetaLog()
{
delete error_logger;
return 0;
}
+/* Fetch the master zone's current mdlog period id (*master_period) and
+ * per-shard log info (*shards_info).
+ *
+ * Returns 0 immediately, leaving the outputs untouched, when this gateway
+ * is itself the metadata master (nothing to sync against).  Otherwise
+ * reads the master's log info and then gathers per-shard state by running
+ * an RGWReadRemoteMDLogInfoCR to completion. */
+int RGWRemoteMetaLog::read_master_log_shards_info(string *master_period, map<int, RGWMetadataLogInfo> *shards_info)
+{
+ if (store->is_meta_master()) {
+ return 0;
+ }
+
+ rgw_mdlog_info log_info;
+ int ret = read_log_info(&log_info);
+ if (ret < 0) {
+ return ret;
+ }
+
+ *master_period = log_info.period;
+
+ RGWObjectCtx obj_ctx(store, NULL);
+ auto mdlog = store->meta_mgr->get_log(log_info.period);
+ /* run() drives the coroutine to completion, so passing a reference to
+  * the local log_info.period into the CR is safe here */
+ return run(new RGWReadRemoteMDLogInfoCR(&sync_env, mdlog, log_info.period, log_info.num_shards, shards_info));
+}
+
int RGWRemoteMetaLog::init()
{
conn = store->rest_master_conn;
}
};
+/* One-shot coroutine that lists entries of a single remote mdlog shard via
+ * the master's /admin/log REST endpoint, storing the reply in *result. */
+class RGWListRemoteMDLogShardCR : public RGWSimpleCoroutine {
+ RGWMetaSyncEnv *sync_env;
+ RGWRESTReadResource *http_op;  /* in-flight async read; owned via put() */
+
+ /* NOTE(review): 'period' is held by reference -- the referenced string
+  * must outlive this coroutine; confirm at the call sites */
+ const std::string& period;
+ int shard_id;
+ string marker;        /* resume listing after this marker, if non-empty */
+ uint32_t max_entries;
+ rgw_mdlog_shard_data *result;  /* out: decoded shard listing */
+
+public:
+ RGWListRemoteMDLogShardCR(RGWMetaSyncEnv *env, const std::string& period,
+ int _shard_id, const string& _marker, uint32_t _max_entries,
+ rgw_mdlog_shard_data *_result)
+ : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
+ period(period), shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}
+
+ /* issue the async REST GET; returns < 0 if the request cannot be sent */
+ int send_request() {
+ RGWRESTConn *conn = sync_env->conn;
+ RGWRados *store = sync_env->store;
+
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+
+ char max_entries_buf[32];
+ snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);
+
+ /* presumably an empty key causes the pair to be omitted from the query
+  * string, so "marker" is only sent when one is set -- confirm against
+  * the rgw_http_param_pair handling */
+ const char *marker_key = (marker.empty() ? "" : "marker");
+
+ rgw_http_param_pair pairs[] = { { "type", "metadata" },
+ { "id", buf },
+ { "period", period.c_str() },
+ { "max-entries", max_entries_buf },
+ { marker_key, marker.c_str() },
+ { NULL, NULL } };
+
+ string p = "/admin/log/";
+
+ http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
+ http_op->set_user_info((void *)stack);
+
+ int ret = http_op->aio_read();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
+ log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
+ http_op->put();
+ return ret;
+ }
+
+ return 0;
+ }
+
+ /* wait for the reply and decode it into *result; an empty shard
+  * (-ENOENT) is treated as success */
+ int handle_response() {
+ int ret = http_op->wait(result);
+ if (ret < 0 && ret != -ENOENT) {
+ ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote mdlog shard, ret=" << ret << dendl;
+ return ret;
+ }
+ return 0;
+ }
+};
+
+/* RGWShardCollectCR hook: spawn the info-read for the current shard and
+ * report whether more shards remain.
+ * NOTE(review): shard 0 is spawned before the bound check, so this assumes
+ * num_shards >= 1 -- confirm callers never pass 0. */
+bool RGWReadRemoteMDLogInfoCR::spawn_next() {
+ spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, period, shard_id, &(*mdlog_info)[shard_id]), false);
+ shard_id++;
+ return (shard_id < num_shards);
+}
+
class RGWInitSyncStatusCoroutine : public RGWCoroutine {
RGWMetaSyncEnv *sync_env;
RGWObjectCtx& obj_ctx;