#define DATA_SYNC_UPDATE_MARKER_WINDOW 1
-class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
map<string, string> key_to_marker;
map<string, string> marker_to_key;
- set<string> need_retry_set;
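+ /* called when the sync of the entry at 'marker' completes: drop both
+  * index entries and clear any pending retry flag for its key */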
void handle_finish(const string& marker) {
  map<string, string>::iterator iter = marker_to_key.find(marker);
  if (iter == marker_to_key.end()) {
    return;
  }
key_to_marker.erase(iter->second);
+ reset_need_retry(iter->second);
marker_to_key.erase(iter);
- need_retry_set.erase(marker);
}
public:
/*
 * create an index from key -> marker, and from marker -> key. This is
 * useful so that we can ensure there is only one inflight entry for any
 * given key. This is needed when doing incremental sync of data, where
 * we don't want to run multiple concurrent sync operations for the same
 * bucket shard.
 */
bool index_key_to_marker(const string& key, const string& marker) {
if (key_to_marker.find(key) != key_to_marker.end()) {
- need_retry_set.insert(key);
+ set_need_retry(key);
return false;
}
key_to_marker[key] = marker;
marker_to_key[marker] = key;
return true;
}
-
- /*
- * a key needs retry if it was processing when another marker that points
- * to the same bucket shards arrives. Instead of processing it, we mark
- * it as need_retry so that when we finish processing the original, we
- * retry the processing on the same bucket shard, in case there are more
- * entries to process. This closes a race that can happen.
- */
- bool need_retry(const string& key) {
- return (need_retry_set.find(key) != need_retry_set.end());
- }
-
- void reset_need_retry(const string& key) {
- need_retry_set.erase(key);
- }
};
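+
+ /*
+  * Note: the marker-track base template now takes two parameters: T, the
+  * marker type, and K, the key type used for the need-retry bookkeeping
+  * that previously lived in each subclass. A hypothetical sketch of the
+  * interface this code relies on (not the actual declaration):
+  *
+  *   template <class T, class K>
+  *   class RGWSyncShardMarkerTrack {
+  *     set<K> need_retry_set;
+  *   public:
+  *     bool start(const T& pos, int index_pos, const utime_t& timestamp);
+  *     RGWCoroutine *finish(const T& pos);
+  *     void set_need_retry(const K& key) { need_retry_set.insert(key); }
+  *     bool need_retry(const K& key) { return need_retry_set.count(key) > 0; }
+  *     void reset_need_retry(const K& key) { need_retry_set.erase(key); }
+  *   protected:
+  *     virtual void handle_finish(const T& marker) {}
+  *   };
+  */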
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
sync_marker(_marker),
marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
total_entries(0), reset_backoff(NULL) {
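+ /* description/status strings are presumably surfaced through the coroutine
+  * introspection machinery, so an operator can see what each sync shard is
+  * currently doing */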
+ set_description() << "data sync shard source_zone=" << source_zone << " shard_id=" << shard_id;
}
~RGWDataSyncShardCR() {
#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
-class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key> {
+class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
string marker_oid;
rgw_bucket_shard_full_sync_marker sync_marker;
-
public:
RGWBucketFullSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
const string& _marker_oid,
}
};
-class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string> {
+class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
string marker_oid;
rgw_bucket_shard_inc_sync_marker sync_marker;
+ map<rgw_obj_key, pair<RGWModifyOp, string> > key_to_marker;
+ map<string, rgw_obj_key> marker_to_key;
+
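+ /* same inflight-entry bookkeeping as RGWDataSyncShardMarkerTrack::
+  * handle_finish above, keyed by rgw_obj_key instead of string */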
+ void handle_finish(const string& marker) {
+ map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
+ if (iter == marker_to_key.end()) {
+ return;
+ }
+ key_to_marker.erase(iter->second);
+ reset_need_retry(iter->second);
+ marker_to_key.erase(iter);
+ }
public:
RGWBucketIncSyncShardMarkerTrack(RGWRados *_store, RGWAsyncRadosProcessor *_async_rados,
return new RGWSimpleRadosWriteAttrsCR(async_rados, store, store->get_zone_params().log_pool,
marker_oid, attrs);
}
+
+ /*
+  * create an index from key -> <op, marker>, and from marker -> key.
+  * This is useful so that we can ensure there is only one inflight
+  * entry for any given key. This is needed when doing incremental
+  * sync of data, where we don't want to run multiple concurrent sync
+  * operations for the same bucket shard. We also need to make sure
+  * that we don't run concurrent operations on the same key with
+  * different ops.
+  */
+ bool index_key_to_marker(const rgw_obj_key& key, RGWModifyOp op, const string& marker) {
+ if (key_to_marker.find(key) != key_to_marker.end()) {
+ set_need_retry(key);
+ return false;
+ }
+ key_to_marker[key] = make_pair<>(op, marker);
+ marker_to_key[marker] = key;
+ return true;
+ }
+
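+ /* an operation on a key can proceed only if no entry is inflight for
+  * that key, or if the inflight entry carries the same op type */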
+ bool can_do_op(const rgw_obj_key& key, RGWModifyOp op) {
+ auto i = key_to_marker.find(key);
+ if (i == key_to_marker.end()) {
+ return true;
+ }
+
+ return (i->second.first == op);
+ }
};
-template <class T>
+template <class T, class K>
class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
RGWRados *store;
RGWAsyncRadosProcessor *async_rados;
RGWPendingState op_state;
T entry_marker;
- RGWSyncShardMarkerTrack<T> *marker_tracker;
+ RGWSyncShardMarkerTrack<T, K> *marker_tracker;
int sync_status;
const rgw_obj_key& _key, uint64_t _versioned_epoch,
utime_t& _timestamp,
RGWModifyOp _op, RGWPendingState _op_state,
- const T& _entry_marker, RGWSyncShardMarkerTrack<T> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
+ const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker) : RGWCoroutine(_store->ctx()), store(_store),
async_rados(_async_rados),
source_zone(_source_zone),
bucket_info(_bucket_info), shard_id(_shard_id),
marker_tracker(_marker_tracker),
sync_status(0) {
set_description() << "bucket sync single entry (source_zone=" << source_zone << ") b=" << bucket_info->bucket << ":" << shard_id <<"/" << key << "[" << versioned_epoch << "] log_entry=" << entry_marker;
+ set_status("init");
}
int operate() {
if (op_state != CLS_RGW_STATE_COMPLETE) {
goto done;
}
- yield {
- if (op == CLS_RGW_OP_ADD ||
- op == CLS_RGW_OP_LINK_OLH) {
- if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
- set_status("skipping entry");
- ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
- return set_cr_done();
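+ /* retry loop: if another log entry for the same key arrives while this
+  * sync is inflight, the tracker flags the key as need_retry; we then
+  * re-sync the object so that the newer state is not missed */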
+ do {
+ yield {
+ marker_tracker->reset_need_retry(key);
+ if (op == CLS_RGW_OP_ADD ||
+ op == CLS_RGW_OP_LINK_OLH) {
+ if (op == CLS_RGW_OP_ADD && !key.instance.empty() && key.instance != "null") {
+ set_status("skipping entry");
+ ldout(store->ctx(), 10) << "bucket skipping sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]: versioned object will be synced on link_olh" << dendl;
+ goto done;
+ }
+ set_status("syncing obj");
+ ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
+ call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
+ key, versioned_epoch,
+ true));
+ } else if (op == CLS_RGW_OP_DEL) {
+ call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, &timestamp));
}
- set_status("syncing obj");
- ldout(store->ctx(), 5) << "bucket sync: sync obj: " << source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch << "]" << dendl;
- call(new RGWFetchRemoteObjCR(async_rados, store, source_zone, *bucket_info,
- key, versioned_epoch,
- true));
- } else if (op == CLS_RGW_OP_DEL) {
- call(new RGWRemoveObjCR(async_rados, store, source_zone, *bucket_info, key, versioned_epoch, &timestamp));
}
}
+ } while (marker_tracker->need_retry(key));
if (retcode < 0 && retcode != -ENOENT) {
- set_status("failed to sync obj");
+ set_status() << "failed to sync obj; retcode=" << retcode;
rgw_bucket& bucket = bucket_info->bucket;
ldout(store->ctx(), 0) << "ERROR: failed to sync object: " << bucket.name << ":" << bucket.bucket_id << ":" << shard_id << "/" << key << dendl;
sync_status = retcode;
}
done:
/* update marker */
+ set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
yield call(marker_tracker->finish(entry_marker));
if (sync_status == 0) {
sync_status = retcode;
} else {
RGWModifyOp op = (entry.key.instance.empty() || entry.key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
- spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+ spawn(new RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
entry.key, entry.versioned_epoch, entry.mtime, op, CLS_RGW_STATE_COMPLETE, entry.key, marker_tracker), false);
}
}
list<rgw_bi_log_entry> list_result;
list<rgw_bi_log_entry>::iterator entries_iter;
rgw_bucket_shard_inc_sync_marker inc_marker;
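+ /* per-entry state is kept on the coroutine object rather than in locals,
+  * since locals would not survive across a yield */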
+ rgw_obj_key key;
+ rgw_bi_log_entry *entry;
RGWBucketIncSyncShardMarkerTrack *marker_tracker;
int spawn_window;
+ bool updated_status;
+
public:
RGWBucketShardIncrementalSyncCR(RGWHTTPManager *_mgr, RGWAsyncRadosProcessor *_async_rados,
bucket_name(_bucket_name),
bucket_id(_bucket_id), shard_id(_shard_id),
bucket_info(_bucket_info),
- inc_marker(_inc_marker), marker_tracker(NULL),
- spawn_window(BUCKET_SYNC_SPAWN_WINDOW) {}
+ inc_marker(_inc_marker), entry(NULL), marker_tracker(NULL),
+ spawn_window(BUCKET_SYNC_SPAWN_WINDOW), updated_status(false) {
+ set_description() << "bucket shard incremental sync bucket=" << _bucket_name << ":" << _bucket_id << ":" << _shard_id;
+ set_status("init");
+ }
~RGWBucketShardIncrementalSyncCR() {
delete marker_tracker;
inc_marker);
do {
ldout(store->ctx(), 20) << __func__ << "(): listing bilog for incremental sync" << dendl;
+ set_status() << "listing bilog; position=" << inc_marker.position;
yield call(new RGWListBucketIndexLogCR(store, http_manager, async_rados, conn, bucket_name, bucket_id, shard_id,
inc_marker.position, &list_result));
if (retcode < 0 && retcode != -ENOENT) {
}
entries_iter = list_result.begin();
for (; entries_iter != list_result.end(); ++entries_iter) {
+ key = rgw_obj_key(entries_iter->object, entries_iter->instance);
+ entry = &(*entries_iter);
+ set_status() << "got entry.id=" << entry->id << " key=" << key << " op=" << (int)entry->op;
+ if (entry->op == CLS_RGW_OP_CANCEL) {
+ set_status() << "canceled operation, skipping";
+ ldout(store->ctx(), 20) << "[inc sync] skipping object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << ": canceled operation" << dendl;
+ continue;
+ }
+ ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
+ inc_marker.position = entry->id;
+ updated_status = false;
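+ /* a conflicting op type is already inflight for this key; wait for child
+  * coroutines to complete and re-check before dispatching this op */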
+ while (!marker_tracker->can_do_op(key, entry->op)) {
+ if (!updated_status) {
+ set_status() << "can't do op, conflicting inflight operation";
+ updated_status = true;
+ }
+ ldout(store->ctx(), 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
+ yield wait_for_child();
+ }
+ if (!marker_tracker->index_key_to_marker(key, entry->op, entry->id)) {
+ set_status() << "can't do op, sync already in progress for object";
+ ldout(store->ctx(), 20) << __func__ << ": skipping sync of entry: " << entry->id << ":" << key << " sync already in progress for object" << dendl;
+ continue;
+ }
yield {
- rgw_obj_key key(entries_iter->object, entries_iter->instance);
- ldout(store->ctx(), 20) << "[inc sync] syncing object: " << bucket_name << ":" << bucket_id << ":" << shard_id << "/" << key << dendl;
- rgw_bi_log_entry& entry = *entries_iter;
- inc_marker.position = entry.id;
- if (!marker_tracker->start(entry.id, 0, entries_iter->timestamp)) {
- ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry.id << ". Duplicate entry?" << dendl;
+ set_status() << "start object sync";
+ if (!marker_tracker->start(entry->id, 0, entries_iter->timestamp)) {
+ ldout(store->ctx(), 0) << "ERROR: cannot start syncing " << entry->id << ". Duplicate entry?" << dendl;
} else {
uint64_t versioned_epoch = 0;
- if (entry.ver.pool < 0) {
- versioned_epoch = entry.ver.epoch;
+ if (entry->ver.pool < 0) {
+ versioned_epoch = entry->ver.epoch;
}
- spawn(new RGWBucketSyncSingleEntryCR<string>(store, async_rados, source_zone, bucket_info, shard_id,
- key, versioned_epoch, entry.timestamp, entry.op, entry.state, entry.id, marker_tracker), false);
+ spawn(new RGWBucketSyncSingleEntryCR<string, rgw_obj_key>(store, async_rados, source_zone, bucket_info, shard_id,
+ key, versioned_epoch, entry->timestamp, entry->op, entry->state, entry->id, marker_tracker), false);
}
}
while ((int)num_spawned() > spawn_window) {
+ set_status() << "num_spawned() > spawn_window";
yield wait_for_child();
while (collect(&ret)) {
if (ret < 0) {