]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: per-bucket instance shard state
authorYehuda Sadeh <yehuda@redhat.com>
Wed, 16 Sep 2015 22:47:54 +0000 (15:47 -0700)
committerYehuda Sadeh <yehuda@redhat.com>
Fri, 12 Feb 2016 00:12:51 +0000 (16:12 -0800)
add 'radosgw-admin bucket sync init' command

Signed-off-by: Yehuda Sadeh <yehuda@redhat.com>
src/rgw/rgw_admin.cc
src/rgw/rgw_bucket.cc
src/rgw/rgw_bucket.h
src/rgw/rgw_cr_rest.h
src/rgw/rgw_data_sync.cc
src/rgw/rgw_data_sync.h
src/rgw/rgw_rest_conn.cc
src/rgw/rgw_rest_conn.h

index 1f20e016d9a4ed093de8377d3f47aa1a9b7957d5..a7bd0df1d2a5800afad5d385ea0d4af3c2c9d013 100644 (file)
@@ -259,6 +259,9 @@ enum {
   OPT_BUCKET_UNLINK,
   OPT_BUCKET_STATS,
   OPT_BUCKET_CHECK,
+  OPT_BUCKET_SYNC_STATUS,
+  OPT_BUCKET_SYNC_INIT,
+  OPT_BUCKET_SYNC_RUN,
   OPT_BUCKET_RM,
   OPT_BUCKET_REWRITE,
   OPT_POLICY,
@@ -444,6 +447,18 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_
       return OPT_BUCKET_REWRITE;
     if (strcmp(cmd, "check") == 0)
       return OPT_BUCKET_CHECK;
+    if (strcmp(cmd, "sync") == 0) {
+      *need_more = true;
+      return 0;
+    }
+  } else if ((prev_prev_cmd && strcmp(prev_prev_cmd, "bucket") == 0) &&
+            (strcmp(prev_cmd, "sync") == 0)) {
+    if (strcmp(cmd, "status") == 0)
+      return OPT_BUCKET_SYNC_STATUS;
+    if (strcmp(cmd, "init") == 0)
+      return OPT_BUCKET_SYNC_INIT;
+    if (strcmp(cmd, "run") == 0)
+      return OPT_BUCKET_SYNC_RUN;
   } else if (strcmp(prev_cmd, "log") == 0) {
     if (strcmp(cmd, "list") == 0)
       return OPT_LOG_LIST;
@@ -3889,6 +3904,33 @@ next:
     }
   }
 
+  if (opt_cmd == OPT_BUCKET_SYNC_INIT) {
+    if (source_zone.empty()) {
+      cerr << "ERROR: source zone not specified" << std::endl;
+      return EINVAL;
+    }
+    if (bucket_name.empty()) {
+      cerr << "ERROR: bucket not specified" << std::endl;
+      return EINVAL;
+    }
+    if (bucket_id.empty()) {
+      cerr << "ERROR: bucket id specified" << std::endl;
+      return EINVAL;
+    }
+    RGWBucketSyncStatusManager sync(store, source_zone, bucket_name, bucket_id);
+
+    int ret = sync.init();
+    if (ret < 0) {
+      cerr << "ERROR: sync.init() returned ret=" << ret << std::endl;
+      return -ret;
+    }
+    ret = sync.init_sync_status();
+    if (ret < 0) {
+      cerr << "ERROR: sync.get_sync_status() returned ret=" << ret << std::endl;
+      return -ret;
+    }
+  }
+
   if (opt_cmd == OPT_BILOG_LIST) {
     if (bucket_name.empty()) {
       cerr << "ERROR: bucket not specified" << std::endl;
index 8959182365913a5a2db9201b54a696fc3cd42b47..5cae3851d04017cdb2c5958c8baa4eebe78e46bb 100644 (file)
@@ -1552,46 +1552,15 @@ void RGWDataChangesLog::ChangesRenewThread::stop()
   cond.Signal();
 }
 
-struct RGWBucketCompleteInfo {
-  RGWBucketInfo info;
-  map<string, bufferlist> attrs;
-
- void dump(Formatter *f) const {
-    encode_json("bucket_info", info, f);
-    encode_json("attrs", attrs, f);
-  }
-
-  void decode_json(JSONObj *obj) {
-    JSONDecoder::decode_json("bucket_info", info, obj);
-    JSONDecoder::decode_json("attrs", attrs, obj);
-  }
-};
-
-class RGWBucketEntryMetadataObject : public RGWMetadataObject {
-  RGWBucketEntryPoint ep;
-public:
-  RGWBucketEntryMetadataObject(RGWBucketEntryPoint& _ep, obj_version& v, time_t m) : ep(_ep) {
-    objv = v;
-    mtime = m;
-  }
-
-  void dump(Formatter *f) const {
-    ep.dump(f);
-  }
-};
-
-class RGWBucketInstanceMetadataObject : public RGWMetadataObject {
-  RGWBucketCompleteInfo info;
-public:
-  RGWBucketInstanceMetadataObject(RGWBucketCompleteInfo& i, obj_version& v, time_t m) : info(i) {
-    objv = v;
-    mtime = m;
-  }
+void RGWBucketCompleteInfo::dump(Formatter *f) const {
+  encode_json("bucket_info", info, f);
+  encode_json("attrs", attrs, f);
+}
 
-  void dump(Formatter *f) const {
-    info.dump(f);
-  }
-};
+void RGWBucketCompleteInfo::decode_json(JSONObj *obj) {
+  JSONDecoder::decode_json("bucket_info", info, obj);
+  JSONDecoder::decode_json("attrs", attrs, obj);
+}
 
 class RGWBucketMetadataHandler : public RGWMetadataHandler {
 
index a1800aa41b88d113d279ffaf47727496427f5347..65e40641e361d9a3fef953a1ba5f6b7e5455c417 100644 (file)
@@ -53,6 +53,47 @@ extern void rgw_parse_url_bucket(const string& bucket,
                                  const string& auth_tenant,
                                  string &tenant_name, string &bucket_name);
 
+struct RGWBucketCompleteInfo {
+  RGWBucketInfo info;
+  map<string, bufferlist> attrs;
+
+  void dump(Formatter *f) const;
+  void decode_json(JSONObj *obj);
+};
+
+class RGWBucketEntryMetadataObject : public RGWMetadataObject {
+  RGWBucketEntryPoint ep;
+public:
+  RGWBucketEntryMetadataObject(RGWBucketEntryPoint& _ep, obj_version& v, time_t m) : ep(_ep) {
+    objv = v;
+    mtime = m;
+  }
+
+  void dump(Formatter *f) const {
+    ep.dump(f);
+  }
+};
+
+class RGWBucketInstanceMetadataObject : public RGWMetadataObject {
+  RGWBucketCompleteInfo info;
+public:
+  RGWBucketInstanceMetadataObject() {}
+  RGWBucketInstanceMetadataObject(RGWBucketCompleteInfo& i, obj_version& v, time_t m) : info(i) {
+    objv = v;
+    mtime = m;
+  }
+
+  void dump(Formatter *f) const {
+    info.dump(f);
+  }
+
+  void decode_json(JSONObj *obj) {
+    info.decode_json(obj);
+  }
+
+  RGWBucketInfo& get_bucket_info() { return info.info; }
+};
+
 /**
  * Store a list of the user's buckets, with associated functinos.
  */
index 0ffec9bab0172fd56629ec568ab76fc04de5cb64..86347592dd0b15580a5ad3c410d64345863ab14e 100644 (file)
@@ -3,24 +3,34 @@
 
 #include "rgw_coroutine.h"
 
+#include <list>
+
 template <class T>
 class RGWReadRESTResourceCR : public RGWSimpleCoroutine {
   RGWRESTConn *conn;
   RGWHTTPManager *http_manager;
   string path;
-  rgw_http_param_pair *params;
+  std::list<pair<string, string> > params_list;
   T *result;
 
   RGWRESTReadResource *http_op;
 
 public:
   RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager,
-                       const string& _path, rgw_http_param_pair *_params,
+                       const string& _path, rgw_http_param_pair *params,
                        T *_result) : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
-                                      path(_path), params(_params), result(_result), http_op(NULL) {}
+                                      path(_path), result(_result), http_op(NULL) {
+     rgw_http_param_pair *pp = params;
+     while (pp && pp->key) {
+      string k = pp->key;
+      string v = (pp->val ? pp->val : "");
+      params_list.push_back(make_pair(k, v));
+      ++pp;
+    }
+  }
 
   int send_request() {
-    http_op = new RGWRESTReadResource(conn, path, params, NULL, http_manager);
+    http_op = new RGWRESTReadResource(conn, path, params_list, NULL, http_manager);
 
     http_op->set_user_info((void *)stack);
 
@@ -35,11 +45,12 @@ public:
 
   int request_complete() {
     int ret = http_op->wait(result);
-    http_op->put();
     if (ret < 0) {
       error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl;
+      http_op->put();
       return ret;
     }
+    http_op->put();
     return 0;
   }
 };
index a9d3892865af22ca8bb0fe7d69f6e455d4bba1c8..afc7ac0ca42ce6a0fc9c86ad24207615b953a548 100644 (file)
@@ -12,6 +12,7 @@
 #include "rgw_cr_rest.h"
 #include "rgw_http_client.h"
 #include "rgw_bucket.h"
+#include "rgw_metadata.h"
 
 #include "cls/lock/cls_lock_client.h"
 
@@ -24,6 +25,7 @@
 static string datalog_sync_status_oid = "datalog.sync-status";
 static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
 static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
+static string bucket_status_oid_prefix = "bucket.sync-status";
 
 void rgw_datalog_info::decode_json(JSONObj *obj) {
   JSONDecoder::decode_json("num_objects", num_shards, obj);
@@ -523,4 +525,287 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s
   return string(buf);
 }
 
+int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name,
+                             const string& _bucket_id, int _shard_id)
+{
+  conn = _conn;
+  source_zone = _source_zone;
+  bucket_name = _bucket_name;
+  bucket_id = _bucket_id;
+  shard_id = _shard_id;
+
+  return 0;
+}
+
+struct bucket_instance_meta_info {
+  string key;
+  obj_version ver;
+  time_t mtime;
+  RGWBucketInstanceMetadataObject data;
+
+  bucket_instance_meta_info() : mtime(0) {}
+
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("key", key, obj);
+    JSONDecoder::decode_json("ver", ver, obj);
+    JSONDecoder::decode_json("mtime", mtime, obj);
+    JSONDecoder::decode_json("data", data, obj);
+  }
+};
+
+struct bucket_index_marker_info {
+  string bucket_ver;
+  string master_ver;
+  string max_marker;
+
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
+    JSONDecoder::decode_json("master_ver", master_ver, obj);
+    JSONDecoder::decode_json("max_marker", max_marker, obj);
+  }
+};
+
+class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+
+  RGWRESTConn *conn;
+
+  string bucket_name;
+  string bucket_id;
+  int shard_id;
+
+  string instance_key;
+
+  bucket_index_marker_info *info;
+
+public:
+  RGWReadRemoteBucketIndexLogInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                                  RGWRESTConn *_conn,
+                                  const string& _bucket_name, const string& _bucket_id, int _shard_id,
+                                  bucket_index_marker_info *_info) : RGWCoroutine(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                     async_rados(_async_rados),
+                                                      conn(_conn),
+                                                      bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id),
+                                                      info(_info) {
+    instance_key = bucket_id;
+    if (shard_id > 0) {
+      char buf[16];
+      snprintf(buf, sizeof(buf), ":%d", shard_id);
+      instance_key.append(buf);
+    }
+  }
+
+  int operate() {
+    int ret;
+    reenter(this) {
+      yield {
+        rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
+                                       { "bucket", bucket_name.c_str() },
+                                       { "bucket-instance", instance_key.c_str() },
+                                       { "info" , NULL },
+                                       { NULL, NULL } };
+
+        string p = "/admin/log/";
+        ret = call(new RGWReadRESTResourceCR<bucket_index_marker_info>(store->ctx(), conn, http_manager, p, pairs, info));
+        if (ret < 0) {
+          return set_state(RGWCoroutine_Error, ret);
+        }
+      }
+      if (retcode < 0) {
+        return set_state(RGWCoroutine_Error, ret);
+      }
+      return set_state(RGWCoroutine_Done, 0);
+    }
+    return 0;
+  }
+};
+
+
+class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWObjectCtx& obj_ctx;
+  string source_zone;
+  RGWRESTConn *conn;
+  string bucket_name;
+  string bucket_id;
+  int shard_id;
+
+  string sync_status_oid;
+
+  string lock_name;
+  string cookie;
+  rgw_bucket_shard_sync_info status;
+
+  bucket_index_marker_info info;
+public:
+  RGWInitBucketShardSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr,
+                     RGWObjectCtx& _obj_ctx, const string& _source_zone, RGWRESTConn *_conn,
+                      const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store),
+                                                                                             http_manager(_http_mgr),
+                                                                                             obj_ctx(_obj_ctx), source_zone(_source_zone), conn(_conn),
+                                                                                             bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id) {
+    lock_name = "sync_lock";
+
+#define COOKIE_LEN 16
+    char buf[COOKIE_LEN + 1];
+
+    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
+    string cookie = buf;
+
+    sync_status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id);
+  }
+
+  int operate() {
+    int ret;
+    reenter(this) {
+      yield {
+       uint32_t lock_duration = 30;
+       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+                                    lock_name, cookie, lock_duration));
+       if (retcode < 0) {
+         ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
+         return set_state(RGWCoroutine_Error, retcode);
+       }
+      }
+      yield {
+        call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+                                sync_status_oid, status));
+      }
+      yield { /* take lock again, we just recreated the object */
+       uint32_t lock_duration = 30;
+       call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+                                    lock_name, cookie, lock_duration));
+       if (retcode < 0) {
+         ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
+         return set_state(RGWCoroutine_Error, retcode);
+       }
+      }
+      /* fetch current position in logs */
+      yield {
+        ret = call(new RGWReadRemoteBucketIndexLogInfoCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, &info));
+        if (ret < 0) {
+         ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
+          return set_state(RGWCoroutine_Error, ret);
+        }
+      }
+      if (retcode < 0 && retcode != -ENOENT) {
+        ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
+        return set_state(RGWCoroutine_Error, retcode);
+      }
+      yield {
+       status.state = rgw_bucket_shard_sync_info::StateFullSync;
+        status.marker.next_step_marker = info.max_marker;
+        call(new RGWSimpleRadosWriteCR<rgw_bucket_shard_sync_info>(async_rados, store, store->get_zone_params().log_pool,
+                                sync_status_oid, status));
+      }
+      yield { /* unlock */
+       call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid,
+                                    lock_name, cookie));
+      }
+      return set_state(RGWCoroutine_Done);
+    }
+    return 0;
+  }
+};
+
+RGWCoroutine *RGWRemoteBucketLog::init_sync_status(RGWObjectCtx& obj_ctx)
+{
+  return new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone,
+                                                   conn, bucket_name, bucket_id, shard_id);
+}
+
+RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
+  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
+    delete iter->second;
+  }
+}
+
+int RGWBucketSyncStatusManager::init()
+{
+  map<string, RGWRESTConn *>::iterator iter = store->zone_conn_map.find(source_zone);
+  if (iter == store->zone_conn_map.end()) {
+    lderr(store->ctx()) << "no REST connection to master zone" << dendl;
+    return -EIO;
+  }
+
+  conn = iter->second;
+
+  async_rados = new RGWAsyncRadosProcessor(store, store->ctx()->_conf->rgw_num_async_rados_threads);
+  async_rados->start();
+
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    return ret;
+  }
+
+
+  string key = bucket_name + ":" + bucket_id;
+
+  rgw_http_param_pair pairs[] = { { "key", key.c_str() },
+                                  { NULL, NULL } };
+
+  string path = string("/admin/metadata/bucket.instance");
+
+  bucket_instance_meta_info result;
+  ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
+    return ret;
+  }
+
+  RGWBucketInfo& bi = result.data.get_bucket_info();
+  num_shards = bi.num_shards;
+
+
+  int effective_num_shards = (num_shards ? num_shards : 1);
+
+  for (int i = 0; i < effective_num_shards; i++) {
+    RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
+    ret = l->init(source_zone, conn, bucket_name, bucket_id, (num_shards ? i : -1));
+    if (ret < 0) {
+      ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
+      return ret;
+    }
+    source_logs[i] = l;
+  }
+
+  return 0;
+}
+
+int RGWBucketSyncStatusManager::init_sync_status()
+{
+  RGWObjectCtx obj_ctx(store);
+
+  list<RGWCoroutinesStack *> stacks;
+
+  for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
+    RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
+    RGWRemoteBucketLog *l = iter->second;
+    int r = stack->call(l->init_sync_status(obj_ctx));
+    if (r < 0) {
+      ldout(store->ctx(), 0) << "ERROR: failed to init sync status for " << bucket_name << ":" << bucket_id << ":" << iter->first << dendl;
+    }
+
+    stacks.push_back(stack);
+  }
+
+  return cr_mgr.run(stacks);
+}
+
+string RGWBucketSyncStatusManager::status_oid(const string& source_zone, const string& bucket_name, const string& bucket_id, int shard_id)
+{
+  string oid = bucket_status_oid_prefix + "." + source_zone + ":" + bucket_name + ":" + bucket_id;
+  if (shard_id > 0) {
+    char buf[16];
+    snprintf(buf, sizeof(buf), ":%d", shard_id);
+    oid.append(buf);
+  }
+  return oid;
+}
 
index f2a075f6125297b5a473046e2edf4cfc7ee1eb2d..90a67e622e75ecfbe251bbd3b114c6d5c0ba75fa 100644 (file)
@@ -199,4 +199,188 @@ public:
   }
 };
 
+class RGWBucketSyncStatusManager;
+class RGWBucketSyncCR;
+
+struct rgw_bucket_shard_sync_marker {
+  enum SyncState {
+    FullSync = 0,
+    IncrementalSync = 1,
+  };
+  uint16_t state;
+  string marker;
+  string next_step_marker;
+
+  rgw_bucket_shard_sync_marker() : state(FullSync) {}
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(state, bl);
+    ::encode(marker, bl);
+    ::encode(next_step_marker, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+     DECODE_START(1, bl);
+    ::decode(state, bl);
+    ::decode(marker, bl);
+    ::decode(next_step_marker, bl);
+     DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const {
+    encode_json("state", (int)state, f);
+    encode_json("marker", marker, f);
+    encode_json("next_step_marker", next_step_marker, f);
+  }
+};
+WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_marker)
+
+struct rgw_bucket_shard_sync_info {
+  enum SyncState {
+    StateInit = 0,
+    StateFullSync = 1,
+    StateIncrementalSync = 2,
+  };
+
+  uint16_t state;
+  rgw_bucket_shard_sync_marker marker;
+
+  void encode(bufferlist& bl) const {
+    ENCODE_START(1, 1, bl);
+    ::encode(state, bl);
+    ::encode(marker, bl);
+    ENCODE_FINISH(bl);
+  }
+
+  void decode(bufferlist::iterator& bl) {
+     DECODE_START(1, bl);
+     ::decode(state, bl);
+     ::decode(marker, bl);
+     DECODE_FINISH(bl);
+  }
+
+  void dump(Formatter *f) const {
+    string s;
+    switch ((SyncState)state) {
+      case StateInit:
+       s = "init";
+       break;
+      case StateFullSync:
+       s = "full-sync";
+       break;
+      case StateIncrementalSync:
+       s = "incremental-sync";
+       break;
+      default:
+       s = "unknown";
+       break;
+    }
+    encode_json("status", s, f);
+    encode_json("marker", marker, f);
+  }
+
+  rgw_bucket_shard_sync_info() : state((int)StateInit) {}
+};
+WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info)
+
+
+class RGWRemoteBucketLog : public RGWCoroutinesManager {
+  RGWRados *store;
+  RGWRESTConn *conn;
+  string source_zone;
+  string bucket_name;
+  string bucket_id;
+  int shard_id;
+
+  RGWBucketSyncStatusManager *status_manager;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWHTTPManager *http_manager;
+
+  RGWBucketSyncCR *sync_cr;
+
+public:
+  RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm,
+                     RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx()), store(_store),
+                                       conn(NULL), shard_id(0),
+                                       status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager),
+                                       sync_cr(NULL) {}
+
+  int init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, const string& _bucket_id, int _shard_id);
+  void finish();
+
+#if 0
+  int read_log_info(rgw_datalog_info *log_info);
+  int get_sync_info();
+  int read_sync_status(rgw_data_sync_status *sync_status);
+#endif
+  RGWCoroutine *init_sync_status(RGWObjectCtx& obj_ctx);
+#if 0
+  int set_sync_info(const rgw_data_sync_info& sync_info);
+  int run_sync(int num_shards, rgw_data_sync_status& sync_status);
+#endif
+
+  void wakeup();
+};
+
+class RGWBucketSyncStatusManager {
+  RGWRados *store;
+  librados::IoCtx ioctx;
+
+  RGWCoroutinesManager cr_mgr;
+
+  RGWAsyncRadosProcessor *async_rados;
+  RGWHTTPManager http_manager;
+
+  string source_zone;
+  RGWRESTConn *conn;
+
+  string bucket_name;
+  string bucket_id;
+
+  map<int, RGWRemoteBucketLog *> source_logs;
+
+  string source_status_oid;
+  string source_shard_status_oid_prefix;
+  rgw_obj source_status_obj;
+
+  rgw_data_sync_status sync_status;
+  rgw_obj status_obj;
+
+  int num_shards;
+
+public:
+  RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone,
+                             const string& _bucket_name, const string& _bucket_id) : store(_store),
+                                                                                     cr_mgr(_store->ctx()),
+                                                                                     async_rados(NULL),
+                                                                                     http_manager(store->ctx(), cr_mgr.get_completion_mgr()),
+                                                                                     source_zone(_source_zone),
+                                                                                     conn(NULL),
+                                                                                     bucket_name(_bucket_name), bucket_id(_bucket_id),
+                                                                                     num_shards(0) {}
+  ~RGWBucketSyncStatusManager();
+
+  int init();
+
+  rgw_data_sync_status& get_sync_status() { return sync_status; }
+  int init_sync_status();
+
+  static string status_oid(const string& source_zone, const string& bucket_name, const string& bucket_id, int shard_id);
+#if 0
+
+  int read_sync_status() { return source_log.read_sync_status(&sync_status); }
+  int init_sync_status() { return source_log.init_sync_status(num_shards); }
+
+  int run() { return source_log.run_sync(num_shards, sync_status); }
+
+  void wakeup() { return source_log.wakeup(); }
+  void stop() {
+    source_log.finish();
+  }
+#endif
+};
+
+
 #endif
index 379cd87fed45a238be5d941cf0f415d16b658726..54259347ec976983aef112a49322210df505b216 100644 (file)
@@ -211,7 +211,23 @@ RGWRESTReadResource::RGWRESTReadResource(RGWRESTConn *_conn,
     params.push_back(make_pair(k, v));
     ++pp;
   }
+  init_common(extra_headers);
+}
+
+RGWRESTReadResource::RGWRESTReadResource(RGWRESTConn *_conn,
+                                         const string& _resource,
+                                        list<pair<string, string> >& _params,
+                                         list<pair<string, string> > *extra_headers,
+                                         RGWHTTPManager *_mgr) : cct(_conn->get_ctx()), conn(_conn), resource(_resource), cb(bl),
+                                                                 mgr(_mgr), req(cct, conn->get_url(), &cb, NULL, NULL) {
+  for (list<pair<string, string> >::iterator iter = _params.begin(); iter != _params.end(); ++iter) {
+    params.push_back(*iter);
+  }
+  init_common(extra_headers);
+}
 
+void RGWRESTReadResource::init_common(list<pair<string, string> > *extra_headers)
+{
   params.push_back(pair<string, string>(RGW_SYS_PARAM_PREFIX "zonegroup", conn->get_zonegroup()));
 
   if (extra_headers) {
index 0da8974627daa2aae0e3aecd1aa4202be011e7c1..d80dfe7869f6c85e14ddabb4bd8952c17287e4a4 100644 (file)
@@ -139,6 +139,8 @@ class RGWRESTReadResource : public RefCountedObject {
   RGWHTTPManager *mgr;
   RGWRESTStreamReadRequest req;
 
+  void init_common(list<pair<string, string> > *extra_headers);
+
 public:
   RGWRESTReadResource(RGWRESTConn *_conn,
                      const string& _resource,
@@ -146,6 +148,12 @@ public:
                      list<pair<string, string> > *extra_headers,
                      RGWHTTPManager *_mgr);
 
+  RGWRESTReadResource(RGWRESTConn *_conn,
+                     const string& _resource,
+                     list<pair<string, string> >& _params,
+                     list<pair<string, string> > *extra_headers,
+                     RGWHTTPManager *_mgr);
+
   void set_user_info(void *user_info) {
     req.set_user_info(user_info);
   }