ss.str("");
}
-stringstream& push_ss(stringstream& ss, list<string>& l)
+stringstream& push_ss(stringstream& ss, list<string>& l, int tab = 0)
{
flush_ss(ss, l);
+ if (tab > 0) {
+ ss << setw(tab) << "" << setw(1);
+ }
return ss;
}
flush_ss(ss, status);
}
+static void get_data_sync_status(const string& source_zone, list<string>& status, int tab)
+{
+ RGWDataSyncStatusManager sync(store, store->get_async_rados(), source_zone);
+
+ stringstream ss;
+
+ int ret = sync.init();
+ if (ret < 0) {
+ push_ss(ss, status, tab) << string("failed to retrieve sync info: ") + cpp_strerror(-ret);
+ flush_ss(ss, status);
+ return;
+ }
+
+ ret = sync.read_sync_status();
+ if (ret < 0) {
+ status.push_back(string("failed to read sync status: ") + cpp_strerror(-ret));
+ return;
+ }
+
+ const rgw_data_sync_status& sync_status = sync.get_sync_status();
+
+ string status_str;
+ switch (sync_status.sync_info.state) {
+ case rgw_data_sync_info::StateInit:
+ status_str = "init";
+ break;
+ case rgw_data_sync_info::StateBuildingFullSyncMaps:
+ status_str = "preparing for full sync";
+ break;
+ case rgw_data_sync_info::StateSync:
+ status_str = "syncing";
+ break;
+ default:
+ status_str = "unknown";
+ }
+
+ push_ss(ss, status, tab) << status_str;
+
+ uint64_t full_total = 0;
+ uint64_t full_complete = 0;
+
+ int num_full = 0;
+ int num_inc = 0;
+ int total_shards = 0;
+
+ for (auto marker_iter : sync_status.sync_markers) {
+ full_total += marker_iter.second.total_entries;
+ total_shards++;
+ if (marker_iter.second.state == rgw_data_sync_marker::SyncState::FullSync) {
+ num_full++;
+ full_complete += marker_iter.second.pos;
+ } else {
+ full_complete += marker_iter.second.total_entries;
+ }
+ if (marker_iter.second.state == rgw_data_sync_marker::SyncState::IncrementalSync) {
+ num_inc++;
+ }
+ }
+
+ push_ss(ss, status, tab) << "full sync: " << num_full << "/" << total_shards << " shards";
+
+ if (num_full > 0) {
+ push_ss(ss, status, tab) << "full sync: " << full_total - full_complete << " buckets to sync";
+ }
+
+ push_ss(ss, status, tab) << "incremental sync: " << num_inc << "/" << total_shards << " shards";
+
+ rgw_datalog_info log_info;
+ ret = sync.read_log_info(&log_info);
+ if (ret < 0) {
+ status.push_back(string("failed to fetch local sync status: ") + cpp_strerror(-ret));
+ return;
+ }
+
+
+ map<int, RGWDataChangesLogInfo> source_shards_info;
+
+ ret = sync.read_source_log_shards_info(&source_shards_info);
+ if (ret < 0) {
+ status.push_back(string("failed to fetch master sync status: ") + cpp_strerror(-ret));
+ return;
+ }
+
+ map<int, string> shards_behind;
+
+ for (auto local_iter : sync_status.sync_markers) {
+ int shard_id = local_iter.first;
+ auto iter = source_shards_info.find(shard_id);
+
+ if (iter == source_shards_info.end()) {
+ /* huh? */
+ derr << "ERROR: could not find remote sync shard status for shard_id=" << shard_id << dendl;
+ continue;
+ }
+ auto master_marker = iter->second.marker;
+ if (master_marker > local_iter.second.marker) {
+ shards_behind[shard_id] = local_iter.second.marker;
+ }
+ }
+
+ int total_behind = shards_behind.size() + (sync_status.sync_info.num_shards - num_inc);
+ if (total_behind == 0) {
+ status.push_back("data is caught up with master");
+ } else {
+ push_ss(ss, status, tab) << "data is behind on " << total_behind << " shards";
+ }
+
+ flush_ss(ss, status);
+}
+
static void tab_dump(const string& header, int width, const list<string>& entries)
{
string s = header;
}
tab_dump("metadata sync", width, md_status);
+
+ list<string> data_status;
+
+ for (auto iter : store->zone_conn_map) {
+ const string& source_id = iter.first;
+ string zone_name;
+ string source_str = "source: ";
+ string s = source_str + source_id;
+ auto siter = store->zone_name_by_id.find(source_id);
+ if (siter != store->zone_name_by_id.end()) {
+ s += string(" (") + siter->second + ")";
+ }
+ data_status.push_back(s);
+ get_data_sync_status(source_id, data_status, source_str.size());
+ }
+
+ tab_dump("data sync", width, data_status);
}
int main(int argc, char **argv)
}
};
+class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
+ RGWDataSyncEnv *sync_env;
+
+ int num_shards;
+ map<int, RGWDataChangesLogInfo> *datalog_info;
+
+ int shard_id;
+#define READ_DATALOG_MAX_CONCURRENT 10
+
+public:
+ RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
+ int _num_shards,
+ map<int, RGWDataChangesLogInfo> *_datalog_info) : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
+ sync_env(_sync_env), num_shards(_num_shards),
+ datalog_info(_datalog_info), shard_id(0) {}
+ bool spawn_next();
+};
+
+bool RGWReadRemoteDataLogInfoCR::spawn_next() {
+ if (shard_id >= num_shards) {
+ return false;
+ }
+ spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
+ shard_id++;
+ return true;
+}
+
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
RGWDataSyncEnv *sync_env;
return 0;
}
+int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
+{
+ rgw_datalog_info log_info;
+ int ret = read_log_info(&log_info);
+ if (ret < 0) {
+ return ret;
+ }
+
+ return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
+}
+
int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger)
{
if (initialized) {
#include "rgw_coroutine.h"
#include "rgw_http_client.h"
+#include "rgw_bucket.h"
#include "common/RWLock.h"
void finish();
int read_log_info(rgw_datalog_info *log_info);
+ int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info);
int get_shard_info(int shard_id);
int read_sync_status(rgw_data_sync_status *sync_status);
int init_sync_status(int num_shards);
int read_sync_status() { return source_log.read_sync_status(&sync_status); }
int init_sync_status() { return source_log.init_sync_status(num_shards); }
+ int read_log_info(rgw_datalog_info *log_info) {
+ return source_log.read_log_info(log_info);
+ }
+ int read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info) {
+ return source_log.read_source_log_shards_info(shards_info);
+ }
+
int run() { return source_log.run_sync(num_shards, sync_status); }
void wakeup(int shard_id, set<string>& keys) { return source_log.wakeup(shard_id, keys); }
const string& id = ziter->first;
RGWZone& z = ziter->second;
zone_id_by_name[z.name] = id;
+ zone_name_by_id[id] = z.name;
if (id != zone_id()) {
if (!z.endpoints.empty()) {
ldout(cct, 20) << "generating connection object for zone " << z.name << " id " << z.id << dendl;
map<string, RGWRESTConn *> zonegroup_conn_map;
map<string, string> zone_id_by_name;
+ map<string, string> zone_name_by_id;
RGWRESTConn *get_zone_conn_by_id(const string& id) {
auto citer = zone_conn_map.find(id);
JSONDecoder::decode_json("entries", entries, obj);
};
-class RGWShardCollectCR : public RGWCoroutine {
- CephContext *cct;
-
- int cur_shard;
- int current_running;
- int max_concurrent;
- int status;
-
-public:
- RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
- current_running(0),
- max_concurrent(_max_concurrent),
- status(0) {}
-
- virtual bool spawn_next() = 0;
+int RGWShardCollectCR::operate() {
+ reenter(this) {
+ while (spawn_next()) {
+ current_running++;
- int operate() {
- reenter(this) {
- while (spawn_next()) {
- current_running++;
-
- while (current_running >= max_concurrent) {
- int child_ret;
- yield wait_for_child();
- if (collect_next(&child_ret)) {
- current_running--;
- if (child_ret < 0 && child_ret != -ENOENT) {
- ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
- status = child_ret;
- }
- }
- }
- }
- while (current_running > 0) {
+ while (current_running >= max_concurrent) {
int child_ret;
yield wait_for_child();
if (collect_next(&child_ret)) {
}
}
}
- if (status < 0) {
- return set_cr_error(status);
+ }
+ while (current_running > 0) {
+ int child_ret;
+ yield wait_for_child();
+ if (collect_next(&child_ret)) {
+ current_running--;
+ if (child_ret < 0 && child_ret != -ENOENT) {
+ ldout(cct, 10) << __func__ << ": failed to fetch log status, ret=" << child_ret << dendl;
+ status = child_ret;
+ }
}
- return set_cr_done();
}
- return 0;
+ if (status < 0) {
+ return set_cr_error(status);
+ }
+ return set_cr_done();
}
-
-};
+ return 0;
+}
class RGWReadRemoteMDLogInfoCR : public RGWShardCollectCR {
RGWMetaSyncEnv *sync_env;
int operate();
};
+class RGWShardCollectCR : public RGWCoroutine {
+ CephContext *cct;
+
+ int cur_shard;
+ int current_running;
+ int max_concurrent;
+ int status;
+
+public:
+ RGWShardCollectCR(CephContext *_cct, int _max_concurrent) : RGWCoroutine(_cct),
+ current_running(0),
+ max_concurrent(_max_concurrent),
+ status(0) {}
+
+ virtual bool spawn_next() = 0;
+ int operate();
+};
#endif