git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: initial data sync work
author Yehuda Sadeh <yehuda@redhat.com>
Tue, 15 Sep 2015 21:37:46 +0000 (14:37 -0700)
committer Yehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:51 +0000 (16:12 -0800)
Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/CMakeLists.txt
src/rgw/Makefile.am
src/rgw/rgw_admin.cc
src/rgw/rgw_data_sync.cc [new file with mode: 0644]
src/rgw/rgw_data_sync.h [new file with mode: 0644]

index f01f98dee9c4033a36e9b0d75b7227cdb17725d3..56d480a5c6ce1135e6e7a07330ea98bdc3500ad2 100644 (file)
@@ -1163,6 +1163,7 @@ if(${WITH_RADOSGW})
     rgw/rgw_keystone.cc
     rgw/rgw_quota.cc
     rgw/rgw_sync.cc
+    rgw/rgw_data_sync.cc
     rgw/rgw_dencoder.cc
     rgw/rgw_coroutine.cc
     rgw/rgw_cr_rados.cc
@@ -1197,12 +1198,14 @@ if(${WITH_RADOSGW})
     rgw/rgw_civetweb_log.cc
     civetweb/src/civetweb.c
     rgw/rgw_main.cc
-    rgw/rgw_sync.cc)
+    rgw/rgw_sync.cc
+    rgw/rgw_data_sync.cc)
 
   set(radosgw_admin_srcs
     rgw/rgw_admin.cc
     rgw/rgw_orphan.cc
-    rgw/rgw_sync.cc)
+    rgw/rgw_sync.cc
+    rgw/rgw_data_sync.cc)
 
   set(radosgw_object_expirer_srcs
     rgw/rgw_object_expirer.cc)
index 079b3af00fe3558b301585924ebb76a046669f32..6466d2183d76b18f874679f9188e000d82c7a92e 100644 (file)
@@ -58,10 +58,11 @@ librgw_la_SOURCES =  \
        rgw/rgw_replica_log.cc \
        rgw/rgw_keystone.cc \
        rgw/rgw_quota.cc \
-       rgw/rgw_sync.cc \
        rgw/rgw_dencoder.cc \
        rgw/rgw_object_expirer_core.cc \
-       rgw/rgw_website.cc
+       rgw/rgw_website.cc \
+       rgw/rgw_sync.cc \
+       rgw/rgw_data_sync.cc
 
 librgw_la_CXXFLAGS = -Woverloaded-virtual ${AM_CXXFLAGS}
 noinst_LTLIBRARIES += librgw.la
@@ -192,6 +193,7 @@ noinst_HEADERS += \
        rgw/rgw_rest_replica_log.h \
        rgw/rgw_rest_config.h \
        rgw/rgw_sync.h \
+       rgw/rgw_data_sync.h \
        rgw/rgw_usage.h \
        rgw/rgw_user.h \
        rgw/rgw_bucket.h \
index 4754b413ba3129d181d086f59b28a42be39e3608..6a2138111b74ddb55788a493aa04521e782b8d4b 100644 (file)
@@ -32,6 +32,7 @@
 #include "rgw_replica_log.h"
 #include "rgw_orphan.h"
 #include "rgw_sync.h"
+#include "rgw_data_sync.h"
 #include "rgw_rest_conn.h"
 
 using namespace std;
diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
new file mode 100644 (file)
index 0000000..2e6951a
--- /dev/null
@@ -0,0 +1,216 @@
+#include "common/ceph_json.h"
+#include "common/RWLock.h"
+#include "common/RefCountedObj.h"
+#include "common/WorkQueue.h"
+#include "common/Throttle.h"
+
+#include "rgw_common.h"
+#include "rgw_rados.h"
+#include "rgw_data_sync.h"
+#include "rgw_rest_conn.h"
+#include "rgw_cr_rados.h"
+#include "rgw_cr_rest.h"
+#include "rgw_http_client.h"
+#include "rgw_bucket.h"
+
+#include "cls/lock/cls_lock_client.h"
+
+#include <boost/asio/coroutine.hpp>
+#include <boost/asio/yield.hpp>
+
+
+#define dout_subsys ceph_subsys_rgw
+
+// Object name of the overall data sync status object; used by
+// RGWDataSyncStatusManager::init() to build source_status_obj in the log pool.
+static string datalog_sync_status_oid = "datalog.sync-status";
+// Prefix of the per-shard status object names produced by
+// RGWDataSyncStatusManager::shard_obj_name().
+static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
+// NOTE(review): not referenced by the code visible in this part of the file.
+static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
+
+// Populate this object from JSON: the shard count is taken from the
+// "num_objects" field of obj.
+void rgw_datalog_info::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("num_objects", num_shards, obj);
+}
+
+// One entry of a remote data log shard listing, as decoded from JSON.
+struct rgw_datalog_entry {
+  string key;          // decoded from the "key" field
+  utime_t timestamp;   // decoded from the "timestamp" field
+
+  // Fill both fields from the given JSON object.
+  void decode_json(JSONObj *obj);
+};
+
+// Result of listing a single remote data log shard, as decoded from JSON.
+struct rgw_datalog_shard_data {
+  string marker;                      // decoded from the "marker" field
+  bool truncated;                     // decoded from the "truncated" field
+  vector<rgw_datalog_entry> entries;  // decoded from the "entries" field
+
+  // Give the bool a defined value so a default-constructed object (for
+  // example one whose fetch failed before decode_json() ran) never exposes
+  // an indeterminate flag.
+  rgw_datalog_shard_data() : truncated(false) {}
+
+  // Fill all fields from the given JSON object.
+  void decode_json(JSONObj *obj);
+};
+
+
+// Decode the "key" and "timestamp" fields of obj into this entry.
+void rgw_datalog_entry::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("key", key, obj);
+  JSONDecoder::decode_json("timestamp", timestamp, obj);
+}
+
+// Decode the "marker", "truncated" and "entries" fields of obj into this
+// shard listing.
+void rgw_datalog_shard_data::decode_json(JSONObj *obj)
+{
+  JSONDecoder::decode_json("marker", marker, obj);
+  JSONDecoder::decode_json("truncated", truncated, obj);
+  JSONDecoder::decode_json("entries", entries, obj);
+}
+
+/*
+ * Fetch the remote data log description by requesting "/admin/log" with
+ * type=data over conn and decoding the JSON reply into *log_info.
+ *
+ * Returns 0 on success, or the negative value returned by
+ * get_json_resource() on failure.
+ */
+int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
+{
+  // Parameter list is terminated by a { NULL, NULL } pair.
+  rgw_http_param_pair params[] = { { "type", "data" },
+                                   { NULL, NULL } };
+
+  const int r = conn->get_json_resource("/admin/log", params, *log_info);
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
+    return r;
+  }
+
+  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;
+  return 0;
+}
+
+/*
+ * Set up the remote log handle: create and start the async RADOS processor
+ * (thread count taken from rgw_num_async_rados_threads), remember the REST
+ * connection, and switch the HTTP manager to threaded mode.
+ *
+ * Returns 0 on success, or the negative value returned by
+ * http_manager.set_threaded() on failure. On failure the async processor
+ * created here is stopped and released again, so a failed init() does not
+ * leave it allocated and running; a later finish() remains safe because
+ * the pointer is reset to NULL.
+ */
+int RGWRemoteDataLog::init(RGWRESTConn *_conn)
+{
+  CephContext *cct = store->ctx();
+  async_rados = new RGWAsyncRadosProcessor(store, cct->_conf->rgw_num_async_rados_threads);
+  async_rados->start();
+
+  conn = _conn;
+
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    // undo the partial initialization done above
+    async_rados->stop();
+    delete async_rados;
+    async_rados = NULL;
+    return ret;
+  }
+
+  return 0;
+}
+
+/*
+ * Shut the remote log handle down: call stop(), then stop and free the
+ * async RADOS processor if one exists.
+ *
+ * The pointer is reset after deletion so that calling finish() more than
+ * once does not stop or delete an already freed object.
+ */
+void RGWRemoteDataLog::finish()
+{
+  stop();
+  if (async_rados) {
+    async_rados->stop();
+    delete async_rados;
+    async_rados = NULL;
+  }
+}
+
+/*
+ * Call list_shard() for every shard id in [0, num_shards).
+ *
+ * A failing shard is only logged (level 10); the remaining shards are still
+ * listed and the function always returns 0.
+ */
+int RGWRemoteDataLog::list_shards(int num_shards)
+{
+  int shard_id = 0;
+  while (shard_id < num_shards) {
+    const int r = list_shard(shard_id);
+    if (r < 0) {
+      ldout(store->ctx(), 10) << "failed to list shard: ret=" << r << dendl;
+    }
+    ++shard_id;
+  }
+
+  return 0;
+}
+
+/*
+ * Fetch the entries of one remote data log shard ("/admin/log" with
+ * type=data and id=<shard_id>) and log each entry key at level 20.
+ *
+ * Note that conn is replaced with store->rest_master_conn before the
+ * request is made.
+ *
+ * Returns 0 on success, or the negative value returned by
+ * get_json_resource() on failure.
+ */
+int RGWRemoteDataLog::list_shard(int shard_id)
+{
+  conn = store->rest_master_conn;
+
+  // decimal text of the shard id for the "id" request parameter
+  char id_str[32];
+  snprintf(id_str, sizeof(id_str), "%d", shard_id);
+
+  rgw_http_param_pair params[] = { { "type", "data" },
+                                   { "id", id_str },
+                                   { NULL, NULL } };
+
+  rgw_datalog_shard_data shard_data;
+  const int r = conn->get_json_resource("/admin/log", params, shard_data);
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog data" << dendl;
+    return r;
+  }
+
+  ldout(store->ctx(), 20) << "remote datalog, shard_id=" << shard_id << " num of shard entries: " << shard_data.entries.size() << dendl;
+
+  for (vector<rgw_datalog_entry>::iterator it = shard_data.entries.begin();
+       it != shard_data.entries.end(); ++it) {
+    ldout(store->ctx(), 20) << "entry: key=" << it->key << dendl;
+  }
+
+  return 0;
+}
+
+/*
+ * Fetch the info record of one remote data log shard ("/admin/log" with
+ * type=data, id=<shard_id> and a value-less "info" parameter) and log its
+ * marker at level 20.
+ *
+ * Note that conn is replaced with store->rest_master_conn before the
+ * request is made.
+ *
+ * Returns 0 on success, or the negative value returned by
+ * get_json_resource() on failure.
+ */
+int RGWRemoteDataLog::get_shard_info(int shard_id)
+{
+  conn = store->rest_master_conn;
+
+  // decimal text of the shard id for the "id" request parameter
+  char id_str[32];
+  snprintf(id_str, sizeof(id_str), "%d", shard_id);
+
+  rgw_http_param_pair params[] = { { "type", "data" },
+                                   { "id", id_str },
+                                   { "info", NULL },
+                                   { NULL, NULL } };
+
+  RGWDataChangesLogInfo shard_info;
+  const int r = conn->get_json_resource("/admin/log", params, shard_info);
+  if (r < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
+    return r;
+  }
+
+  ldout(store->ctx(), 20) << "remote datalog, shard_id=" << shard_id << " marker=" << shard_info.marker << dendl;
+
+  return 0;
+}
+
+/*
+ * Prepare the status manager for syncing from source_zone:
+ *  - look up the REST connection for source_zone in store->zone_conn_map,
+ *  - open an IoCtx on the zone's log pool,
+ *  - build the overall status object in the log pool,
+ *  - initialize the remote log handle and read the remote log info,
+ *  - build one status object per remote shard (named by shard_obj_name()).
+ *
+ * Returns 0 on success, -EIO when no connection exists for source_zone, or
+ * the negative error returned by the failing step.
+ */
+int RGWDataSyncStatusManager::init()
+{
+  map<string, RGWRESTConn *>::iterator iter = store->zone_conn_map.find(source_zone);
+  if (iter == store->zone_conn_map.end()) {
+    // the lookup is for the configured source zone, not the master zone
+    lderr(store->ctx()) << "no REST connection to zone " << source_zone << dendl;
+    return -EIO;
+  }
+
+  conn = iter->second;
+
+  const char *log_pool = store->get_zone_params().log_pool.name.c_str();
+  librados::Rados *rados = store->get_rados_handle();
+  int r = rados->ioctx_create(log_pool, ioctx);
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to open log pool (" << store->get_zone_params().log_pool.name << ") ret=" << r << dendl;
+    return r;
+  }
+
+  source_status_obj = rgw_obj(store->get_zone_params().log_pool, datalog_sync_status_oid);
+
+  r = source_log.init(conn);
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
+    return r;
+  }
+
+  rgw_datalog_info datalog_info;
+  r = source_log.read_log_info(&datalog_info);
+  if (r < 0) {
+    lderr(store->ctx()) << "ERROR: source_log.read_log_info() returned r=" << r << dendl;
+    return r;
+  }
+
+  // the shard count is dictated by the remote side
+  num_shards = datalog_info.num_shards;
+
+  for (int i = 0; i < num_shards; i++) {
+    shard_objs[i] = rgw_obj(store->get_zone_params().log_pool, shard_obj_name(source_zone, i));
+  }
+
+  return 0;
+}
+
+/*
+ * Build the name of the per-shard sync status object:
+ *   <datalog_sync_status_shard_prefix>.<source_zone>.<shard_id>
+ *
+ * The name is assembled with std::string rather than a variable-length
+ * stack array (a compiler extension that is not part of ISO C++); only the
+ * shard number goes through a fixed-size buffer.
+ */
+string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
+{
+  char shard_buf[16]; // enough for any 32-bit int in decimal, sign and NUL included
+  snprintf(shard_buf, sizeof(shard_buf), "%d", shard_id);
+
+  string name = datalog_sync_status_shard_prefix;
+  name += '.';
+  name += source_zone;
+  name += '.';
+  name += shard_buf;
+  return name;
+}
+
+
diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h
new file mode 100644 (file)
index 0000000..c080602
--- /dev/null
@@ -0,0 +1,201 @@
+#ifndef CEPH_RGW_DATA_SYNC_H
+#define CEPH_RGW_DATA_SYNC_H
+
+#include "rgw_coroutine.h"
+#include "rgw_http_client.h"
+
+#include "common/RWLock.h"
+
+
+// Description of a remote data log; currently just its number of shards.
+struct rgw_datalog_info {
+  uint32_t num_shards;
+
+  // Starts out with zero shards until decode_json() fills it in.
+  rgw_datalog_info()
+    : num_shards(0)
+  {}
+
+  void decode_json(JSONObj *obj);
+};
+
+/*
+ * Overall data sync state: which phase the sync is in and how many shards
+ * it covers. Encodable (version 1) and dumpable through a Formatter.
+ */
+struct rgw_data_sync_info {
+  enum SyncState {
+    StateInit = 0,
+    StateBuildingFullSyncMaps = 1,
+    StateSync = 2,
+  };
+
+  uint16_t state;       // holds a SyncState value
+  uint32_t num_shards;
+
+  rgw_data_sync_info() : state((int)StateInit), num_shards(0) {}
+
+  // Wire format v1: state, then num_shards.
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(state, bl);
+    ::encode(num_shards, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  // Reads the fields back in the order encode() wrote them.
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(state, bl);
+    ::decode(num_shards, bl);
+    DECODE_FINISH(bl);
+  }
+
+  // Emits "status" as a human-readable state name ("unknown" for any value
+  // outside SyncState) followed by "num_shards".
+  void dump(Formatter *f) const {
+    string s;
+    if (state == StateInit) {
+      s = "init";
+    } else if (state == StateBuildingFullSyncMaps) {
+      s = "building-full-sync-maps";
+    } else if (state == StateSync) {
+      s = "sync";
+    } else {
+      s = "unknown";
+    }
+    encode_json("status", s, f);
+    encode_json("num_shards", num_shards, f);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_data_sync_info)
+
+/*
+ * Per-shard sync position: the sync mode of the shard plus two marker
+ * strings. Encodable (version 1) and dumpable through a Formatter.
+ */
+struct rgw_data_sync_marker {
+  enum SyncState {
+    FullSync = 0,
+    IncrementalSync = 1,
+  };
+
+  uint16_t state;            // holds a SyncState value
+  string marker;
+  string next_step_marker;
+
+  // A fresh marker starts in full-sync mode with empty marker strings.
+  rgw_data_sync_marker()
+    : state(FullSync)
+  {}
+
+  // Wire format v1: state, marker, next_step_marker.
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(state, bl);
+    ::encode(marker, bl);
+    ::encode(next_step_marker, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  // Reads the fields back in the order encode() wrote them.
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(state, bl);
+    ::decode(marker, bl);
+    ::decode(next_step_marker, bl);
+    DECODE_FINISH(bl);
+  }
+
+  // The state is dumped as its integer value, not as a name.
+  void dump(Formatter *f) const {
+    encode_json("state", (int)state, f);
+    encode_json("marker", marker, f);
+    encode_json("next_step_marker", next_step_marker, f);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_data_sync_marker)
+
+/*
+ * Complete data sync status: the overall sync info plus a map of per-shard
+ * markers keyed by a uint32_t. Encodable (version 1) and dumpable.
+ */
+struct rgw_data_sync_status {
+  rgw_data_sync_info sync_info;
+  map<uint32_t, rgw_data_sync_marker> sync_markers;
+
+  rgw_data_sync_status() {}
+
+  // Wire format v1: sync_info, then sync_markers.
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(sync_info, bl);
+    ::encode(sync_markers, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  // Reads the fields back in the order encode() wrote them.
+  void decode(bufferlist::iterator& bl) {
+    DECODE_START(1, bl);
+    ::decode(sync_info, bl);
+    ::decode(sync_markers, bl);
+    DECODE_FINISH(bl);
+  }
+
+  // Dumps the two members under the keys "info" and "markers".
+  void dump(Formatter *f) const {
+    encode_json("info", sync_info, f);
+    encode_json("markers", sync_markers, f);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_data_sync_status)
+
+class RGWAsyncRadosProcessor;
+class RGWDataSyncStatusManager;
+class RGWDataSyncCR;
+
+/*
+ * Handle on a remote zone's data log, built on RGWCoroutinesManager.
+ *
+ * init() creates the async RADOS processor and stores the REST connection;
+ * finish() stops and frees the processor. All pointer members start out as
+ * NULL so that finish() is well defined even when init() was never called.
+ */
+class RGWRemoteDataLog : public RGWCoroutinesManager {
+  RGWRados *store;
+  RGWRESTConn *conn;                    // set by init(); not deleted by finish()
+  RGWAsyncRadosProcessor *async_rados;  // allocated by init(), deleted by finish()
+
+  RGWHTTPManager http_manager;
+  RGWDataSyncStatusManager *status_manager;
+
+  RGWDataSyncCR *data_sync_cr;
+
+public:
+  RGWRemoteDataLog(RGWRados *_store, RGWDataSyncStatusManager *_sm) : RGWCoroutinesManager(_store->ctx()), store(_store),
+                                       conn(NULL),
+                                       async_rados(NULL),
+                                       http_manager(store->ctx(), &completion_mgr),
+                                       status_manager(_sm), data_sync_cr(NULL) {}
+
+  // Start the async processor and put the HTTP manager in threaded mode.
+  int init(RGWRESTConn *_conn);
+  // Stop the coroutine manager and release the async processor.
+  void finish();
+
+  // Remote log queries; each returns 0 or a negative error code.
+  int read_log_info(rgw_datalog_info *log_info);
+  int list_shard(int shard_id);
+  int list_shards(int num_shards);
+  int get_shard_info(int shard_id);
+
+  // Sync status handling and the sync loop itself (defined outside this
+  // section of the file).
+  int read_sync_status(rgw_data_sync_status *sync_status);
+  int init_sync_status(int num_shards);
+  int set_sync_info(const rgw_data_sync_info& sync_info);
+  int run_sync(int num_shards, rgw_data_sync_status& sync_status);
+
+  void wakeup(int shard_id);
+};
+
+/*
+ * Front end for data sync from one source zone. Owns the RGWRemoteDataLog
+ * for that zone plus the cached sync status, and forwards most operations
+ * to the remote log object.
+ */
+class RGWDataSyncStatusManager {
+  RGWRados *store;
+  librados::IoCtx ioctx;       // opened on the log pool by init()
+
+  string source_zone;
+  RGWRESTConn *conn;           // looked up from store->zone_conn_map by init()
+
+  RGWRemoteDataLog source_log;
+
+  string source_status_oid;
+  string source_shard_status_oid_prefix;
+  rgw_obj source_status_obj;
+
+  rgw_data_sync_status sync_status;
+  map<int, rgw_obj> shard_objs;  // one status object per shard, filled by init()
+
+  int num_shards;              // taken from the remote log info in init()
+
+public:
+  RGWDataSyncStatusManager(RGWRados *_store, const string& _source_zone)
+    : store(_store),
+      source_zone(_source_zone),
+      conn(NULL),
+      source_log(store, this),
+      num_shards(0)
+  {}
+
+  int init();
+
+  rgw_data_sync_status& get_sync_status() {
+    return sync_status;
+  }
+
+  // Name of the status object for (source_zone, shard_id).
+  static string shard_obj_name(const string& source_zone, int shard_id);
+
+  // The calls below delegate to the remote log handle.
+  int read_sync_status() {
+    return source_log.read_sync_status(&sync_status);
+  }
+  int init_sync_status() {
+    return source_log.init_sync_status(num_shards);
+  }
+
+  int run() {
+    return source_log.run_sync(num_shards, sync_status);
+  }
+
+  void wakeup(int shard_id) {
+    source_log.wakeup(shard_id);
+  }
+  void stop() {
+    source_log.finish();
+  }
+};
+
+#endif