--- /dev/null
+#include "common/ceph_json.h"
+#include "common/RWLock.h"
+#include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
+#include "common/Throttle.h"
+
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "rgw_data_sync.h"
+#include "rgw_rest_conn.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_http_client.h"
+#include "rgw_bucket.h"
+
+#include "cls/lock/cls_lock_client.h"
+
+#include <boost/asio/coroutine.hpp>
+#include <boost/asio/yield.hpp>
+
+
+#define dout_subsys ceph_subsys_rgw
+
+static string datalog_sync_status_oid = "datalog.sync-status";
+static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
+static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
+
+void rgw_datalog_info::decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("num_objects", num_shards, obj);
+}
+
+struct rgw_datalog_entry {
+ string key;
+ utime_t timestamp;
+
+ void decode_json(JSONObj *obj);
+};
+
+struct rgw_datalog_shard_data {
+ string marker;
+ bool truncated;
+ vector<rgw_datalog_entry> entries;
+
+ void decode_json(JSONObj *obj);
+};
+
+
+void rgw_datalog_entry::decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("key", key, obj);
+ JSONDecoder::decode_json("timestamp", timestamp, obj);
+}
+
+void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("marker", marker, obj);
+ JSONDecoder::decode_json("truncated", truncated, obj);
+ JSONDecoder::decode_json("entries", entries, obj);
+};
+
+int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
+{
+ rgw_http_param_pair pairs[] = { { "type", "data" },
+ { NULL, NULL } };
+
+ int ret = conn->get_json_resource("/admin/log", pairs, *log_info);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
+ return ret;
+ }
+
+ ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
+
+ return 0;
+}
+
+int RGWRemoteDataLog::init(RGWRESTConn *_conn)
+{
+ CephContext *cct = store->ctx();
+ async_rados = new RGWAsyncRadosProcessor(store, cct->_conf->rgw_num_async_rados_threads);
+ async_rados->start();
+
+ conn = _conn;
+
+ int ret = http_manager.set_threaded();
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+ return ret;
+ }
+
+ return 0;
+}
+
+void RGWRemoteDataLog::finish()
+{
+ stop();
+ if (async_rados) {
+ async_rados->stop();
+ }
+ delete async_rados;
+}
+
+int RGWRemoteDataLog::list_shards(int num_shards)
+{
+ for (int i = 0; i < (int)num_shards; i++) {
+ int ret = list_shard(i);
+ if (ret < 0) {
+ ldout(store->ctx(), 10) << "failed to list shard: ret=" << ret << dendl;
+ }
+ }
+
+ return 0;
+}
+
+int RGWRemoteDataLog::list_shard(int shard_id)
+{
+ conn = store->rest_master_conn;
+
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+
+ rgw_http_param_pair pairs[] = { { "type", "data" },
+ { "id", buf },
+ { NULL, NULL } };
+
+ rgw_datalog_shard_data data;
+ int ret = conn->get_json_resource("/admin/log", pairs, data);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog data" << dendl;
+ return ret;
+ }
+
+ ldout(store->ctx(), 20) << "remote datalog, shard_id=" << shard_id << " num of shard entries: " << data.entries.size() << dendl;
+
+ vector<rgw_datalog_entry>::iterator iter;
+ for (iter = data.entries.begin(); iter != data.entries.end(); ++iter) {
+ rgw_datalog_entry& entry = *iter;
+ ldout(store->ctx(), 20) << "entry: key=" << entry.key << dendl;
+ }
+
+ return 0;
+}
+
+int RGWRemoteDataLog::get_shard_info(int shard_id)
+{
+ conn = store->rest_master_conn;
+
+ char buf[32];
+ snprintf(buf, sizeof(buf), "%d", shard_id);
+
+ rgw_http_param_pair pairs[] = { { "type", "data" },
+ { "id", buf },
+ { "info", NULL },
+ { NULL, NULL } };
+
+ RGWDataChangesLogInfo info;
+ int ret = conn->get_json_resource("/admin/log", pairs, info);
+ if (ret < 0) {
+ ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
+ return ret;
+ }
+
+ ldout(store->ctx(), 20) << "remote datalog, shard_id=" << shard_id << " marker=" << info.marker << dendl;
+
+ return 0;
+}
+
+int RGWDataSyncStatusManager::init()
+{
+ map<string, RGWRESTConn *>::iterator iter = store->zone_conn_map.find(source_zone);
+ if (iter == store->zone_conn_map.end()) {
+ lderr(store->ctx()) << "no REST connection to master zone" << dendl;
+ return -EIO;
+ }
+
+ conn = iter->second;
+
+ const char *log_pool = store->get_zone_params().log_pool.name.c_str();
+ librados::Rados *rados = store->get_rados_handle();
+ int r = rados->ioctx_create(log_pool, ioctx);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->get_zone_params().log_pool.name << " ret=" << r << dendl;
+ return r;
+ }
+
+ source_status_obj = rgw_obj(store->get_zone_params().log_pool, datalog_sync_status_oid);
+
+ r = source_log.init(conn);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
+ return r;
+ }
+
+ rgw_datalog_info datalog_info;
+ r = source_log.read_log_info(&datalog_info);
+ if (r < 0) {
+ lderr(store->ctx()) << "ERROR: master.read_log_info() returned r=" << r << dendl;
+ return r;
+ }
+
+ num_shards = datalog_info.num_shards;
+
+ for (int i = 0; i < num_shards; i++) {
+ shard_objs[i] = rgw_obj(store->get_zone_params().log_pool, shard_obj_name(source_zone, i));
+ }
+
+ return 0;
+}
+
+string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
+{
+ char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
+ snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
+
+ return string(buf);
+}
+
+
--- /dev/null
+#ifndef CEPH_RGW_DATA_SYNC_H
+#define CEPH_RGW_DATA_SYNC_H
+
+#include "rgw_coroutine.h"
+#include "rgw_http_client.h"
+
+#include "common/RWLock.h"
+
+
+struct rgw_datalog_info {
+ uint32_t num_shards;
+
+ rgw_datalog_info() : num_shards(0) {}
+
+ void decode_json(JSONObj *obj);
+};
+
+struct rgw_data_sync_info {
+ enum SyncState {
+ StateInit = 0,
+ StateBuildingFullSyncMaps = 1,
+ StateSync = 2,
+ };
+
+ uint16_t state;
+ uint32_t num_shards;
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(state, bl);
+ ::encode(num_shards, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(state, bl);
+ ::decode(num_shards, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const {
+ string s;
+ switch ((SyncState)state) {
+ case StateInit:
+ s = "init";
+ break;
+ case StateBuildingFullSyncMaps:
+ s = "building-full-sync-maps";
+ break;
+ case StateSync:
+ s = "sync";
+ break;
+ default:
+ s = "unknown";
+ break;
+ }
+ encode_json("status", s, f);
+ encode_json("num_shards", num_shards, f);
+ }
+
+ rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
+};
+WRITE_CLASS_ENCODER(rgw_data_sync_info)
+
+struct rgw_data_sync_marker {
+ enum SyncState {
+ FullSync = 0,
+ IncrementalSync = 1,
+ };
+ uint16_t state;
+ string marker;
+ string next_step_marker;
+
+ rgw_data_sync_marker() : state(FullSync) {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(state, bl);
+ ::encode(marker, bl);
+ ::encode(next_step_marker, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(state, bl);
+ ::decode(marker, bl);
+ ::decode(next_step_marker, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const {
+ encode_json("state", (int)state, f);
+ encode_json("marker", marker, f);
+ encode_json("next_step_marker", next_step_marker, f);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_data_sync_marker)
+
+struct rgw_data_sync_status {
+ rgw_data_sync_info sync_info;
+ map<uint32_t, rgw_data_sync_marker> sync_markers;
+
+ rgw_data_sync_status() {}
+
+ void encode(bufferlist& bl) const {
+ ENCODE_START(1, 1, bl);
+ ::encode(sync_info, bl);
+ ::encode(sync_markers, bl);
+ ENCODE_FINISH(bl);
+ }
+
+ void decode(bufferlist::iterator& bl) {
+ DECODE_START(1, bl);
+ ::decode(sync_info, bl);
+ ::decode(sync_markers, bl);
+ DECODE_FINISH(bl);
+ }
+
+ void dump(Formatter *f) const {
+ encode_json("info", sync_info, f);
+ encode_json("markers", sync_markers, f);
+ }
+};
+WRITE_CLASS_ENCODER(rgw_data_sync_status)
+
+class RGWAsyncRadosProcessor;
+class RGWDataSyncStatusManager;
+class RGWDataSyncCR;
+
+class RGWRemoteDataLog : public RGWCoroutinesManager {
+ RGWRados *store;
+ RGWRESTConn *conn;
+ RGWAsyncRadosProcessor *async_rados;
+
+ RGWHTTPManager http_manager;
+ RGWDataSyncStatusManager *status_manager;
+
+ RGWDataSyncCR *data_sync_cr;
+
+public:
+ RGWRemoteDataLog(RGWRados *_store, RGWDataSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx()), store(_store),
+ conn(NULL),
+ http_manager(store->ctx(), &completion_mgr),
+ status_manager(_sm), data_sync_cr(NULL) {}
+
+ int init(RGWRESTConn *_conn);
+ void finish();
+
+ int read_log_info(rgw_datalog_info *log_info);
+ int list_shard(int shard_id);
+ int list_shards(int num_shards);
+ int get_shard_info(int shard_id);
+ int read_sync_status(rgw_data_sync_status *sync_status);
+ int init_sync_status(int num_shards);
+ int set_sync_info(const rgw_data_sync_info& sync_info);
+ int run_sync(int num_shards, rgw_data_sync_status& sync_status);
+
+ void wakeup(int shard_id);
+};
+
+class RGWDataSyncStatusManager {
+ RGWRados *store;
+ librados::IoCtx ioctx;
+
+ string source_zone;
+ RGWRESTConn *conn;
+
+ RGWRemoteDataLog source_log;
+
+ string source_status_oid;
+ string source_shard_status_oid_prefix;
+ rgw_obj source_status_obj;
+
+ rgw_data_sync_status sync_status;
+ map<int, rgw_obj> shard_objs;
+
+ int num_shards;
+
+public:
+ RGWDataSyncStatusManager(RGWRados *_store, const string& _source_zone) : store(_store), source_zone(_source_zone), conn(NULL),
+ source_log(store, this), num_shards(0) {}
+ int init();
+
+ rgw_data_sync_status& get_sync_status() { return sync_status; }
+
+ static string shard_obj_name(const string& source_zone, int shard_id);
+
+ int read_sync_status() { return source_log.read_sync_status(&sync_status); }
+ int init_sync_status() { return source_log.init_sync_status(num_shards); }
+
+ int run() { return source_log.run_sync(num_shards, sync_status); }
+
+ void wakeup(int shard_id) { return source_log.wakeup(shard_id); }
+ void stop() {
+ source_log.finish();
+ }
+};
+
+#endif