}
if (sync_status < 0) {
+#warning need to store entry for non-transient errors
ldout(sync_env->cct, 10) << *this << ": failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << dendl;
log_error() << "failed to send read remote metadata entry: section=" << section << " key=" << key << " status=" << sync_status << std::endl;
return set_cr_error(sync_status);
sync_status = retcode;
- yield {
- /* update marker */
- int ret = call(marker_tracker->finish(entry_marker));
- if (ret < 0) {
- ldout(sync_env->cct, 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
- return set_cr_error(sync_status);
- }
- }
if (sync_status == 0) {
+ yield {
+ /* update marker */
+ int ret = call(marker_tracker->finish(entry_marker));
+ if (ret < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: marker_tracker->finish(" << entry_marker << ") returned ret=" << ret << dendl;
+ return set_cr_error(sync_status);
+ }
+ }
sync_status = retcode;
}
if (sync_status < 0) {
uint32_t shard_id;
rgw_meta_sync_marker sync_marker;
string marker;
+ string max_marker;
map<string, bufferlist> entries;
map<string, bufferlist>::iterator iter;
int incremental_sync() {
reenter(&incremental_cr) {
+ can_adjust_marker = true;
/* grab lock */
if (!lease_cr) { /* could have had a lease_cr lock from previous state */
yield {
set_marker_tracker(new RGWMetaSyncShardMarkerTrack(sync_env,
sync_env->shard_obj_name(shard_id),
sync_marker));
+
+ /*
+ * mdlog_marker: the remote sync marker positiion
+ * sync_marker: the local sync marker position
+ * max_marker: the max mdlog position that we fetched
+ * marker: the current position we try to sync
+ */
+ marker = max_marker = sync_marker.marker;
/* inc sync */
do {
if (!lease_cr->is_locked()) {
}
#define INCREMENTAL_MAX_ENTRIES 100
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
- if (mdlog_marker <= sync_marker.marker) {
+ if (mdlog_marker <= max_marker) {
/* we're at the tip, try to bring more entries */
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " syncing mdlog for shard_id=" << shard_id << dendl;
yield call(new RGWCloneMetaLogCoroutine(sync_env, shard_id, mdlog_marker, &mdlog_marker));
}
+ if (retcode < 0) {
+ ldout(sync_env->cct, 10) << *this << ": failed to fetch more log entries, retcode=" << retcode << dendl;
+ return retcode;
+ }
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
- if (mdlog_marker > sync_marker.marker) {
- yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &sync_marker.marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
+ if (mdlog_marker > max_marker) {
+ marker = max_marker;
+ yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
marker_tracker->start(log_iter->id);
raw_key = log_iter->section + ":" + log_iter->name;
- yield spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
- if (retcode < 0) {
- return retcode;
+ yield {
+ RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
+ if (retcode < 0) {
+ return retcode;
+ }
+ assert(stack);
+ stack->get();
+
+ stack_to_pos[stack] = log_iter->id;
+ pos_to_prev[log_iter->id] = marker;
+ marker = log_iter->id;
+ }
}
- }
- }
- ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
- if (mdlog_marker == sync_marker.marker) {
+ }
+ collect_children();
+ ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " mdlog_marker=" << mdlog_marker << " max_marker=" << max_marker << " sync_marker.marker=" << sync_marker.marker << dendl;
+ if (mdlog_marker == max_marker && can_adjust_marker) {
#define INCREMENTAL_INTERVAL 20
yield wait(utime_t(INCREMENTAL_INTERVAL, 0));
}
- } while (true);
+ } while (can_adjust_marker);
+
+ while (num_spawned() > 1) {
+ yield wait_for_child();
+ collect_children();
+ }
yield lease_cr->go_down();
if (lost_lock) {
return -EBUSY;
}
+
+ if (!can_adjust_marker) {
+ return -EAGAIN;
+ }
}
/* TODO */
return 0;