From 390f64a0215d968d28b2d3614f43b6c30ba5e8e2 Mon Sep 17 00:00:00 2001
From: Yehuda Sadeh
Date: Thu, 1 Oct 2015 18:57:02 -0700
Subject: [PATCH] rgw: data sync work

Instead of just keeping map of bucket instances, keep map of all the
shards. Also prepare the CRs that will call into the bucket sync.

Signed-off-by: Yehuda Sadeh
---
 src/rgw/rgw_data_sync.cc | 414 ++++++++++++++++++++++++++++++++++-----
 1 file changed, 362 insertions(+), 52 deletions(-)

diff --git a/src/rgw/rgw_data_sync.cc b/src/rgw/rgw_data_sync.cc
index f50b25db821a9..d7362b7fc0ffb 100644
--- a/src/rgw/rgw_data_sync.cc
+++ b/src/rgw/rgw_data_sync.cc
@@ -373,6 +373,29 @@ int RGWRemoteDataLog::set_sync_info(const rgw_data_sync_info& sync_info)
                                  datalog_sync_status_oid, sync_info));
 }
 
+static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
+{
+  char buf[16]; /* holds the shard id only; the zone name is appended as a string so the oid is never truncated */
+  snprintf(buf, sizeof(buf), "%d", shard_id);
+  return datalog_sync_full_sync_index_prefix + "." + source_zone + "." + buf;
+}
+/* One entry of the remote bucket.instance metadata listing, decoded from its JSON form. */
+struct bucket_instance_meta_info {
+  string key;
+  obj_version ver;
+  time_t mtime;
+  RGWBucketInstanceMetadataObject data;
+
+  bucket_instance_meta_info() : mtime(0) {}
+
+  void decode_json(JSONObj *obj) {
+    JSONDecoder::decode_json("key", key, obj);
+    JSONDecoder::decode_json("ver", ver, obj);
+    JSONDecoder::decode_json("mtime", mtime, obj);
+    JSONDecoder::decode_json("data", data, obj);
+  }
+};
+
 class RGWListBucketIndexesCR : public RGWCoroutine {
   RGWRados *store;
   RGWHTTPManager *http_manager;
@@ -385,11 +408,18 @@ class RGWListBucketIndexesCR : public RGWCoroutine {
   int req_ret;
 
   list<string> result;
+  list<string>::iterator iter;
 
   RGWShardedOmapCRManager *entries_index;
 
   string oid_prefix;
 
+  string path;
+  bucket_instance_meta_info meta_info;
+  string key;
+  string s;
+  int i;
+
 public:
   RGWListBucketIndexesCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
                          RGWRESTConn *_conn,
@@ -397,8 +427,9 @@ public:
                                                       http_manager(_mgr), async_rados(_async_rados), conn(_conn),
                                                       source_zone(_source_zone),
                                                       num_shards(_num_shards),
-                                                      req_ret(0), entries_index(NULL) {
+                                                      req_ret(0), entries_index(NULL), i(0) {
     oid_prefix = datalog_sync_full_sync_index_prefix + "." + source_zone;
+    path = "/admin/metadata/bucket.instance";
   }
   ~RGWListBucketIndexesCR() {
     delete entries_index;
@@ -415,17 +446,40 @@ public:
         call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), conn, http_manager,
                                                       entrypoint, NULL, &result));
       }
-      yield {
-        if (get_ret_status() < 0) {
-          ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
-          return set_state(RGWCoroutine_Error);
+      if (get_ret_status() < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
+        return set_state(RGWCoroutine_Error);
+      }
+      for (iter = result.begin(); iter != result.end(); ++iter) {
+        ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
+
+        key = *iter;
+
+        yield {
+          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
+                                          { NULL, NULL } };
+
+          int ret = call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, http_manager, path, pairs, &meta_info));
+          if (ret < 0) {
+            ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
+            return set_state(RGWCoroutine_Error, ret); /* same error convention as the other failure paths here */
+          }
         }
-        for (list<string>::iterator iter = result.begin(); iter != result.end(); ++iter) {
-          ldout(store->ctx(), 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
-          entries_index->append(*iter);
+
+        num_shards = meta_info.data.get_bucket_info().num_shards;
 #warning error handling of shards
+        if (num_shards > 0) {
+          for (i = 0; i < num_shards; i++) {
+            char buf[16];
+            snprintf(buf, sizeof(buf), ":%d", i);
+            s = key + buf;
+            yield entries_index->append(s);
+          }
+        } else {
+          yield entries_index->append(key);
         }
       }
+      yield entries_index->finish();
 
       int ret;
       while (collect(&ret)) {
@@ -440,6 +494,306 @@ public:
   }
 };
 
+#define DATA_SYNC_UPDATE_MARKER_WINDOW 10
+/* Marker tracker for one data sync shard: store_marker() records the new marker and returns a CR that writes it to marker_oid. */
+class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+  RGWRados *store;
+  RGWAsyncRadosProcessor *async_rados;
+
+  string marker_oid;
+  rgw_data_sync_marker sync_marker;
+
+
+public:
+  RGWDataSyncShardMarkerTrack(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                         const string& _marker_oid,
+                         const rgw_data_sync_marker& _marker) : RGWSyncShardMarkerTrack<string>(DATA_SYNC_UPDATE_MARKER_WINDOW),
+                                                                store(_store),
+                                                                async_rados(_async_rados),
+                                                                marker_oid(_marker_oid),
+                                                                sync_marker(_marker) {}
+
+  RGWCoroutine *store_marker(const string& new_marker) {
+    sync_marker.marker = new_marker;
+
+    ldout(store->ctx(), 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
+    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+                                                           marker_oid, sync_marker);
+  }
+};
+/* Declared here, ahead of the data sync CRs that instantiate it; operate() is defined out of line further down. */
+class RGWRunBucketSyncCoroutine : public RGWCoroutine {
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRESTConn *conn;
+  RGWRados *store;
+  RGWObjectCtx& obj_ctx;
+  string source_zone;
+  string bucket_name;
+  string bucket_id;
+  RGWBucketInfo bucket_info;
+  int shard_id;
+  rgw_bucket_shard_sync_info sync_status;
+
+public:
+  RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                            RGWRESTConn *_conn, RGWRados *_store,
+                            RGWObjectCtx& _obj_ctx, const string& _source_zone,
+                            const string& _bucket_name, const string& _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()),
+                                                                            http_manager(_mgr), async_rados(_async_rados), conn(_conn),
+                                                                            store(_store),
+                                                                            obj_ctx(_obj_ctx), source_zone(_source_zone),
+                                                                            bucket_name(_bucket_name),
+                                                                            bucket_id(_bucket_id), shard_id(_shard_id) {}
+
+  int operate();
+};
+
+/* Syncs one entry: splits raw_key as "bucket:instance[:shard]", runs the bucket sync CR for it, then calls marker_tracker->finish(). */
+class RGWDataSyncSingleEntryCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+
+  RGWRESTConn *conn;
+  string source_zone;
+
+  string raw_key;
+  string entry_marker;
+
+  RGWObjectCtx obj_ctx;
+
+  ssize_t pos;
+  string bucket_name;
+  string bucket_instance;
+
+  int sync_status;
+
+  bufferlist md_bl;
+
+  RGWDataSyncShardMarkerTrack *marker_tracker;
+
+public:
+  RGWDataSyncSingleEntryCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                           RGWRESTConn *_conn, const string& _source_zone,
+                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                      async_rados(_async_rados),
+                                                      conn(_conn), source_zone(_source_zone),
+                                                      raw_key(_raw_key), entry_marker(_entry_marker),
+                                                      obj_ctx(_store),
+                                                      pos(0), sync_status(0),
+                                                      marker_tracker(_marker_tracker) {
+  }
+
+  int operate() {
+    reenter(this) {
+      yield {
+        pos = raw_key.find(':');
+        bucket_name = raw_key.substr(0, pos);
+        bucket_instance = raw_key.substr(pos + 1);
+        pos = bucket_instance.find(':');
+        int shard_id = -1;
+        if (pos >= 0) {
+          string err;
+          string s = bucket_instance.substr(pos + 1);
+          shard_id = strict_strtol(s.c_str(), 10, &err);
+          if (!err.empty()) {
+            ldout(store->ctx(), 0) << "ERROR: failed to parse bucket instance key: " << bucket_instance << dendl;
+            return set_state(RGWCoroutine_Error, -EIO);
+          }
+
+          bucket_instance = bucket_instance.substr(0, pos); /* everything before the ':'; pos - 1 cut off the last character */
+        }
+        int ret = call(new RGWRunBucketSyncCoroutine(http_manager, async_rados, conn, store, obj_ctx, source_zone, bucket_name, bucket_instance, shard_id));
+        if (ret < 0) {
+#warning failed syncing bucket, need to log
+          return set_state(RGWCoroutine_Error, ret); /* sync_status is still 0 here, report the actual failure */
+        }
+      }
+
+      sync_status = retcode;
+#warning what do do in case of error
+      yield {
+        /* update marker */
+        int ret = call(marker_tracker->finish(entry_marker));
+        if (ret < 0) {
+          ldout(store->ctx(), 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
+          return set_state(RGWCoroutine_Error, ret); /* sync_status may be 0 even though this call failed */
+        }
+      }
+      if (sync_status == 0) {
+        sync_status = retcode;
+      }
+      if (sync_status < 0) {
+        return set_state(RGWCoroutine_Error, sync_status); /* first failure seen; retcode alone can be 0 here */
+      }
+      return set_state(RGWCoroutine_Done, 0);
+    }
+    return 0;
+  }
+};
+/* Per-shard data sync coroutine: operate() dispatches to full_sync() or incremental_sync() according to sync_marker.state. */
+class RGWDataSyncShardCR : public RGWCoroutine {
+  RGWRados *store;
+  RGWHTTPManager *http_manager;
+  RGWAsyncRadosProcessor *async_rados;
+  RGWRESTConn *conn;
+
+  rgw_bucket pool;
+
+  string source_zone;
+  uint32_t shard_id;
+  rgw_data_sync_marker sync_marker;
+
+  map<string, bufferlist> entries;
+  map<string, bufferlist>::iterator iter;
+
+  string oid;
+
+  RGWDataSyncShardMarkerTrack *marker_tracker;
+
+  list<cls_log_entry> log_entries;
+  list<cls_log_entry>::iterator log_iter;
+  bool truncated;
+
+  string mdlog_marker;
+  string raw_key;
+
+  Mutex inc_lock;
+  Cond inc_cond;
+
+  boost::asio::coroutine incremental_cr;
+  boost::asio::coroutine full_cr;
+
+
+public:
+  RGWDataSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
+                     RGWRESTConn *_conn, rgw_bucket& _pool, const string& _source_zone,
+                     uint32_t _shard_id, rgw_data_sync_marker& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+                                                      http_manager(_mgr),
+                                                      async_rados(_async_rados),
+                                                      conn(_conn),
+                                                      pool(_pool),
+                                                      source_zone(_source_zone),
+                                                      shard_id(_shard_id),
+                                                      sync_marker(_marker),
+                                                      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock") {
+  }
+
+  ~RGWDataSyncShardCR() {
+    delete marker_tracker;
+  }
+
+  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
+    delete marker_tracker;
+    marker_tracker = mt;
+  }
+
+  int operate() {
+    while (true) {
+      switch (sync_marker.state) {
+      case rgw_data_sync_marker::FullSync:
+        return full_sync();
+      case rgw_data_sync_marker::IncrementalSync:
+        return incremental_sync();
+        break;
+      default:
+        return set_state(RGWCoroutine_Error, -EIO);
+      }
+    }
+    return 0;
+  }
+  /* Full sync: page through the shard's full-sync index omap, spawn a sync CR per key, then switch the marker to IncrementalSync. */
+  int full_sync() {
+    int ret;
+
+#define OMAP_GET_MAX_ENTRIES 100
+    int max_entries = OMAP_GET_MAX_ENTRIES;
+    reenter(&full_cr) {
+      oid = full_data_sync_index_shard_oid(source_zone, shard_id);
+      set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
+                                                         RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id),
+                                                         sync_marker));
+      do {
+        yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries));
+        if (retcode < 0) {
+          ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
+          return set_state(RGWCoroutine_Error, retcode);
+        }
+        iter = entries.begin();
+        for (; iter != entries.end(); ++iter) {
+          ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
+          marker_tracker->start(iter->first);
+          // fetch remote and write locally
+          yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
+          if (retcode < 0) {
+            return set_state(RGWCoroutine_Error, retcode);
+          }
+          sync_marker.marker = iter->first;
+        }
+      } while ((int)entries.size() == max_entries);
+
+      drain_all();
+
+      yield {
+        /* update marker to reflect we're done with full sync */
+        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
+        sync_marker.marker = sync_marker.next_step_marker;
+        sync_marker.next_step_marker.clear();
+        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(async_rados, store, store->get_zone_params().log_pool,
+                                                             RGWDataSyncStatusManager::shard_obj_name(source_zone, shard_id), sync_marker));
+      }
+      if (retcode < 0) {
+        ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
+        return set_state(RGWCoroutine_Error, retcode);
+      }
+    }
+    return 0;
+  }
+
+  /* Incremental sync is not implemented yet: the draft body below is compiled out and the function only returns 0. */
+  int incremental_sync() {
+#if 0
+    reenter(&incremental_cr) {
+      mdlog_marker = sync_marker.marker;
+      set_marker_tracker(new RGWDataSyncShardMarkerTrack(store, http_manager, async_rados,
+                                                         RGWDataSyncStatusManager::shard_obj_name(shard_id),
+                                                         sync_marker));
+      do {
+#define INCREMENTAL_MAX_ENTRIES 100
+        ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+        if (mdlog_marker <= sync_marker.marker) {
+          /* we're at the tip, try to bring more entries */
+          ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
+          yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, mdlog_marker, &mdlog_marker));
+        }
+        ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+        if (mdlog_marker > sync_marker.marker) {
+          yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+          for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
+            ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
+            marker_tracker->start(log_iter->id);
+            raw_key = log_iter->section + ":" + log_iter->name;
+            yield spawn(new RGWMetaSyncSingleEntryCR(store, http_manager, async_rados, raw_key, log_iter->id, marker_tracker), false);
+            if (retcode < 0) {
+              return set_state(RGWCoroutine_Error, retcode);
+            }
+          }
+        }
+        ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+        if (mdlog_marker == sync_marker.marker) {
+#define INCREMENTAL_INTERVAL 20
+          yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
+        }
+      } while (true);
+    }
+    /* TODO */
+#endif
+    return 0; /* kept outside the #if 0 so the compiled function always returns a value */
+  }
+};
+
 int RGWRemoteDataLog::run_sync(int num_shards, rgw_data_sync_status& sync_status)
 {
   RGWObjectCtx obj_ctx(store, NULL);
@@ -538,22 +892,6 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn, con
   return 0;
 }
 
-struct bucket_instance_meta_info {
-  string key;
-  obj_version ver;
-  time_t mtime;
-  RGWBucketInstanceMetadataObject data;
-
-  bucket_instance_meta_info() : mtime(0) {}
-
-  void decode_json(JSONObj *obj) {
-    JSONDecoder::decode_json("key", key, obj);
-    JSONDecoder::decode_json("ver", ver, obj);
-    JSONDecoder::decode_json("mtime", mtime, obj);
-    JSONDecoder::decode_json("data", data, obj);
-  }
-};
-
 struct bucket_index_marker_info {
   string bucket_ver;
   string master_ver;
@@ -1365,34 +1703,6 @@ int RGWBucketShardIncrementalSyncCR::operate()
   return 0;
 }
 
-class RGWRunBucketSyncCoroutine : public RGWCoroutine {
-  RGWHTTPManager *http_manager;
-  RGWAsyncRadosProcessor *async_rados;
-  RGWRESTConn *conn;
-  RGWRados *store;
-  RGWObjectCtx& obj_ctx;
-  string source_zone;
-  string bucket_name;
-  string bucket_id;
-  RGWBucketInfo bucket_info;
-  int shard_id;
-  rgw_bucket_shard_sync_info sync_status;
-
-public:
-  RGWRunBucketSyncCoroutine(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
-                            RGWRESTConn *_conn, RGWRados *_store,
-                            RGWObjectCtx& _obj_ctx, const string& _source_zone,
-                            const string& _bucket_name, const string _bucket_id, int _shard_id) : RGWCoroutine(_store->ctx()),
-                                                                            http_manager(_mgr), async_rados(_async_rados), conn(_conn),
-                                                                            store(_store),
-                                                                            obj_ctx(_obj_ctx), source_zone(_source_zone),
-                                                                            bucket_name(_bucket_name),
-                                                                            bucket_id(_bucket_id), shard_id(_shard_id) {}
-
-  int operate();
-};
-
-
 int RGWRunBucketSyncCoroutine::operate()
 {
   reenter(this) {
-- 
2.39.5