From f0e351d450e726c62f44e458be90adf74f8dc690 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Wed, 16 Sep 2015 15:47:54 -0700 Subject: [PATCH] rgw: per-bucket instance shard state add 'radosgw-admin bucket sync init' command Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 42 ++++++ src/rgw/rgw_bucket.cc | 47 ++----- src/rgw/rgw_bucket.h | 41 ++++++ src/rgw/rgw_cr_rest.h | 21 ++- src/rgw/rgw_data_sync.cc | 285 +++++++++++++++++++++++++++++++++++++++ src/rgw/rgw_data_sync.h | 184 +++++++++++++++++++++++++ src/rgw/rgw_rest_conn.cc | 16 +++ src/rgw/rgw_rest_conn.h | 8 ++ 8 files changed, 600 insertions(+), 44 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 1f20e016d9a4e..a7bd0df1d2a58 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -259,6 +259,9 @@ enum { OPT_BUCKET_UNLINK, OPT_BUCKET_STATS, OPT_BUCKET_CHECK, + OPT_BUCKET_SYNC_STATUS, + OPT_BUCKET_SYNC_INIT, + OPT_BUCKET_SYNC_RUN, OPT_BUCKET_RM, OPT_BUCKET_REWRITE, OPT_POLICY, @@ -444,6 +447,18 @@ static int get_cmd(const char *cmd, const char *prev_cmd, const char *prev_prev_ return OPT_BUCKET_REWRITE; if (strcmp(cmd, "check") == 0) return OPT_BUCKET_CHECK; + if (strcmp(cmd, "sync") == 0) { + *need_more = true; + return 0; + } + } else if ((prev_prev_cmd && strcmp(prev_prev_cmd, "bucket") == 0) && + (strcmp(prev_cmd, "sync") == 0)) { + if (strcmp(cmd, "status") == 0) + return OPT_BUCKET_SYNC_STATUS; + if (strcmp(cmd, "init") == 0) + return OPT_BUCKET_SYNC_INIT; + if (strcmp(cmd, "run") == 0) + return OPT_BUCKET_SYNC_RUN; } else if (strcmp(prev_cmd, "log") == 0) { if (strcmp(cmd, "list") == 0) return OPT_LOG_LIST; @@ -3889,6 +3904,33 @@ next: } } + if (opt_cmd == OPT_BUCKET_SYNC_INIT) { + if (source_zone.empty()) { + cerr << "ERROR: source zone not specified" << std::endl; + return EINVAL; + } + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return EINVAL; + } + if (bucket_id.empty()) { + cerr << "ERROR: bucket id specified" << std::endl; + return EINVAL; + } + RGWBucketSyncStatusManager sync(store, source_zone, bucket_name, bucket_id); + + int ret = sync.init(); + if (ret < 0) { + cerr << "ERROR: sync.init() returned ret=" << ret << std::endl; + return -ret; + } + ret = sync.init_sync_status(); + if (ret < 0) { + cerr << "ERROR: sync.get_sync_status() returned ret=" << ret << std::endl; + return -ret; + } + } + if (opt_cmd == OPT_BILOG_LIST) { if (bucket_name.empty()) { cerr << "ERROR: bucket not specified" << std::endl; diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 8959182365913..5cae3851d0401 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ -1552,46 +1552,15 @@ void RGWDataChangesLog::ChangesRenewThread::stop() cond.Signal(); } -struct RGWBucketCompleteInfo { - RGWBucketInfo info; - map attrs; - - void dump(Formatter *f) const { - encode_json("bucket_info", info, f); - encode_json("attrs", attrs, f); - } - - void decode_json(JSONObj *obj) { - JSONDecoder::decode_json("bucket_info", info, obj); - JSONDecoder::decode_json("attrs", attrs, obj); - } -}; - -class RGWBucketEntryMetadataObject : public RGWMetadataObject { - RGWBucketEntryPoint ep; -public: - RGWBucketEntryMetadataObject(RGWBucketEntryPoint& _ep, obj_version& v, time_t m) : ep(_ep) { - objv = v; - mtime = m; - } - - void dump(Formatter *f) const { - ep.dump(f); - } -}; - -class RGWBucketInstanceMetadataObject : public RGWMetadataObject { - RGWBucketCompleteInfo info; -public: - RGWBucketInstanceMetadataObject(RGWBucketCompleteInfo& i, obj_version& v, time_t m) : info(i) { - objv = v; - mtime = m; - } +void RGWBucketCompleteInfo::dump(Formatter *f) const { + encode_json("bucket_info", info, f); + encode_json("attrs", attrs, f); +} - void dump(Formatter *f) const { - info.dump(f); - } -}; +void RGWBucketCompleteInfo::decode_json(JSONObj *obj) { + JSONDecoder::decode_json("bucket_info", info, obj); + JSONDecoder::decode_json("attrs", attrs, obj); +} class RGWBucketMetadataHandler : public RGWMetadataHandler { diff --git a/src/rgw/rgw_bucket.h b/src/rgw/rgw_bucket.h index a1800aa41b88d..65e40641e361d 100644 --- a/src/rgw/rgw_bucket.h +++ b/src/rgw/rgw_bucket.h @@ -53,6 +53,47 @@ extern void rgw_parse_url_bucket(const string& bucket, const string& auth_tenant, string &tenant_name, string &bucket_name); +struct RGWBucketCompleteInfo { + RGWBucketInfo info; + map attrs; + + void dump(Formatter *f) const; + void decode_json(JSONObj *obj); +}; + +class RGWBucketEntryMetadataObject : public RGWMetadataObject { + RGWBucketEntryPoint ep; +public: + RGWBucketEntryMetadataObject(RGWBucketEntryPoint& _ep, obj_version& v, time_t m) : ep(_ep) { + objv = v; + mtime = m; + } + + void dump(Formatter *f) const { + ep.dump(f); + } +}; + +class RGWBucketInstanceMetadataObject : public RGWMetadataObject { + RGWBucketCompleteInfo info; +public: + RGWBucketInstanceMetadataObject() {} + RGWBucketInstanceMetadataObject(RGWBucketCompleteInfo& i, obj_version& v, time_t m) : info(i) { + objv = v; + mtime = m; + } + + void dump(Formatter *f) const { + info.dump(f); + } + + void decode_json(JSONObj *obj) { + info.decode_json(obj); + } + + RGWBucketInfo& get_bucket_info() { return info.info; } +}; + /** * Store a list of the user's buckets, with associated functinos. */ diff --git a/src/rgw/rgw_cr_rest.h b/src/rgw/rgw_cr_rest.h index 0ffec9bab0172..86347592dd0b1 100644 --- a/src/rgw/rgw_cr_rest.h +++ b/src/rgw/rgw_cr_rest.h @@ -3,24 +3,34 @@ #include "rgw_coroutine.h" +#include + template class RGWReadRESTResourceCR : public RGWSimpleCoroutine { RGWRESTConn *conn; RGWHTTPManager *http_manager; string path; - rgw_http_param_pair *params; + std::list > params_list; T *result; RGWRESTReadResource *http_op; public: RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, RGWHTTPManager *_http_manager, - const string& _path, rgw_http_param_pair *_params, + const string& _path, rgw_http_param_pair *params, T *_result) : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), - path(_path), params(_params), result(_result), http_op(NULL) {} + path(_path), result(_result), http_op(NULL) { + rgw_http_param_pair *pp = params; + while (pp && pp->key) { + string k = pp->key; + string v = (pp->val ? pp->val : ""); + params_list.push_back(make_pair(k, v)); + ++pp; + } + } int send_request() { - http_op = new RGWRESTReadResource(conn, path, params, NULL, http_manager); + http_op = new RGWRESTReadResource(conn, path, params_list, NULL, http_manager); http_op->set_user_info((void *)stack); @@ -35,11 +45,12 @@ public: int request_complete() { int ret = http_op->wait(result); - http_op->put(); if (ret < 0) { error_stream << "http operation failed: " << http_op->to_str() << " status=" << http_op->get_http_status() << std::endl; + http_op->put(); return ret; } + http_op->put(); return 0; } }; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index a9d3892865af2..afc7ac0ca42ce 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -12,6 +12,7 @@ #include "rgw_cr_rest.h" #include "rgw_http_client.h" #include "rgw_bucket.h" +#include "rgw_metadata.h" #include "cls/lock/cls_lock_client.h" @@ -24,6 +25,7 @@ static string datalog_sync_status_oid = "datalog.sync-status"; static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard"; static string datalog_sync_full_sync_index_prefix = "data.full-sync.index"; +static string bucket_status_oid_prefix = "bucket.sync-status"; void rgw_datalog_info::decode_json(JSONObj *obj) { JSONDecoder::decode_json("num_objects", num_shards, obj); @@ -523,4 +525,287 @@ string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int s return string(buf); } +int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, + const string& _bucket_id, int _shard_id) +{ + conn = _conn; + source_zone = _source_zone; + bucket_name = _bucket_name; + bucket_id = _bucket_id; + shard_id = _shard_id; + + return 0; +} + +struct bucket_instance_meta_info { + string key; + obj_version ver; + time_t mtime; + RGWBucketInstanceMetadataObject data; + + bucket_instance_meta_info() : mtime(0) {} + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("key", key, obj); + JSONDecoder::decode_json("ver", ver, obj); + JSONDecoder::decode_json("mtime", mtime, obj); + JSONDecoder::decode_json("data", data, obj); + } +}; + +struct bucket_index_marker_info { + string bucket_ver; + string master_ver; + string max_marker; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("bucket_ver", bucket_ver, obj); + JSONDecoder::decode_json("master_ver", master_ver, obj); + JSONDecoder::decode_json("max_marker", max_marker, obj); + } +}; + +class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine { + RGWRados *store; + RGWHTTPManager *http_manager; + RGWAsyncRadosProcessor *async_rados; + + RGWRESTConn *conn; + + string bucket_name; + string bucket_id; + int shard_id; + + string instance_key; + + bucket_index_marker_info *info; + +public: + RGWReadRemoteBucketIndexLogInfoCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, + RGWRESTConn *_conn, + const string& _bucket_name, const string& _bucket_id, int _shard_id, + bucket_index_marker_info *_info) : RGWCoroutine(_store->ctx()), store(_store), + http_manager(_mgr), + async_rados(_async_rados), + conn(_conn), + bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), + info(_info) { + instance_key = bucket_id; + if (shard_id > 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ":%d", shard_id); + instance_key.append(buf); + } + } + + int operate() { + int ret; + reenter(this) { + yield { + rgw_http_param_pair pairs[] = { { "type" , "bucket-index" }, + { "bucket", bucket_name.c_str() }, + { "bucket-instance", instance_key.c_str() }, + { "info" , NULL }, + { NULL, NULL } }; + + string p = "/admin/log/"; + ret = call(new RGWReadRESTResourceCR(store->ctx(), conn, http_manager, p, pairs, info)); + if (ret < 0) { + return set_state(RGWCoroutine_Error, ret); + } + } + if (retcode < 0) { + return set_state(RGWCoroutine_Error, ret); + } + return set_state(RGWCoroutine_Done, 0); + } + return 0; + } +}; + + +class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + RGWHTTPManager *http_manager; + RGWObjectCtx& obj_ctx; + string source_zone; + RGWRESTConn *conn; + string bucket_name; + string bucket_id; + int shard_id; + + string sync_status_oid; + + string lock_name; + string cookie; + rgw_bucket_shard_sync_info status; + + bucket_index_marker_info info; +public: + RGWInitBucketShardSyncStatusCoroutine(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, RGWHTTPManager *_http_mgr, + RGWObjectCtx& _obj_ctx, const string& _source_zone, RGWRESTConn *_conn, + const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), async_rados(_async_rados), store(_store), + http_manager(_http_mgr), + obj_ctx(_obj_ctx), source_zone(_source_zone), conn(_conn), + bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id) { + lock_name = "sync_lock"; + +#define COOKIE_LEN 16 + char buf[COOKIE_LEN + 1]; + + gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1); + string cookie = buf; + + sync_status_oid = RGWBucketSyncStatusManager::status_oid(source_zone, bucket_name, bucket_id, shard_id); + } + + int operate() { + int ret; + reenter(this) { + yield { + uint32_t lock_duration = 30; + call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + lock_name, cookie, lock_duration)); + if (retcode < 0) { + ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + } + yield { + call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + sync_status_oid, status)); + } + yield { /* take lock again, we just recreated the object */ + uint32_t lock_duration = 30; + call(new RGWSimpleRadosLockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + lock_name, cookie, lock_duration)); + if (retcode < 0) { + ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + } + /* fetch current position in logs */ + yield { + ret = call(new RGWReadRemoteBucketIndexLogInfoCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, &info)); + if (ret < 0) { + ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; + return set_state(RGWCoroutine_Error, ret); + } + } + if (retcode < 0 && retcode != -ENOENT) { + ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + yield { + status.state = rgw_bucket_shard_sync_info::StateFullSync; + status.marker.next_step_marker = info.max_marker; + call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + sync_status_oid, status)); + } + yield { /* unlock */ + call(new RGWSimpleRadosUnlockCR(async_rados, store, store->get_zone_params().log_pool, sync_status_oid, + lock_name, cookie)); + } + return set_state(RGWCoroutine_Done); + } + return 0; + } +}; + +RGWCoroutine *RGWRemoteBucketLog::init_sync_status(RGWObjectCtx& obj_ctx) +{ + return new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, + conn, bucket_name, bucket_id, shard_id); +} + +RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() { + for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { + delete iter->second; + } +} + +int RGWBucketSyncStatusManager::init() +{ + map::iterator iter = store->zone_conn_map.find(source_zone); + if (iter == store->zone_conn_map.end()) { + lderr(store->ctx()) << "no REST connection to master zone" << dendl; + return -EIO; + } + + conn = iter->second; + + async_rados = new RGWAsyncRadosProcessor(store, store->ctx()->_conf->rgw_num_async_rados_threads); + async_rados->start(); + + int ret = http_manager.set_threaded(); + if (ret < 0) { + ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl; + return ret; + } + + + string key = bucket_name + ":" + bucket_id; + + rgw_http_param_pair pairs[] = { { "key", key.c_str() }, + { NULL, NULL } }; + + string path = string("/admin/metadata/bucket.instance"); + + bucket_instance_meta_info result; + ret = cr_mgr.run(new RGWReadRESTResourceCR(store->ctx(), conn, &http_manager, path, pairs, &result)); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl; + return ret; + } + + RGWBucketInfo& bi = result.data.get_bucket_info(); + num_shards = bi.num_shards; + + + int effective_num_shards = (num_shards ? num_shards : 1); + + for (int i = 0; i < effective_num_shards; i++) { + RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager); + ret = l->init(source_zone, conn, bucket_name, bucket_id, (num_shards ? i : -1)); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl; + return ret; + } + source_logs[i] = l; + } + + return 0; +} + +int RGWBucketSyncStatusManager::init_sync_status() +{ + RGWObjectCtx obj_ctx(store); + + list stacks; + + for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { + RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); + RGWRemoteBucketLog *l = iter->second; + int r = stack->call(l->init_sync_status(obj_ctx)); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to init sync status for " << bucket_name << ":" << bucket_id << ":" << iter->first << dendl; + } + + stacks.push_back(stack); + } + + return cr_mgr.run(stacks); +} + +string RGWBucketSyncStatusManager::status_oid(const string& source_zone, const string& bucket_name, const string& bucket_id, int shard_id) +{ + string oid = bucket_status_oid_prefix + "." + source_zone + ":" + bucket_name + ":" + bucket_id; + if (shard_id > 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ":%d", shard_id); + oid.append(buf); + } + return oid; +} diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index f2a075f612529..90a67e622e75e 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -199,4 +199,188 @@ public: } }; +class RGWBucketSyncStatusManager; +class RGWBucketSyncCR; + +struct rgw_bucket_shard_sync_marker { + enum SyncState { + FullSync = 0, + IncrementalSync = 1, + }; + uint16_t state; + string marker; + string next_step_marker; + + rgw_bucket_shard_sync_marker() : state(FullSync) {} + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(state, bl); + ::encode(marker, bl); + ::encode(next_step_marker, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(state, bl); + ::decode(marker, bl); + ::decode(next_step_marker, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const { + encode_json("state", (int)state, f); + encode_json("marker", marker, f); + encode_json("next_step_marker", next_step_marker, f); + } +}; +WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_marker) + +struct rgw_bucket_shard_sync_info { + enum SyncState { + StateInit = 0, + StateFullSync = 1, + StateIncrementalSync = 2, + }; + + uint16_t state; + rgw_bucket_shard_sync_marker marker; + + void encode(bufferlist& bl) const { + ENCODE_START(1, 1, bl); + ::encode(state, bl); + ::encode(marker, bl); + ENCODE_FINISH(bl); + } + + void decode(bufferlist::iterator& bl) { + DECODE_START(1, bl); + ::decode(state, bl); + ::decode(marker, bl); + DECODE_FINISH(bl); + } + + void dump(Formatter *f) const { + string s; + switch ((SyncState)state) { + case StateInit: + s = "init"; + break; + case StateFullSync: + s = "full-sync"; + break; + case StateIncrementalSync: + s = "incremental-sync"; + break; + default: + s = "unknown"; + break; + } + encode_json("status", s, f); + encode_json("marker", marker, f); + } + + rgw_bucket_shard_sync_info() : state((int)StateInit) {} +}; +WRITE_CLASS_ENCODER(rgw_bucket_shard_sync_info) + + +class RGWRemoteBucketLog : public RGWCoroutinesManager { + RGWRados *store; + RGWRESTConn *conn; + string source_zone; + string bucket_name; + string bucket_id; + int shard_id; + + RGWBucketSyncStatusManager *status_manager; + RGWAsyncRadosProcessor *async_rados; + RGWHTTPManager *http_manager; + + RGWBucketSyncCR *sync_cr; + +public: + RGWRemoteBucketLog(RGWRados *_store, RGWBucketSyncStatusManager *_sm, + RGWAsyncRadosProcessor *_async_rados, RGWHTTPManager *_http_manager) : RGWCoroutinesManager(_store->ctx()), store(_store), + conn(NULL), shard_id(0), + status_manager(_sm), async_rados(_async_rados), http_manager(_http_manager), + sync_cr(NULL) {} + + int init(const string& _source_zone, RGWRESTConn *_conn, const string& _bucket_name, const string& _bucket_id, int _shard_id); + void finish(); + +#if 0 + int read_log_info(rgw_datalog_info *log_info); + int get_sync_info(); + int read_sync_status(rgw_data_sync_status *sync_status); +#endif + RGWCoroutine *init_sync_status(RGWObjectCtx& obj_ctx); +#if 0 + int set_sync_info(const rgw_data_sync_info& sync_info); + int run_sync(int num_shards, rgw_data_sync_status& sync_status); +#endif + + void wakeup(); +}; + +class RGWBucketSyncStatusManager { + RGWRados *store; + librados::IoCtx ioctx; + + RGWCoroutinesManager cr_mgr; + + RGWAsyncRadosProcessor *async_rados; + RGWHTTPManager http_manager; + + string source_zone; + RGWRESTConn *conn; + + string bucket_name; + string bucket_id; + + map source_logs; + + string source_status_oid; + string source_shard_status_oid_prefix; + rgw_obj source_status_obj; + + rgw_data_sync_status sync_status; + rgw_obj status_obj; + + int num_shards; + +public: + RGWBucketSyncStatusManager(RGWRados *_store, const string& _source_zone, + const string& _bucket_name, const string& _bucket_id) : store(_store), + cr_mgr(_store->ctx()), + async_rados(NULL), + http_manager(store->ctx(), cr_mgr.get_completion_mgr()), + source_zone(_source_zone), + conn(NULL), + bucket_name(_bucket_name), bucket_id(_bucket_id), + num_shards(0) {} + ~RGWBucketSyncStatusManager(); + + int init(); + + rgw_data_sync_status& get_sync_status() { return sync_status; } + int init_sync_status(); + + static string status_oid(const string& source_zone, const string& bucket_name, const string& bucket_id, int shard_id); +#if 0 + + int read_sync_status() { return source_log.read_sync_status(&sync_status); } + int init_sync_status() { return source_log.init_sync_status(num_shards); } + + int run() { return source_log.run_sync(num_shards, sync_status); } + + void wakeup() { return source_log.wakeup(); } + void stop() { + source_log.finish(); + } +#endif +}; + + #endif diff --git a/src/rgw/rgw_rest_conn.cc b/src/rgw/rgw_rest_conn.cc index 379cd87fed45a..54259347ec976 100644 --- a/src/rgw/rgw_rest_conn.cc +++ b/src/rgw/rgw_rest_conn.cc @@ -211,7 +211,23 @@ RGWRESTReadResource::RGWRESTReadResource(RGWRESTConn *_conn, params.push_back(make_pair(k, v)); ++pp; } + init_common(extra_headers); +} + +RGWRESTReadResource::RGWRESTReadResource(RGWRESTConn *_conn, + const string& _resource, + list >& _params, + list > *extra_headers, + RGWHTTPManager *_mgr) : cct(_conn->get_ctx()), conn(_conn), resource(_resource), cb(bl), + mgr(_mgr), req(cct, conn->get_url(), &cb, NULL, NULL) { + for (list >::iterator iter = _params.begin(); iter != _params.end(); ++iter) { + params.push_back(*iter); + } + init_common(extra_headers); +} +void RGWRESTReadResource::init_common(list > *extra_headers) +{ params.push_back(pair(RGW_SYS_PARAM_PREFIX "zonegroup", conn->get_zonegroup())); if (extra_headers) { diff --git a/src/rgw/rgw_rest_conn.h b/src/rgw/rgw_rest_conn.h index 0da8974627daa..d80dfe7869f6c 100644 --- a/src/rgw/rgw_rest_conn.h +++ b/src/rgw/rgw_rest_conn.h @@ -139,6 +139,8 @@ class RGWRESTReadResource : public RefCountedObject { RGWHTTPManager *mgr; RGWRESTStreamReadRequest req; + void init_common(list > *extra_headers); + public: RGWRESTReadResource(RGWRESTConn *_conn, const string& _resource, @@ -146,6 +148,12 @@ public: list > *extra_headers, RGWHTTPManager *_mgr); + RGWRESTReadResource(RGWRESTConn *_conn, + const string& _resource, + list >& _params, + list > *extra_headers, + RGWHTTPManager *_mgr); + void set_user_info(void *user_info) { req.set_user_info(user_info); } -- 2.39.5