#include "common/ceph_json.h"
+#include "common/RWLock.h"
#include "rgw_common.h"
#include "rgw_rados.h"
ldout(store->ctx(), 20) << "remote mdlog, num_shards=" << log_info.num_shards << dendl;
+ RWLock::WLocker wl(ts_to_shard_lock);
+ for (int i = 0; i < (int)log_info.num_shards; i++) {
+ clone_markers.push_back(string());
+ utime_shard ut;
+ ut.shard_id = i;
+ ts_to_shard[ut] = i;
+ }
return 0;
}
return 0;
}
+int RGWRemoteMetaLog::get_shard_info(int shard_id)
+{
+ conn = store->rest_master_conn;
+
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+
+ rgw_http_param_pair pairs[] = { { "type", "metadata" },
+ { "id", buf },
+ { "info", NULL },
+ { NULL, NULL } };
+
+ RGWMetadataLogInfo info;
+ int ret = conn->get_json_resource("/admin/log", pairs, info);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch mdlog info" << dendl;
+ return ret;
+ }
+
+ ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " marker=" << info.marker << dendl;
+
+ return 0;
+}
+
int RGWRemoteMetaLog::clone_shards()
{
+ bool truncated;
+
+ list<int> active_shards;
for (int i = 0; i < (int)log_info.num_shards; i++) {
- int ret = clone_shard(i);
- if (ret < 0) {
- ldout(store->ctx(), 10) << "failed to clone shard: ret=" << ret << dendl;
- }
+ active_shards.push_back(i);
}
+ do {
+ truncated = false;
+
+ list<int> next_active_shards;
+
+ for (list<int>::iterator iter = active_shards.begin(); iter != active_shards.end(); ++iter) {
+ int shard_id = *iter;
+ bool shard_truncated;
+ string& shard_marker = clone_markers[shard_id];
+ int ret = clone_shard(shard_id, shard_marker, &shard_marker, &shard_truncated);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to clone shard: ret=" << ret << dendl;
+ }
+ truncated |= shard_truncated;
+
+ if (shard_truncated) {
+ next_active_shards.push_back(shard_id);
+ }
+ }
+ active_shards.swap(next_active_shards);
+ } while (truncated);
return 0;
}
-int RGWRemoteMetaLog::clone_shard(int shard_id)
+int RGWRemoteMetaLog::clone_shard(int shard_id, const string& marker, string *new_marker, bool *truncated)
{
+ *truncated = false;
+
conn = store->rest_master_conn;
char buf[32];
snprintf(buf, sizeof(buf), "%d", shard_id);
+#define CLONE_MAX_ENTRIES 100
+ int max_entries = CLONE_MAX_ENTRIES;
+
+ char max_entries_buf[32];
+ snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", max_entries);
+
+ const char *marker_key = (marker.empty() ? "" : "marker");
+
rgw_http_param_pair pairs[] = { { "type", "metadata" },
{ "id", buf },
+ { "max-entries", max_entries_buf },
+ { marker_key, marker.c_str() },
{ NULL, NULL } };
rgw_mdlog_shard_data data;
ldout(store->ctx(), 20) << "remote mdlog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
+ *truncated = ((int)data.entries.size() == max_entries);
+
if (data.entries.empty()) {
return 0;
}
::encode(entry.log_data, dest_entry.data);
dest_entries.push_back(dest_entry);
+
+ *new_marker = entry.id;
}
ret = store->meta_mgr->store_md_log_entries(dest_entries, shard_id);