for (; iter != entries.end(); ++iter) {
ldout(store->ctx(), 20) << __func__ << ": full sync: " << iter->first << dendl;
total_entries++;
- marker_tracker->start(iter->first, total_entries, utime_t());
+ if (!marker_tracker->start(iter->first, total_entries, utime_t())) {
+ ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+ } else {
// fetch remote and write locally
- yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
- if (retcode < 0) {
- return set_cr_error(retcode);
+ yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, iter->first, iter->first, marker_tracker), false);
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
}
sync_marker.marker = iter->first;
}
ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
continue;
}
- marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp);
- yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
- if (retcode < 0) {
- return set_cr_error(retcode);
+ if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
+ ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
+ } else {
+ yield spawn(new RGWDataSyncSingleEntryCR(store, http_manager, async_rados, conn, source_zone, log_iter->entry.key, log_iter->log_id, marker_tracker), false);
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
}
}
}
yield {
bucket_list_entry& entry = *entries_iter;
total_entries++;
- marker_tracker->start(entry.key, total_entries, utime_t());
list_marker = entry.key;
+ if (!marker_tracker->start(entry.key, total_entries, utime_t())) {
+ ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.key << ". Duplicate entry?" << dendl;
+ } else {
+ RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
- RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
-
- spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
- entry.key, entry.versioned_epoch, entry.mtime, op, entry.key, marker_tracker), false);
+ spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+ entry.key, entry.versioned_epoch, entry.mtime, op, entry.key, marker_tracker), false);
+ }
}
while ((int)num_spawned() > spawn_window) {
yield wait_for_child();
rgw_obj_key key(entries_iter->object, entries_iter->instance);
ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
rgw_bi_log_entry& entry = *entries_iter;
- marker_tracker->start(entry.id, 0, entries_iter->timestamp);
inc_marker.position = entry.id;
- uint64_t versioned_epoch = 0;
- if (entry.ver.pool < 0) {
- versioned_epoch = entry.ver.epoch;
+ if (!marker_tracker->start(entry.id, 0, entries_iter->timestamp)) {
+ ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.id << ". Duplicate entry?" << dendl;
+ } else {
+ uint64_t versioned_epoch = 0;
+ if (entry.ver.pool < 0) {
+ versioned_epoch = entry.ver.epoch;
+ }
+ spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
+ key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false);
}
- spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
- key, versioned_epoch, entry.timestamp, entry.op, entry.id, marker_tracker), false);
}
while ((int)num_spawned() > spawn_window) {
yield wait_for_child();
for (; iter != entries.end(); ++iter) {
ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
total_entries++;
- marker_tracker->start(iter->first, total_entries, utime_t());
-
+ if (!marker_tracker->start(iter->first, total_entries, utime_t())) {
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+ } else {
// fetch remote and write locally
- yield {
- RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
- if (retcode < 0) {
- return retcode;
- }
- assert(stack);
- stack->get();
+ yield {
+ RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker), false);
+ if (retcode < 0) {
+ return retcode;
+ }
+ assert(stack);
+ stack->get();
- stack_to_pos[stack] = iter->first;
- pos_to_prev[iter->first] = marker;
+ stack_to_pos[stack] = iter->first;
+ pos_to_prev[iter->first] = marker;
+ }
}
marker = iter->first;
}
yield call(new RGWReadMDLogEntriesCR(sync_env, shard_id, &max_marker, INCREMENTAL_MAX_ENTRIES, &log_entries, &truncated));
for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->id << ":" << log_iter->section << ":" << log_iter->name << ":" << log_iter->timestamp << dendl;
- marker_tracker->start(log_iter->id, 0, log_iter->timestamp);
- raw_key = log_iter->section + ":" + log_iter->name;
- yield {
- RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
- assert(stack);
- stack->get();
-
- stack_to_pos[stack] = log_iter->id;
- pos_to_prev[log_iter->id] = marker;
- marker = log_iter->id;
+ if (!marker_tracker->start(log_iter->id, 0, log_iter->timestamp)) {
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->id << ". Duplicate entry?" << dendl;
+ } else {
+ raw_key = log_iter->section + ":" + log_iter->name;
+ yield {
+ RGWCoroutinesStack *stack = spawn(new RGWMetaSyncSingleEntryCR(sync_env, raw_key, log_iter->id, marker_tracker), false);
+ assert(stack);
+ stack->get();
+
+ stack_to_pos[stack] = log_iter->id;
+ pos_to_prev[log_iter->id] = marker;
+ }
}
+ marker = log_iter->id;
}
}
collect_children();