From 6bf77a7e358b2605b99d0af87570ba82e5883d5e Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 1 Sep 2015 16:18:32 -0700 Subject: [PATCH] rgw: read mdlog entries for incremental sync Signed-off-by: Yehuda Sadeh --- src/rgw/rgw_sync.cc | 196 ++++++++++++++++++++++++++++++++------------ 1 file changed, 142 insertions(+), 54 deletions(-) diff --git a/src/rgw/rgw_sync.cc b/src/rgw/rgw_sync.cc index e4b3caed06435..9a558fe298f10 100644 --- a/src/rgw/rgw_sync.cc +++ b/src/rgw/rgw_sync.cc @@ -80,6 +80,7 @@ public: int get_ret_status() { return retcode; } }; + class RGWAsyncGetSystemObj : public RGWAsyncRadosRequest { RGWRados *store; RGWObjectCtx *obj_ctx; @@ -750,6 +751,77 @@ public: } }; +class RGWAsyncReadMDLogEntries : public RGWAsyncRadosRequest { + RGWRados *store; + RGWMetadataLog *mdlog; + int shard_id; + string *marker; + int max_entries; + list *entries; + bool *truncated; + + void *handle; +protected: + int _send_request() { + utime_t from_time; + utime_t end_time; + + mdlog->init_list_entries(shard_id, from_time, end_time, *marker, &handle); + + return mdlog->list_entries(handle, max_entries, *entries, marker, truncated); + } +public: + RGWAsyncReadMDLogEntries(RGWAioCompletionNotifier *cn, RGWRados *_store, + int _shard_id, string* _marker, int _max_entries, + list *_entries, bool *_truncated) : RGWAsyncRadosRequest(cn), store(_store), mdlog(store->meta_mgr->get_log()), + shard_id(_shard_id), marker(_marker), max_entries(_max_entries), + entries(_entries), truncated(_truncated) {} +}; + +class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine { + RGWAsyncRadosProcessor *async_rados; + RGWRados *store; + int shard_id; + string marker; + string *pmarker; + int max_entries; + list *entries; + bool *truncated; + + RGWAsyncReadMDLogEntries *req; + +public: + RGWReadMDLogEntriesCR(RGWAsyncRadosProcessor *_async_rados, RGWRados *_store, + int _shard_id, string*_marker, int _max_entries, + list *_entries, bool *_truncated) : RGWSimpleCoroutine(_store->ctx()), + async_rados(_async_rados), + store(_store), + shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries), + entries(_entries), truncated(_truncated) { + } + + ~RGWReadMDLogEntriesCR() { + delete req; + } + + int send_request() { + marker = *pmarker; + req = new RGWAsyncReadMDLogEntries(stack->create_completion_notifier(), + store, shard_id, &marker, max_entries, entries, truncated); + async_rados->queue(req); + return 0; + } + + int request_complete() { + int ret = req->get_ret_status(); + if (ret >= 0 && !entries->empty()) { + *pmarker = marker; + } + return req->get_ret_status(); + } +}; + + template class RGWReadRESTResourceCR : public RGWSimpleCoroutine { RGWRESTConn *conn; @@ -1069,7 +1141,6 @@ public: } } void append(const string& entry) { - static int counter = 0; int shard_id = store->key_to_shard_id(entry, shards.size()); shards[shard_id]->append(entry); } @@ -1412,6 +1483,13 @@ class RGWMetaSyncShardCR : public RGWCoroutine { RGWMetaSyncShardMarkerTrack *marker_tracker; + list log_entries; + bool truncated; + + + boost::asio::coroutine incremental_cr; + boost::asio::coroutine full_cr; + public: RGWMetaSyncShardCR(RGWRados *_store, RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados, @@ -1422,7 +1500,7 @@ public: pool(_pool), shard_id(_shard_id), sync_marker(_marker), - marker_tracker(NULL) { + marker_tracker(NULL), truncated(false) { } ~RGWMetaSyncShardCR() { @@ -1435,76 +1513,88 @@ public: } int operate() { - RGWRESTConn *conn = store->rest_master_conn; - string section; - string key; + while (true) { + switch (sync_marker.state) { + case rgw_meta_sync_marker::FullSync: + return full_sync(); + case rgw_meta_sync_marker::IncrementalSync: + return incremental_sync(); + break; + default: + return set_state(RGWCoroutine_Error, -EIO); + } + } + return 0; + } + + int full_sync() { int ret; #define OMAP_GET_MAX_ENTRIES 100 int max_entries = OMAP_GET_MAX_ENTRIES; - reenter(this) { - if (sync_marker.state == rgw_meta_sync_marker::FullSync) { - oid = full_sync_index_shard_oid(shard_id); - set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, this, + reenter(&full_cr) { + oid = full_sync_index_shard_oid(shard_id); + set_marker_tracker(new RGWMetaSyncShardMarkerTrack(store, http_manager, async_rados, this, RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); - do { - yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries)); + do { + yield return call(new RGWRadosGetOmapKeysCR(store, pool, oid, sync_marker.marker, &entries, max_entries)); + if (retcode < 0) { + ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); + } + iter = entries.begin(); + for (; iter != entries.end(); ++iter) { + ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl; + marker_tracker->start(iter->first); + // fetch remote and write locally + yield spawn(new RGWMetaSyncSingleEntryCR(store, http_manager, async_rados, iter->first, marker_tracker), false); if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl; return set_state(RGWCoroutine_Error, retcode); } - iter = entries.begin(); - for (; iter != entries.end(); ++iter) { - ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl; - marker_tracker->start(iter->first); - // fetch remote and write locally - yield spawn(new RGWMetaSyncSingleEntryCR(store, http_manager, async_rados, iter->first, marker_tracker), false); - if (retcode < 0) { - return set_state(RGWCoroutine_Error, retcode); - } - sync_marker.marker = iter->first; - } - } while ((int)entries.size() == max_entries); + sync_marker.marker = iter->first; + } + } while ((int)entries.size() == max_entries); - /* wait for all operations to complete */ - while (collect(&ret)) { - if (ret < 0) { - ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; - /* we should have reported this error */ + /* wait for all operations to complete */ + while (collect(&ret)) { + if (ret < 0) { + ldout(store->ctx(), 0) << "ERROR: a sync operation returned error" << dendl; + /* we should have reported this error */ #warning deal with error - } - yield; } + yield; + } - yield { - /* update marker to reflect we're done with full sync */ - sync_marker.state = rgw_meta_sync_marker::IncrementalSync; - sync_marker.marker = sync_marker.next_step_marker; - sync_marker.next_step_marker.clear(); - call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, - RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); - } - if (retcode < 0) { - ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; - return set_state(RGWCoroutine_Error, retcode); - } - goto incremental_sync; - // update shard state - } else if (sync_marker.state == rgw_meta_sync_marker::IncrementalSync) { -incremental_sync: - int r = incremental_sync(); - return set_state(RGWCoroutine_Done, 0); - } else { - return set_state(RGWCoroutine_Error, -EIO); + yield { + /* update marker to reflect we're done with full sync */ + sync_marker.state = rgw_meta_sync_marker::IncrementalSync; + sync_marker.marker = sync_marker.next_step_marker; + sync_marker.next_step_marker.clear(); + call(new RGWSimpleRadosWriteCR(async_rados, store, store->get_zone_params().log_pool, + RGWMetaSyncStatusManager::shard_obj_name(shard_id), sync_marker)); + } + if (retcode < 0) { + ldout(store->ctx(), 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl; + return set_state(RGWCoroutine_Error, retcode); } } return 0; } + int incremental_sync() { + reenter(&incremental_cr) { + do { +#define INCREMENTAL_MAX_ENTRIES 100 + yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated)); + for (list::iterator iter = log_entries.begin(); iter != log_entries.end(); ++iter) { + ldout(store->ctx(), 20) << __func__ << ": log_entry: " << iter->id << ":" << iter->section << ":" << iter->name << ":" << iter->timestamp << dendl; + } + } while (true); + } /* TODO */ - return -ENOTSUP; + return 0; } }; @@ -1524,8 +1614,6 @@ public: } int operate() { - RGWRESTConn *conn = store->rest_master_conn; - reenter(this) { yield { map::iterator iter = sync_status.sync_markers.begin(); -- 2.39.5