static void update_olh_log(rgw_bucket_olh_entry& olh_data_entry, OLHLogOp op, const string& op_tag,
cls_rgw_obj_key& key, bool delete_marker, uint64_t epoch)
{
- vector<rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log[olh_data_entry.epoch];
+ vector<rgw_bucket_olh_log_entry>& log = olh_data_entry.pending_log[epoch];
rgw_bucket_olh_log_entry log_entry;
log_entry.epoch = epoch;
log_entry.op = op;
int write(uint64_t epoch, bool current, rgw_bucket_dir_header& header) {
if (instance_entry.versioned_epoch > 0) {
- CLS_LOG(20, "%s: instance_entry.versioned_epoch=%d epoch=%d", __func__, (int)instance_entry.versioned_epoch, (int)epoch);
+ CLS_LOG(20, "%s: instance_entry.versioned_epoch=%lu epoch=%lu", __func__, instance_entry.versioned_epoch, epoch);
/* this instance has a previous list entry, remove that entry */
int ret = unlink_list_entry(header);
if (ret < 0) {
* This timestamp is then used later on to guard against OLH updates for add/remove instance ops that happened *before*
* the latest op that updated the OLH entry.
* @param candidate_epoch - this is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
+ * @param replace - if true, overwrite the stored epoch with candidate_epoch whenever it is not older than the current one;
*/
- bool start_modify (uint64_t candidate_epoch) {
+ bool start_modify (uint64_t candidate_epoch, bool replace = true) {
// only update the olh.epoch if it is newer than the current one.
if (candidate_epoch < olh_data_entry.epoch) {
return false; /* olh cannot be modified, old epoch */
}
- olh_data_entry.epoch = candidate_epoch;
+ if (replace) {
+ olh_data_entry.epoch = candidate_epoch;
+ }
return true;
}
return olh_data_entry.epoch;
}
+ // Unconditionally overwrite the OLH entry's epoch. Unlike start_modify(),
+ // which rejects candidate epochs older than the stored one, this performs
+ // no ordering check -- callers use it when they already know the correct
+ // epoch (e.g. promoting the next version's versioned_epoch to the OLH).
+ void set_epoch(uint64_t epoch) {
+ olh_data_entry.epoch = epoch;
+ }
+
rgw_bucket_olh_entry& get_entry() {
return olh_data_entry;
}
cls_rgw_obj_key& key,
bool demote_current,
bool instance_only,
- rgw_bucket_dir_header& header)
+ rgw_bucket_dir_header& header,
+ uint64_t& versioned_epoch)
{
if (!key.instance.empty()) {
return -EINVAL;
return ret;
}
- entry.versioned_epoch = 1; /* converted entries are always 1 */
+ entry.versioned_epoch = versioned_epoch =
+ duration_cast<std::chrono::nanoseconds>(entry.meta.mtime.time_since_epoch()).count();
entry.flags |= rgw_bucket_dir_entry::FLAG_VER;
if (demote_current) {
const uint64_t prev_epoch = olh.get_epoch();
// op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
- uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
- duration_cast<std::chrono::nanoseconds>(obj.mtime().time_since_epoch()).count();
+ uint64_t now_epoch = duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
+ uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch : now_epoch;
if (olh.start_modify(candidate_epoch)) {
// promote this version to current if it's a newer epoch, or if it matches the
// current epoch and sorts after the current instance
} else {
bool instance_only = (op.key.instance.empty() && op.delete_marker);
cls_rgw_obj_key key(op.key.name);
- ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header);
+ uint64_t versioned_epoch = candidate_epoch;
+ ret = convert_plain_entry_to_versioned(hctx, key, promote, instance_only, header, versioned_epoch);
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
return ret;
}
olh.set_tag(op.olh_tag);
if (op.key.instance.empty()) {
- obj.set_epoch(1);
+ obj.set_epoch(versioned_epoch);
}
}
/* update the olh log */
- olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker);
+ olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, op.key, op.delete_marker, now_epoch);
if (removing) {
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
+ olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch);
}
if (promote) {
}
ret = olh.write(header);
- }
- else {
+ } else {
ret = obj.write(candidate_epoch, false, header);
if (ret < 0) {
return ret;
// the epoch is already stale compared to the current - so no point in applying it;
if (removing) {
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
- ret = olh.write(header);
+ olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch);
+ } else {
+ olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch);
}
+ ret = olh.write(header);
}
if (ret < 0) {
return ret;
}
+ uint64_t now_epoch = duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
+ uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch : now_epoch;
+
if (!olh_found) {
bool instance_only = false;
cls_rgw_obj_key key(dest_key.name);
- ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, header);
+ uint64_t versioned_epoch = candidate_epoch - 1;
+ ret = convert_plain_entry_to_versioned(hctx, key, true, instance_only, header, versioned_epoch);
if (ret < 0) {
CLS_LOG(0, "ERROR: convert_plain_entry_to_versioned ret=%d", ret);
return ret;
olh.update(dest_key, false);
olh.set_tag(op.olh_tag);
- obj.set_epoch(1);
+ obj.set_epoch(versioned_epoch);
}
// op.olh_epoch is provided (> 0) in the case when a remote epoch is coming in as the result of multisite sync;
- uint64_t candidate_epoch = op.olh_epoch ? op.olh_epoch :
- duration_cast<std::chrono::nanoseconds>(real_clock::now().time_since_epoch()).count();
- if (olh.start_modify(candidate_epoch)) {
+ if (olh.start_modify(candidate_epoch, false)) {
rgw_bucket_olh_entry &olh_entry = olh.get_entry();
cls_rgw_obj_key &olh_key = olh_entry.key;
CLS_LOG(20, "%s: updating olh log: existing olh entry: %s[%s] (delete_marker=%d)", __func__,
if (found) {
BIVerObjEntry next(hctx, next_key);
- ret = next.write(olh.get_epoch(), true, header);
+ ret = next.init();
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: next.init() returned ret=%d", ret);
+ return ret;
+ }
+
+ uint64_t next_epoch = next.get_dir_entry().versioned_epoch;
+ ret = next.write(next_epoch, true, header);
if (ret < 0) {
CLS_LOG(0, "ERROR: next.write() returned ret=%d", ret);
return ret;
next_key.name.c_str(), next_key.instance.c_str(), (int) next.is_delete_marker());
olh.update(next_key, next.is_delete_marker());
- olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker());
+ olh.update_log(CLS_RGW_OLH_OP_LINK_OLH, op.op_tag, next_key, next.is_delete_marker(), now_epoch);
+ // use the next entry's versioned_epoch in the olh entry since it's the new head now
+ olh.set_epoch(next_epoch);
} else {
// next_key is empty, but we need to preserve its name in case this entry
// gets resharded, because this key is used for hash placement
next_key.name = dest_key.name;
olh.update(next_key, false);
- olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false);
+ if (olh.get_epoch() == 0) {
+ olh.set_epoch(candidate_epoch);
+ }
+ olh.update_log(CLS_RGW_OLH_OP_UNLINK_OLH, op.op_tag, next_key, false, now_epoch);
olh.set_exists(false);
olh.set_pending_removal(true);
}
}
if (!obj.is_delete_marker()) {
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false);
+ olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch);
} else {
+ olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch);
+
/* this is a delete marker, it's our responsibility to remove its
* instance entry */
ret = obj.unlink(header, op.key);
}
if (obj.is_delete_marker()) {
- return 0;
+ olh.update_log(CLS_RGW_OLH_OP_STALE, op.op_tag, op.key, false, now_epoch);
+
+ ret = olh.write(header);
+ if (ret < 0) {
+ CLS_LOG(0, "ERROR: failed to update olh ret=%d", ret);
+ }
+
+ return ret;
}
- olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, candidate_epoch);
+ olh.update_log(CLS_RGW_OLH_OP_REMOVE_INSTANCE, op.op_tag, op.key, false, now_epoch);
}
ret = olh.write(header);
string attr_name = RGW_ATTR_OLH_PENDING_PREFIX;
attr_name.append(*op_tag);
+ ldpp_dout(dpp, 20) << __func__ << " adding olh pending attr: " << attr_name << dendl;
op.setxattr(attr_name.c_str(), bl);
int ret = obj_operate(dpp, bucket_info, olh_obj, std::move(op), y);
uint64_t link_epoch = 0;
cls_rgw_obj_key key;
bool delete_marker = false;
- list<cls_rgw_obj_key> remove_instances;
+ set<cls_rgw_obj_key> remove_instances;
bool need_to_remove = false;
// decode current epoch and instance
<< " key=" << entry.key.name << "[" << entry.key.instance << "] "
<< (entry.delete_marker ? "(delete)" : "") << dendl;
- if (link_epoch == iter->first)
+ if (link_epoch == entry.epoch)
ldpp_dout(dpp, 1) << "apply_olh_log epoch collision detected for " << entry.key
<< "; incoming op: " << entry.op << "(" << entry.op_tag << ")" << dendl;
switch (entry.op) {
case CLS_RGW_OLH_OP_REMOVE_INSTANCE:
- remove_instances.push_back(entry.key);
+ remove_instances.insert(entry.key);
break;
case CLS_RGW_OLH_OP_LINK_OLH:
// only overwrite a link of the same epoch if its key sorts before
- if (link_epoch < iter->first || key.instance.empty() ||
+ // or there is a CLS_RGW_OLH_OP_UNLINK_OLH before this op in this batch
+ if (need_to_remove || link_epoch < entry.epoch || key.instance.empty() ||
key.instance > entry.key.instance) {
ldpp_dout(dpp, 20) << "apply_olh_log applying key=" << entry.key << " epoch=" << iter->first << " delete_marker=" << entry.delete_marker
<< " over current=" << key << " epoch=" << link_epoch << " delete_marker=" << delete_marker << dendl;
+
+ if (need_to_remove) {
+ // cancel the instance removal if it's linked again -- e.g. coming from a multisite remote zone
+ remove_instances.erase(entry.key);
+ }
need_to_link = true;
need_to_remove = false;
key = entry.key;
need_to_remove = true;
need_to_link = false;
break;
+ case CLS_RGW_OLH_OP_STALE:
+ break;
default:
ldpp_dout(dpp, 0) << "ERROR: apply_olh_log: invalid op: " << (int)entry.op << dendl;
return -EIO;
}
/* first remove object instances */
- for (list<cls_rgw_obj_key>::iterator liter = remove_instances.begin();
+ for (set<cls_rgw_obj_key>::iterator liter = remove_instances.begin();
liter != remove_instances.end(); ++liter) {
- cls_rgw_obj_key& key = *liter;
+ const cls_rgw_obj_key& key = *liter;
rgw_obj obj_instance(bucket, key);
int ret = delete_obj(dpp, obj_ctx, bucket_info, obj_instance, 0, y,
null_verid, RGW_BILOG_FLAG_VERSIONED_OP,
zonegroup_data_checkpoint(zonegroup_conns)
+@attr('bucket_sync_disable')
+def test_versioned_bucket_sync_disable_enable_object_delete():
+ # Verify that version deletes issued on the secondary zone while bucket
+ # sync is disabled reconcile across zones once sync is re-enabled.
+ # Exercises three phases: deleting a non-head version, deleting the head
+ # version, and finally deleting every version of the object. Each phase
+ # ends with a bucket checkpoint that requires the zones to converge.
+ zonegroup = realm.master_zonegroup()
+ zonegroup_conns = ZonegroupConns(zonegroup)
+
+ primary = zonegroup_conns.rw_zones[0]
+ secondary = zonegroup_conns.rw_zones[1]
+
+ # create a bucket
+ bucket = primary.create_bucket(gen_bucket_name())
+ log.debug('created bucket=%s', bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # enable versioning
+ primary.s3_client.put_bucket_versioning(
+ Bucket=bucket.name,
+ VersioningConfiguration={'Status': 'Enabled'}
+ )
+ zonegroup_meta_checkpoint(zonegroup)
+
+ obj = 'obj'
+
+ # upload an initial object
+ resp1 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+ # defensive .get(): versioning is already enabled, so a VersionId is
+ # expected in the response; fall back to 'null' just in case
+ version_id_1 = resp1.get('VersionId', 'null')
+ log.debug('created initial version id=%s', version_id_1)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # upload the second version
+ resp2 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+ version_id_2 = resp2['VersionId']
+ log.debug('created new version id=%s', version_id_2)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # upload the third version
+ resp3 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+ version_id_3 = resp3['VersionId']
+ log.debug('created new version id=%s', version_id_3)
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # test deleting the non-head from the secondary
+
+ log.debug(f"Disabling bucket sync for bucket:{bucket.name}")
+ disable_bucket_sync(realm.meta_master_zone(), bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # upload the fourth version - do this before the following object delete
+ # so it has a slightly smaller epoch
+ resp4 = primary.s3_client.put_object(Bucket=bucket.name, Key=obj, Body='')
+ version_id_4 = resp4['VersionId']
+ log.debug('created new version id=%s', version_id_4)
+
+ # Delete the second object version
+ cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_2]
+ secondary.zone.cluster.admin(cmd)
+
+ log.debug(f"Enabling bucket sync for bucket:{bucket.name}")
+ enable_bucket_sync(realm.meta_master_zone(), bucket.name)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # test deleting the head from the secondary
+
+ log.debug(f"Disabling bucket sync for bucket:{bucket.name}")
+ disable_bucket_sync(realm.meta_master_zone(), bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # Delete the fourth object version
+ cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_4]
+ secondary.zone.cluster.admin(cmd)
+
+ log.debug(f"Enabling bucket sync for bucket:{bucket.name}")
+ enable_bucket_sync(realm.meta_master_zone(), bucket.name)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+ # test deleting everything from the secondary
+
+ log.debug(f"Disabling bucket sync for bucket:{bucket.name}")
+ disable_bucket_sync(realm.meta_master_zone(), bucket.name)
+ zonegroup_meta_checkpoint(zonegroup)
+
+ # Delete all object versions
+ # NOTE(review): version_id_2 and version_id_4 were already removed in the
+ # earlier phases, so two of these removals target versions that no longer
+ # exist -- presumably 'object rm' tolerates a missing version; confirm the
+ # admin command does not fail the test on those.
+ cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_1]
+ secondary.zone.cluster.admin(cmd)
+ cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_2]
+ secondary.zone.cluster.admin(cmd)
+ cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_3]
+ secondary.zone.cluster.admin(cmd)
+ cmd = ['object', 'rm', '--bucket', bucket.name, '--object', obj, '--object-version', version_id_4]
+ secondary.zone.cluster.admin(cmd)
+
+ log.debug(f"Enabling bucket sync for bucket:{bucket.name}")
+ enable_bucket_sync(realm.meta_master_zone(), bucket.name)
+
+ zonegroup_bucket_checkpoint(zonegroup_conns, bucket.name)
+
+
def test_multipart_object_sync():
zonegroup = realm.master_zonegroup()
zonegroup_conns = ZonegroupConns(zonegroup)