string instance;
string ns;
+ string cur_id;
+
public:
entries_iter = list_result.begin();
for (; entries_iter != list_result.end(); ++entries_iter) {
entry = &(*entries_iter);
- inc_marker.position = entry->id;
+ {
+ ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
+ if (p < 0) {
+ cur_id = entry->id;
+ } else {
+ cur_id = entry->id.substr(p + 1);
+ }
+ }
+ inc_marker.position = cur_id;
if (!rgw_obj::parse_raw_oid(entries_iter->object, &name, &instance, &ns)) {
set_status() << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry";
ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entries_iter->object << " returned false, skipping entry" << dendl;
+ marker_tracker->try_update_high_marker(cur_id, 0, entries_iter->timestamp);
continue;
}
- ldout(sync_env->cct, 20) << "parsed entry: iter->object=" << entries_iter->object << " iter->instance=" << entries_iter->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
+ ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << name << " instance=" << instance << " ns=" << ns << dendl;
if (!ns.empty()) {
- set_status() << "skipping entry in namespace: " << entries_iter->object;
- ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entries_iter->object << dendl;
+ set_status() << "skipping entry in namespace: " << entry->object;
+ ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
+ marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
- key = rgw_obj_key(name, entries_iter->instance);
- set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
+ key = rgw_obj_key(name, entry->instance);
+ set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
if (entry->op == CLS_RGW_OP_CANCEL) {
set_status() << "canceled operation, skipping";
ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
+ marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
if (entry->state != CLS_RGW_STATE_COMPLETE) {
set_status() << "non-complete operation, skipping";
ldout(sync_env->cct, 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": non-complete operation" << dendl;
+ marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
ldout(sync_env->cct, 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
yield wait_for_child();
}
- if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) {
+ if (!marker_tracker->index_key_to_marker(key, entry->op, cur_id)) {
set_status() << "can't do op, sync already in progress for object";
- ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
- marker_tracker->try_update_high_marker(entry->id, 0, entries_iter->timestamp);
+ ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
+ marker_tracker->try_update_high_marker(cur_id, 0, entry->timestamp);
continue;
}
// yield {
set_status() << "start object sync";
- if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
- ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
+ if (!marker_tracker->start(cur_id, 0, entry->timestamp)) {
+ ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
} else {
uint64_t versioned_epoch = 0;
bucket_entry_owner owner(entry->owner, entry->owner_display_name);
ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(sync_env, bucket_info, shard_id,
key, entry->is_versioned(), versioned_epoch, entry->timestamp, owner, entry->op,
- entry->state, entry->id, marker_tracker), false);
+ entry->state, cur_id, marker_tracker), false);
}
// }
while ((int)num_spawned() > spawn_window) {
}
} while (!list_result.empty());
+ yield {
+ call(marker_tracker->flush());
+ }
+ if (retcode < 0) {
+ ldout(sync_env->cct, 0) << "ERROR: marker_tracker->flush() returned retcode=" << retcode << dendl;
+ lease_cr->go_down();
+ drain_all();
+ return set_cr_error(retcode);
+ }
+
lease_cr->go_down();
/* wait for all operations to complete */
drain_all();
+
return set_cr_done();
}
return 0;