int shard_id;
string marker;
bool truncated;
+ string *new_marker;
int max_entries;
public:
RGWCloneMetaLogCoroutine(RGWRados *_store, RGWHTTPManager *_mgr,
- int _id, const string& _marker) : RGWCoroutine(_store->ctx()), store(_store),
+ int _id, const string& _marker, string *_new_marker) : RGWCoroutine(_store->ctx()), store(_store),
mdlog(store->meta_mgr->get_log()),
http_manager(_mgr), shard_id(_id),
- marker(_marker), truncated(false), max_entries(CLONE_MAX_ENTRIES),
+ marker(_marker), truncated(false), new_marker(_new_marker),
+ max_entries(CLONE_MAX_ENTRIES),
http_op(NULL), md_op_notifier(NULL),
- req_ret(0) {}
+ req_ret(0) {
+ if (new_marker) {
+ *new_marker = marker;
+ }
+ }
int operate();
list<cls_log_entry> log_entries;
bool truncated;
+ string mdlog_marker;
+
Mutex inc_lock;
Cond inc_cond;
int incremental_sync() {
reenter(&incremental_cr) {
+ mdlog_marker = sync_marker.marker;
do {
#define INCREMENTAL_MAX_ENTRIES 100
- yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
- if (log_entries.size() < INCREMENTAL_MAX_ENTRIES) {
- ldout(store->ctx(), 20) << __func__ << ": syncing mdlog for shard_id=" << shard_id << dendl;
- yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, sync_marker.marker));
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ if (mdlog_marker <= sync_marker.marker) {
+ /* we're at the tip, try to bring more entries */
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
+ yield call(new RGWCloneMetaLogCoroutine(store, http_manager, shard_id, mdlog_marker, &mdlog_marker));
}
- for (list<cls_log_entry>::iterator iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
- ldout(store->ctx(), 20) << __func__ << ": log_entry: " << iter->id << ":" << iter->section << ":" << iter->name << ":" << iter->timestamp << dendl;
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ if (mdlog_marker > sync_marker.marker) {
+ yield call(new RGWReadMDLogEntriesCR(async_rados, store, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+ for (list<cls_log_entry>::iterator iter = log_entries.begin(); iter != log_entries.end(); ++iter) {
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << iter->id << ":" << iter->section << ":" << iter->name << ":" << iter->timestamp << dendl;
+ }
}
+ ldout(store->ctx(), 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ if (mdlog_marker == sync_marker.marker) {
#define INCREMENTAL_INTERVAL 20
- yield call(new RGWWaitCR(async_rados, store->ctx(), &inc_lock, &inc_cond, INCREMENTAL_INTERVAL));
+ yield call(new RGWWaitCR(async_rados, store->ctx(), &inc_lock, &inc_cond, INCREMENTAL_INTERVAL));
+ }
} while (true);
}
/* TODO */
list<RGWCoroutinesStack *> stacks;
for (int i = 0; i < (int)num_shards; i++) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
- int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i]));
+ int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i], NULL));
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: stack->call() returned r=" << r << dendl;
return r;
list<RGWCoroutinesStack *> stacks;
for (int i = 0; i < (int)num_shards; i++) {
RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), this);
- int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i]));
+ int r = stack->call(new RGWCloneMetaLogCoroutine(store, &http_manager, i, clone_markers[i], NULL));
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: stack->call() returned r=" << r << dendl;
return r;
return set_state(RGWCoroutine_Done);
}
+ if (new_marker) {
+ *new_marker = data.entries.back().id;
+ }
+
return 0;
}