From 442edc8256c4c9dde8e5a7d36e2d8aecb9c54cd8 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Thu, 17 Sep 2015 15:01:17 -0700 Subject: [PATCH] rgw: list bucket instance shard entries for data sync Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_admin.cc | 28 ++++++ src/rgw/rgw_data_sync.cc | 212 ++++++++++++++++++++++++++++++++++++++- src/rgw/rgw_data_sync.h | 10 +- 3 files changed, 241 insertions(+), 9 deletions(-) diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 12389484657c3..77afd45c5c9a3 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -3963,6 +3963,34 @@ next: formatter->flush(cout); } + if (opt_cmd == OPT_BUCKET_SYNC_RUN) { + if (source_zone.empty()) { + cerr << "ERROR: source zone not specified" << std::endl; + return EINVAL; + } + if (bucket_name.empty()) { + cerr << "ERROR: bucket not specified" << std::endl; + return EINVAL; + } + if (bucket_id.empty()) { + cerr << "ERROR: bucket id specified" << std::endl; + return EINVAL; + } + RGWBucketSyncStatusManager sync(store, source_zone, bucket_name, bucket_id); + + int ret = sync.init(); + if (ret < 0) { + cerr << "ERROR: sync.init() returned ret=" << ret << std::endl; + return -ret; + } + + ret = sync.run(); + if (ret < 0) { + cerr << "ERROR: sync.run() returned ret=" << ret << std::endl; + return -ret; + } + } + if (opt_cmd == OPT_BILOG_LIST) { if (bucket_name.empty()) { cerr << "ERROR: bucket not specified" << std::endl; diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc index 9fe7c2a2ec682..b8aedfb92ba4d 100644 --- a/src/rgw/rgw_data_sync.cc +++ b/src/rgw/rgw_data_sync.cc @@ -712,7 +712,7 @@ public: } }; -RGWCoroutine *RGWRemoteBucketLog::init_sync_status(RGWObjectCtx& obj_ctx) +RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr(RGWObjectCtx& obj_ctx) { return new RGWInitBucketShardSyncStatusCoroutine(async_rados, store, http_manager, obj_ctx, source_zone, conn, bucket_name, bucket_id, shard_id); @@ -730,7 +730,7 @@ public: status) {} }; -RGWCoroutine *RGWRemoteBucketLog::read_sync_status(RGWObjectCtx& obj_ctx, rgw_bucket_shard_sync_info *sync_status) +RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(RGWObjectCtx& obj_ctx, rgw_bucket_shard_sync_info *sync_status) { return new RGWReadBucketSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, bucket_name, bucket_id, shard_id, sync_status); @@ -742,6 +742,184 @@ RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() { } } + +struct bucket_entry_owner { + string id; + string display_name; + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("ID", id, obj); + JSONDecoder::decode_json("DisplayName", display_name, obj); + } +}; + +struct bucket_list_entry { + bool delete_marker; + string key; + string version_id; + bool is_latest; + utime_t mtime; + string etag; + uint64_t size; + string storage_class; + bucket_entry_owner owner; + + bucket_list_entry() : delete_marker(false), is_latest(false), size(0) {} + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj); + JSONDecoder::decode_json("Key", key, obj); + JSONDecoder::decode_json("VersionId", version_id, obj); + JSONDecoder::decode_json("IsLatest", is_latest, obj); + JSONDecoder::decode_json("LastModified", mtime, obj); + JSONDecoder::decode_json("ETag", etag, obj); + JSONDecoder::decode_json("Size", size, obj); + JSONDecoder::decode_json("StorageClass", storage_class, obj); + JSONDecoder::decode_json("Owner", owner, obj); + } +}; + +struct bucket_list_result { + string name; + string prefix; + string key_marker; + string version_id_marker; + int max_keys; + bool is_truncated; + list contents; + + bucket_list_result() : max_keys(0), is_truncated(false) {} + + void decode_json(JSONObj *obj) { + JSONDecoder::decode_json("Name", name, obj); + JSONDecoder::decode_json("Prefix", prefix, obj); + JSONDecoder::decode_json("KeyMarker", key_marker, obj); + JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj); + JSONDecoder::decode_json("MaxKeys", max_keys, obj); + JSONDecoder::decode_json("IsTruncated", max_keys, obj); + } +}; + +class RGWListBucketShardCR: public RGWCoroutine { + RGWRados *store; + RGWHTTPManager *http_manager; + RGWAsyncRadosProcessor *async_rados; + + RGWRESTConn *conn; + + string bucket_name; + string bucket_id; + int shard_id; + + string instance_key; + + bucket_list_result *result; + +public: + RGWListBucketShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, + RGWRESTConn *_conn, + const string& _bucket_name, const string& _bucket_id, int _shard_id, + bucket_list_result *_result) : RGWCoroutine(_store->ctx()), store(_store), + http_manager(_mgr), + async_rados(_async_rados), + conn(_conn), + bucket_name(_bucket_name), bucket_id(_bucket_id), shard_id(_shard_id), + result(_result) { + instance_key = bucket_name + ":" + bucket_id; + if (shard_id > 0) { + char buf[16]; + snprintf(buf, sizeof(buf), ":%d", shard_id); + instance_key.append(buf); + } + } + + int operate() { + int ret; + reenter(this) { + yield { + rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() }, + { "versions" , NULL }, + { "format" , "json" }, + { NULL, NULL } }; + + string p = string("/") + bucket_name; + ret = call(new RGWReadRESTResourceCR(store->ctx(), conn, http_manager, p, pairs, result)); + if (ret < 0) { + return set_state(RGWCoroutine_Error, ret); + } + } + if (retcode < 0) { + return set_state(RGWCoroutine_Error, retcode); + } + return set_state(RGWCoroutine_Done, 0); + } + return 0; + } +}; + +class RGWRunBucketSyncCoroutine : public RGWCoroutine { + RGWHTTPManager *http_manager; + RGWAsyncRadosProcessor *async_rados; + RGWRESTConn *conn; + RGWRados *store; + RGWObjectCtx& obj_ctx; + string source_zone; + string bucket_name; + string bucket_id; + int shard_id; + bucket_list_result list_result; + rgw_bucket_shard_sync_info sync_status; + +public: + RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, + RGWRESTConn *_conn, RGWRados *_store, + RGWObjectCtx& _obj_ctx, const string& _source_zone, + const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()), + http_manager(_mgr), async_rados(_async_rados), conn(_conn), + store(_store), + obj_ctx(_obj_ctx), source_zone(_source_zone), + bucket_name(_bucket_name), + bucket_id(_bucket_id), shard_id(_shard_id) {} + int operate(); +}; + + +int RGWRunBucketSyncCoroutine::operate() +{ + reenter(this) { + yield { + int r = call(new RGWReadBucketSyncStatusCoroutine(async_rados, store, obj_ctx, source_zone, bucket_name, bucket_id, shard_id, &sync_status)); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to fetch sync status" << dendl; + return r; + } + } + + if (retcode < 0 && retcode != -ENOENT) { + ldout(store->ctx(), 0) << "ERROR: failed to read sync status for bucket=" << bucket_name << " bucket_id=" << bucket_id << " shard_id=" << shard_id << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + + yield { + if ((rgw_bucket_shard_sync_info::SyncState)sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) { + ldout(store->ctx(), 20) << __func__ << "(): building full sync maps" << dendl; + int r = call(new RGWListBucketShardCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id, &list_result)); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to call new CR (RGWListBucketShardCR)" << dendl; + return r; + } + } + } + } + + return 0; +} + +RGWCoroutine *RGWRemoteBucketLog::run_sync_cr(RGWObjectCtx& obj_ctx) +{ + return new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, obj_ctx, source_zone, bucket_name, bucket_id, shard_id); +} + int RGWBucketSyncStatusManager::init() { map::iterator iter = store->zone_conn_map.find(source_zone); @@ -804,7 +982,7 @@ int RGWBucketSyncStatusManager::init_sync_status() for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); RGWRemoteBucketLog *l = iter->second; - int r = stack->call(l->init_sync_status(obj_ctx)); + int r = stack->call(l->init_sync_status_cr(obj_ctx)); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: failed to init sync status for " << bucket_name << ":" << bucket_id << ":" << iter->first << dendl; } @@ -824,7 +1002,33 @@ int RGWBucketSyncStatusManager::read_sync_status() for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); RGWRemoteBucketLog *l = iter->second; - int r = stack->call(l->read_sync_status(obj_ctx, &sync_status[iter->first])); + int r = stack->call(l->read_sync_status_cr(obj_ctx, &sync_status[iter->first])); + if (r < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << ":" << iter->first << dendl; + } + + stacks.push_back(stack); + } + + int ret = cr_mgr.run(stacks); + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << dendl; + return ret; + } + + return 0; +} + +int RGWBucketSyncStatusManager::run() +{ + RGWObjectCtx obj_ctx(store); + + list stacks; + + for (map::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) { + RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr); + RGWRemoteBucketLog *l = iter->second; + int r = stack->call(l->run_sync_cr(obj_ctx)); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: failed to read sync status for " << bucket_name << ":" << bucket_id << ":" << iter->first << dendl; } diff --git a/src/rgw/rgw_data_sync.h b/src/rgw/rgw_data_sync.h index 43b9cea89ee87..9ed150e599fab 100644 --- a/src/rgw/rgw_data_sync.h +++ b/src/rgw/rgw_data_sync.h @@ -314,8 +314,9 @@ public: int read_log_info(rgw_datalog_info *log_info); int get_sync_info(); #endif - RGWCoroutine *read_sync_status(RGWObjectCtx& obj_ctx, rgw_bucket_shard_sync_info *sync_status); - RGWCoroutine *init_sync_status(RGWObjectCtx& obj_ctx); + RGWCoroutine *read_sync_status_cr(RGWObjectCtx& obj_ctx, rgw_bucket_shard_sync_info *sync_status); + RGWCoroutine *init_sync_status_cr(RGWObjectCtx& obj_ctx); + RGWCoroutine *run_sync_cr(RGWObjectCtx& obj_ctx); #if 0 int set_sync_info(const rgw_data_sync_info& sync_info); int run_sync(int num_shards, rgw_data_sync_status& sync_status); @@ -370,10 +371,9 @@ public: static string status_oid(const string& source_zone, const string& bucket_name, const string& bucket_id, int shard_id); int read_sync_status(); -#if 0 - - int run() { return source_log.run_sync(num_shards, sync_status); } + int run(); +#if 0 void wakeup() { return source_log.wakeup(); } void stop() { source_log.finish(); -- 2.39.5