rgw_bucket_shard_inc_sync_marker sync_marker;
map<rgw_obj_key, string> key_to_marker;
- map<string, rgw_obj_key> marker_to_key;
+
+ struct operation {
+ rgw_obj_key key;
+ bool is_olh;
+ };
+ map<string, operation> marker_to_op;
+ std::set<std::string> pending_olh; // object names with pending olh operations
RGWSyncTraceNodeRef tn;
void handle_finish(const string& marker) override {
- map<string, rgw_obj_key>::iterator iter = marker_to_key.find(marker);
- if (iter == marker_to_key.end()) {
+ auto iter = marker_to_op.find(marker);
+ if (iter == marker_to_op.end()) {
return;
}
- key_to_marker.erase(iter->second);
- reset_need_retry(iter->second);
- marker_to_key.erase(iter);
+ auto& op = iter->second;
+ key_to_marker.erase(op.key);
+ reset_need_retry(op.key);
+ if (op.is_olh) {
+ pending_olh.erase(op.key.name);
+ }
+ marker_to_op.erase(iter);
}
public:
* Also, we should make sure that we don't run concurrent operations on the same key with
* different ops.
*/
- bool index_key_to_marker(const rgw_obj_key& key, const string& marker) {
- if (key_to_marker.find(key) != key_to_marker.end()) {
+ bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
+ auto result = key_to_marker.emplace(key, marker);
+ if (!result.second) { // exists
set_need_retry(key);
return false;
}
- key_to_marker[key] = marker;
- marker_to_key[marker] = key;
+ marker_to_op[marker] = operation{key, is_olh};
+ if (is_olh) {
+ // prevent other olh ops from starting on this object name
+ pending_olh.insert(key.name);
+ }
return true;
}
- bool can_do_op(const rgw_obj_key& key) {
+ bool can_do_op(const rgw_obj_key& key, bool is_olh) {
+ // serialize olh ops on the same object name
+ if (is_olh && pending_olh.count(key.name)) {
+ tn->log(20, SSTR("sync of " << key << " waiting for pending olh op"));
+ return false;
+ }
return (key_to_marker.find(key) == key_to_marker.end());
}
tn->log(20, SSTR("syncing object: "
<< bucket_shard_str{bs} << "/" << key));
updated_status = false;
- while (!marker_tracker.can_do_op(key)) {
+ while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
if (!updated_status) {
set_status() << "can't do op, conflicting inflight operation";
updated_status = true;
}
}
}
- if (!marker_tracker.index_key_to_marker(key, cur_id)) {
+ if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
set_status() << "can't do op, sync already in progress for object";
tn->log(20, SSTR("skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object"));
marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);