+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+
#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
}
public:
RGWAsyncReadMDLogEntries(RGWAioCompletionNotifier *cn, RGWRados *_store,
- int _shard_id, string* _marker, int _max_entries,
- list<cls_log_entry> *_entries, bool *_truncated) : RGWAsyncRadosRequest(cn), store(_store), mdlog(store->meta_mgr->get_log()),
- shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
- entries(_entries), truncated(_truncated) {}
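+ // the mdlog to read from is now supplied by the caller instead of being
+ // looked up via store->meta_mgr->get_log(), so a specific period's log can be read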
+ RGWMetadataLog* mdlog, int _shard_id,
+ string* _marker, int _max_entries,
+ list<cls_log_entry> *_entries, bool *_truncated)
+ : RGWAsyncRadosRequest(cn), store(_store), mdlog(mdlog),
+ shard_id(_shard_id), marker(_marker), max_entries(_max_entries),
+ entries(_entries), truncated(_truncated) {}
};
class RGWReadMDLogEntriesCR : public RGWSimpleCoroutine {
RGWMetaSyncEnv *sync_env;
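+ // mdlog to read entries from, forwarded to RGWAsyncReadMDLogEntries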
+ RGWMetadataLog *const mdlog;
int shard_id;
string marker;
string *pmarker;
RGWAsyncReadMDLogEntries *req;
public:
- RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env,
- int _shard_id, string*_marker, int _max_entries,
- list<cls_log_entry> *_entries, bool *_truncated) : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env),
- shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
- entries(_entries), truncated(_truncated) {
- }
+ RGWReadMDLogEntriesCR(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
+ int _shard_id, string*_marker, int _max_entries,
+ list<cls_log_entry> *_entries, bool *_truncated)
+ : RGWSimpleCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
+ shard_id(_shard_id), pmarker(_marker), max_entries(_max_entries),
+ entries(_entries), truncated(_truncated) {}
~RGWReadMDLogEntriesCR() {
if (req) {
int send_request() {
marker = *pmarker;
req = new RGWAsyncReadMDLogEntries(stack->create_completion_notifier(),
- sync_env->store, shard_id, &marker, max_entries, entries, truncated);
+ sync_env->store, mdlog, shard_id, &marker,
+ max_entries, entries, truncated);
sync_env->async_rados->queue(req);
return 0;
}
RGWObjectCtx& obj_ctx;
rgw_meta_sync_info status;
- map<int, RGWMetadataLogInfo> shards_info;
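+ // one entry per mdlog shard, indexed by shard id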
+ vector<RGWMetadataLogInfo> shards_info;
RGWContinuousLeaseCR *lease_cr;
public:
RGWInitSyncStatusCoroutine(RGWMetaSyncEnv *_sync_env,
RGWObjectCtx& _obj_ctx,
const rgw_meta_sync_info &status)
: RGWCoroutine(_sync_env->store->ctx()), sync_env(_sync_env),
- obj_ctx(_obj_ctx), status(status), lease_cr(NULL) {}
+ obj_ctx(_obj_ctx), status(status), shards_info(status.num_shards),
+ lease_cr(NULL) {}
~RGWInitSyncStatusCoroutine() {
if (lease_cr) {
set_status("fetching remote log position");
yield {
for (int i = 0; i < (int)status.num_shards; i++) {
- spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period,
- i, &shards_info[i]), false);
+ spawn(new RGWReadRemoteMDLogShardInfoCR(sync_env, status.period, i,
+ &shards_info[i]), false);
}
}
const std::string& period;
int shard_id;
string marker;
- bool truncated;
+ bool truncated = false;
string *new_marker;
- int max_entries;
+ int max_entries = CLONE_MAX_ENTRIES;
- RGWRESTReadResource *http_op;
+ RGWRESTReadResource *http_op = nullptr;
- int req_ret;
+ int req_ret = 0;
RGWMetadataLogInfo shard_info;
rgw_mdlog_shard_data data;
public:
- RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, const std::string& period,
- int _id, const string& _marker, string *_new_marker)
- : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- mdlog(sync_env->store->meta_mgr->get_log()), period(period),
- shard_id(_id), marker(_marker), truncated(false), new_marker(_new_marker),
- max_entries(CLONE_MAX_ENTRIES), http_op(NULL), req_ret(0) {
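+ // the mdlog is now passed in by the caller instead of being taken from
+ // meta_mgr->get_log(), so the coroutine can work against a specific period's log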
+ RGWCloneMetaLogCoroutine(RGWMetaSyncEnv *_sync_env, RGWMetadataLog* mdlog,
+ const std::string& period, int _id,
+ const string& _marker, string *_new_marker)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), mdlog(mdlog),
+ period(period), shard_id(_id), marker(_marker), new_marker(_new_marker) {
if (new_marker) {
*new_marker = marker;
}
class RGWMetaSyncShardCR : public RGWCoroutine {
RGWMetaSyncEnv *sync_env;
- rgw_bucket pool;
-
- const std::string& period;
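+ // pool for the sync status and lease objects (the zone's log pool)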
+ const rgw_bucket& pool;
+ const std::string& period; //< id of the period being synced
+ RGWMetadataLog* mdlog; //< mdlog of the period being synced
uint32_t shard_id;
- rgw_meta_sync_marker sync_marker;
+ rgw_meta_sync_marker& sync_marker;
string marker;
string max_marker;
+ const std::string& period_marker; //< max marker stored in next period
map<string, bufferlist> entries;
map<string, bufferlist>::iterator iter;
string oid;
- RGWMetaSyncShardMarkerTrack *marker_tracker;
+ RGWMetaSyncShardMarkerTrack *marker_tracker = nullptr;
list<cls_log_entry> log_entries;
list<cls_log_entry>::iterator log_iter;
- bool truncated;
+ bool truncated = false;
string mdlog_marker;
string raw_key;
boost::asio::coroutine incremental_cr;
boost::asio::coroutine full_cr;
- RGWContinuousLeaseCR *lease_cr;
- bool lost_lock;
+ RGWContinuousLeaseCR *lease_cr = nullptr;
+ bool lost_lock = false;
bool *reset_backoff;
map<RGWCoroutinesStack *, string> stack_to_pos;
map<string, string> pos_to_prev;
- bool can_adjust_marker;
+ bool can_adjust_marker = false;
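+ // set once this shard has synced past period_marker; incremental sync then
+ // breaks out so RGWMetaSyncCR can advance to the next period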
+ bool done_with_period = false;
- int total_entries;
+ int total_entries = 0;
public:
- RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env,
- rgw_bucket& _pool, const std::string& period,
- uint32_t _shard_id, rgw_meta_sync_marker& _marker,
- bool *_reset_backoff) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- pool(_pool),
- period(period),
- shard_id(_shard_id),
- sync_marker(_marker),
- marker_tracker(NULL), truncated(false), inc_lock("RGWMetaSyncShardCR::inc_lock"),
- lease_cr(NULL), lost_lock(false), reset_backoff(_reset_backoff), can_adjust_marker(false),
- total_entries(0) {
+ RGWMetaSyncShardCR(RGWMetaSyncEnv *_sync_env, const rgw_bucket& _pool,
+ const std::string& period, RGWMetadataLog* mdlog,
+ uint32_t _shard_id, rgw_meta_sync_marker& _marker,
+ const std::string& period_marker, bool *_reset_backoff)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), pool(_pool),
+ period(period), mdlog(mdlog), shard_id(_shard_id), sync_marker(_marker),
+ period_marker(period_marker), inc_lock("RGWMetaSyncShardCR::inc_lock"),
+ reset_backoff(_reset_backoff) {
*reset_backoff = false;
}
lease_cr->put();
}
RGWRados *store = sync_env->store;
- lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, pool,
sync_env->shard_obj_name(shard_id),
lock_name, lock_duration, this);
lease_cr->get();
sync_marker.marker = sync_marker.next_step_marker;
sync_marker.next_step_marker.clear();
}
+ // XXX: why write the marker if !can_adjust_marker?
RGWRados *store = sync_env->store;
ldout(sync_env->cct, 0) << *this << ": saving marker pos=" << sync_marker.marker << dendl;
- call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, store->get_zone_params().log_pool,
+ call(new RGWSimpleRadosWriteCR<rgw_meta_sync_marker>(sync_env->async_rados, store, pool,
sync_env->shard_obj_name(shard_id), sync_marker));
}
if (retcode < 0) {
uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
string lock_name = "sync_lock";
RGWRados *store = sync_env->store;
- lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, store->get_zone_params().log_pool,
+ lease_cr = new RGWContinuousLeaseCR(sync_env->async_rados, store, pool,
sync_env->shard_obj_name(shard_id),
lock_name, lock_duration, this);
lease_cr->get();
* sync_marker: the local sync marker position
* max_marker: the max mdlog position that we fetched
* marker: the current position we try to sync
+ * period_marker: the last marker before the next period begins (optional)
*/
marker = max_marker = sync_marker.marker;
/* inc sync */
break;
}
#define INCREMENTAL_MAX_ENTRIES 100
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
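+ // if the local marker has already reached the end of this period, stop
+ // and let RGWMetaSyncCR advance to the next one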
+ if (!period_marker.empty() && period_marker <= marker) {
+ done_with_period = true;
+ break;
+ }
if (mdlog_marker <= max_marker) {
/* we're at the tip, try to bring more entries */
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
- yield call(new RGWCloneMetaLogCoroutine(sync_env, period, shard_id,
+ yield call(new RGWCloneMetaLogCoroutine(sync_env, mdlog,
+ period, shard_id,
mdlog_marker, &mdlog_marker));
}
if (retcode < 0) {
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
if (mdlog_marker > max_marker) {
marker = max_marker;
- yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+ yield call(new RGWReadMDLogEntriesCR(sync_env, mdlog, shard_id,
+ &max_marker, INCREMENTAL_MAX_ENTRIES,
+ &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
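+ // don't process entries that belong to the next period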
+ if (!period_marker.empty() && period_marker < log_iter->id) {
+ done_with_period = true;
+ break;
+ }
if (!mdlog_entry.convert_from(*log_iter)) {
ldout(sync_env->cct, 0) << __func__ << ":" << __LINE__ << ": ERROR: failed to convert mdlog entry, shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << " ... skipping entry" << dendl;
continue;
}
}
collect_children();
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << " period_marker=" << period_marker << dendl;
+ if (done_with_period) {
+ // return control to RGWMetaSyncCR and advance to the next period
+ break;
+ }
if (mdlog_marker == max_marker && can_adjust_marker) {
#define INCREMENTAL_INTERVAL 20
yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
{
RGWMetaSyncEnv *sync_env;
- rgw_bucket pool;
-
+ const rgw_bucket& pool;
const std::string& period;
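+ // mdlog of the period being synced, forwarded to RGWMetaSyncShardCR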
+ RGWMetadataLog* mdlog;
uint32_t shard_id;
rgw_meta_sync_marker sync_marker;
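+ // max marker stored in the next period's sync status; empty when syncing
+ // the current period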
+ const std::string period_marker;
RGWObjectCtx obj_ctx;
public:
- RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env,
- rgw_bucket& _pool, const std::string& period,
- uint32_t _shard_id, rgw_meta_sync_marker& _marker)
+ RGWMetaSyncShardControlCR(RGWMetaSyncEnv *_sync_env, const rgw_bucket& _pool,
+ const std::string& period, RGWMetadataLog* mdlog,
+ uint32_t _shard_id, const rgw_meta_sync_marker& _marker,
+ std::string&& period_marker)
: RGWBackoffControlCR(_sync_env->cct), sync_env(_sync_env),
- pool(_pool), period(period), shard_id(_shard_id),
- sync_marker(_marker), obj_ctx(sync_env->store) {}
+ pool(_pool), period(period), mdlog(mdlog), shard_id(_shard_id),
+ sync_marker(_marker), period_marker(std::move(period_marker)),
+ obj_ctx(sync_env->store) {}
RGWCoroutine *alloc_cr() {
- return new RGWMetaSyncShardCR(sync_env, pool, period, shard_id,
- sync_marker, backoff_ptr());
+ return new RGWMetaSyncShardCR(sync_env, pool, period, mdlog, shard_id,
+ sync_marker, period_marker, backoff_ptr());
}
RGWCoroutine *alloc_finisher_cr() {
RGWRados *store = sync_env->store;
- return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, obj_ctx, store->get_zone_params().log_pool,
+ return new RGWSimpleRadosReadCR<rgw_meta_sync_marker>(sync_env->async_rados, store, obj_ctx, pool,
sync_env->shard_obj_name(shard_id), &sync_marker);
}
};
class RGWMetaSyncCR : public RGWCoroutine {
RGWMetaSyncEnv *sync_env;
-
+ const rgw_bucket& pool;
+ RGWPeriodHistory::Cursor cursor; //< sync position in period history
+ RGWPeriodHistory::Cursor next; //< next period in history
rgw_meta_sync_status sync_status;
map<int, RGWMetaSyncShardControlCR *> shard_crs;
-
public:
- RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, const rgw_meta_sync_status& _sync_status) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- sync_status(_sync_status) {
- }
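+ // cursor is the position in the period history to start syncing from;
+ // operate() walks it forward one period at a time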
+ RGWMetaSyncCR(RGWMetaSyncEnv *_sync_env, RGWPeriodHistory::Cursor cursor,
+ const rgw_meta_sync_status& _sync_status)
+ : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+ pool(sync_env->store->get_zone_params().log_pool),
+ cursor(cursor), sync_status(_sync_status) {}
int operate() {
+ int ret = 0;
reenter(this) {
- yield {
- map<uint32_t, rgw_meta_sync_marker>::iterator iter = sync_status.sync_markers.begin();
- for (; iter != sync_status.sync_markers.end(); ++iter) {
- uint32_t shard_id = iter->first;
- rgw_meta_sync_marker marker;
-
- RGWMetaSyncShardControlCR *shard_cr = new RGWMetaSyncShardControlCR(sync_env,
- sync_env->store->get_zone_params().log_pool,
- sync_status.sync_info.period, shard_id,
- sync_status.sync_markers[shard_id]);
+ // loop through one period at a time
+ for (;;) {
+ if (cursor == sync_env->store->period_history->get_current()) {
+ next = RGWPeriodHistory::Cursor{};
+ if (cursor) {
+ ldout(cct, 10) << "RGWMetaSyncCR on current period="
+ << cursor.get_period().get_id() << dendl;
+ } else {
+ ldout(cct, 10) << "RGWMetaSyncCR with no period" << dendl;
+ }
+ } else {
+ next = cursor;
+ next.next();
+ ldout(cct, 10) << "RGWMetaSyncCR on period="
+ << cursor.get_period().get_id() << ", next="
+ << next.get_period().get_id() << dendl;
+ }
+ yield {
+ // get the mdlog for the current period (may be empty)
+ auto& period_id = sync_status.sync_info.period;
+ auto mdlog = sync_env->store->meta_mgr->get_log(period_id);
+
+ // sync this period on each shard
+ for (const auto& m : sync_status.sync_markers) {
+ uint32_t shard_id = m.first;
+ auto& marker = m.second;
+
+ std::string period_marker;
+ if (next) {
+ // read the maximum marker from the next period's sync status
+ period_marker = next.get_period().get_sync_status()[shard_id];
+ if (period_marker.empty()) {
+ // no metadata changes have occurred on this shard, skip it
+ ldout(cct, 10) << "RGWMetaSyncCR: skipping shard " << shard_id
+ << " with empty period marker" << dendl;
+ continue;
+ }
+ }
- shard_crs[shard_id] = shard_cr;
- spawn(shard_cr, true);
+ auto cr = new RGWMetaSyncShardControlCR(sync_env, pool, period_id,
+ mdlog, shard_id, marker,
+ std::move(period_marker));
+ // XXX: do we need to hold a ref on cr while it's in shard_crs?
+ shard_crs[shard_id] = cr;
+ spawn(cr, false);
+ }
}
+ // wait for each shard to complete
+ collect(&ret);
+ if (ret < 0) {
+ return set_cr_error(ret);
+ }
+ drain_all();
+ // advance to the next period
+ assert(next);
+ cursor = next;
+
+ // write the updated sync info
+ sync_status.sync_info.period = cursor.get_period().get_id();
+ sync_status.sync_info.realm_epoch = cursor.get_epoch();
+ yield call(new RGWSimpleRadosWriteCR<rgw_meta_sync_info>(sync_env->async_rados,
+ sync_env->store, pool,
+ sync_env->status_oid(),
+ sync_status.sync_info));
}
- yield return set_cr_done();
}
return 0;
}