--- /dev/null
+tasks:
+- workunit:
+ clients:
+ client.0:
+ - rgw/run-versioning.sh
--- /dev/null
+#!/usr/bin/env python3
+
+import errno
+import subprocess
+import logging as log
+import boto3
+import botocore.exceptions
+
+log.basicConfig(format = '%(message)s', level=log.DEBUG)
+log.getLogger('botocore').setLevel(log.CRITICAL)
+log.getLogger('boto3').setLevel(log.CRITICAL)
+log.getLogger('urllib3').setLevel(log.CRITICAL)
+
+def exec_cmd(cmd, wait = True, **kwargs):
+    """Run `cmd` through the shell, logging the invocation.
+
+    With wait=True (the default) the call blocks and returns captured
+    stdout, or the tuple (stdout, returncode) when check_retcode=False.
+    With wait=False the process is left running and ('', 0) is returned
+    so callers can unpack the result the same way in both modes.
+    """
+    check_retcode = kwargs.pop('check_retcode', True)
+    kwargs['shell'] = True
+    kwargs['stdout'] = subprocess.PIPE
+    proc = subprocess.Popen(cmd, **kwargs)
+    log.info(proc.args)
+    if wait:
+        out, _ = proc.communicate()
+        if check_retcode:
+            assert(proc.returncode == 0)
+            return out
+        return (out, proc.returncode)
+    # non-blocking mode: keep the return shape consistent with the
+    # check_retcode=False branch (the previous bare '' broke callers
+    # that unpack `out, ret = exec_cmd(...)`)
+    return ('', 0)
+
+def create_user(uid, display_name, access_key, secret_key):
+    """Create an RGW user via radosgw-admin, tolerating an existing user.
+
+    The command is allowed to fail only with EEXIST (user already
+    created by a previous run); any other non-zero return aborts.
+    """
+    _, ret = exec_cmd(f'radosgw-admin user create --uid {uid} --display-name "{display_name}" --access-key {access_key} --secret {secret_key}', check_retcode=False)
+    # compare the return code against EEXIST explicitly: the previous
+    # expression `ret == 0 or errno.EEXIST` was always true because
+    # errno.EEXIST is a non-zero constant, so the assert never fired
+    assert ret == 0 or ret == errno.EEXIST
+
+def boto_connect(access_key, secret_key, config=None):
+    """Return a boto3 s3 resource connected to the local RGW endpoint.
+
+    Tries plain http on port 80, then the non-privileged port 8000, and
+    finally https on 443; the first endpoint that can list buckets wins.
+    `config` is passed through to boto3 (e.g. a botocore Config object).
+    """
+    def try_connect(portnum, ssl, proto):
+        # build the endpoint and verify it by listing a single bucket;
+        # verify=False because test endpoints use self-signed certs
+        endpoint = proto + '://localhost:' + portnum
+        conn = boto3.resource('s3',
+                              aws_access_key_id=access_key,
+                              aws_secret_access_key=secret_key,
+                              use_ssl=ssl,
+                              endpoint_url=endpoint,
+                              verify=False,
+                              config=config,
+                              )
+        try:
+            list(conn.buckets.limit(1)) # just verify we can list buckets
+        except botocore.exceptions.ConnectionError as e:
+            print(e)
+            raise
+        print('connected to', endpoint)
+        return conn
+    try:
+        return try_connect('80', False, 'http')
+    except botocore.exceptions.ConnectionError:
+        try: # retry on non-privileged http port
+            return try_connect('8000', False, 'http')
+        except botocore.exceptions.ConnectionError:
+            # retry with ssl
+            return try_connect('443', True, 'https')
--- /dev/null
+#!/usr/bin/env bash
+set -ex
+
+# Run the RGW bucket-versioning test against a running cluster.
+# Assumes a working ceph environment (radosgw-admin in PATH) and an
+# rgw on localhost:80 (or localhost:443 for ssl).
+
+mydir=`dirname $0`
+
+# build a throwaway virtualenv next to this script to get boto3
+python3 -m venv $mydir
+source $mydir/bin/activate
+pip install pip --upgrade
+pip install boto3
+
+## run test
+$mydir/bin/python3 $mydir/test_rgw_versioning.py
+
+deactivate
+echo OK.
+
+
#!/usr/bin/python3
import errno
-import logging as log
import time
-import subprocess
+import logging as log
import json
-import boto3
-import botocore.exceptions
import os
+from common import exec_cmd, boto_connect, create_user
"""
Rgw manual and dynamic resharding testing against a running instance
#
#
-log.basicConfig(format = '%(message)s', level=log.DEBUG)
-log.getLogger('botocore').setLevel(log.CRITICAL)
-log.getLogger('boto3').setLevel(log.CRITICAL)
-log.getLogger('urllib3').setLevel(log.CRITICAL)
-
""" Constants """
USER = 'tester'
DISPLAY_NAME = 'Testing'
VER_BUCKET_NAME = 'myver'
INDEX_POOL = 'default.rgw.buckets.index'
-def exec_cmd(cmd, wait = True, **kwargs):
- check_retcode = kwargs.pop('check_retcode', True)
- kwargs['shell'] = True
- kwargs['stdout'] = subprocess.PIPE
- proc = subprocess.Popen(cmd, **kwargs)
- log.info(proc.args)
- if wait:
- out, _ = proc.communicate()
- if check_retcode:
- assert(proc.returncode == 0)
- return out
- return (out, proc.returncode)
- return ('', 0)
-
class BucketStats:
def __init__(self, bucket_name, bucket_id, num_objs=0, size_kb=0, num_shards=0):
self.bucket_name = bucket_name
"""
execute manual and dynamic resharding commands
"""
- # create user
- _, ret = exec_cmd('radosgw-admin user create --uid {} --display-name {} --access-key {} --secret {}'.format(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY), check_retcode=False)
- assert(ret == 0 or errno.EEXIST)
-
- def boto_connect(portnum, ssl, proto):
- endpoint = proto + '://localhost:' + portnum
- conn = boto3.resource('s3',
- aws_access_key_id=ACCESS_KEY,
- aws_secret_access_key=SECRET_KEY,
- use_ssl=ssl,
- endpoint_url=endpoint,
- verify=False,
- config=None,
- )
- try:
- list(conn.buckets.limit(1)) # just verify we can list buckets
- except botocore.exceptions.ConnectionError as e:
- print(e)
- raise
- print('connected to', endpoint)
- return conn
-
- try:
- connection = boto_connect('80', False, 'http')
- except botocore.exceptions.ConnectionError:
- try: # retry on non-privileged http port
- connection = boto_connect('8000', False, 'http')
- except botocore.exceptions.ConnectionError:
- # retry with ssl
- connection = boto_connect('443', True, 'https')
+ create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)
+
+ connection = boto_connect(ACCESS_KEY, SECRET_KEY)
# create a bucket
bucket = connection.create_bucket(Bucket=BUCKET_NAME)
exec_cmd('''radosgw-admin --inject-delay-at=do_reshard --inject-delay-ms=5000 \
bucket reshard --bucket {} --num-shards {}'''
.format(VER_BUCKET_NAME, num_shards + 1), wait = False)
+ time.sleep(1)
ver_bucket.put_object(Key='put_during_reshard', Body=b"some_data")
log.debug('put object successful')
--- /dev/null
+#!/usr/bin/env python3
+
+import logging as log
+import json
+import uuid
+import botocore
+from common import exec_cmd, create_user, boto_connect
+from botocore.config import Config
+
+"""
+Tests behavior of bucket versioning.
+"""
+# The test cases in this file have been annotated for inventory.
+# To extract the inventory (in csv format) use the command:
+#
+# grep '^ *# TESTCASE' | sed 's/^ *# TESTCASE //'
+#
+#
+
+""" Constants """
+USER = 'versioning-tester'
+DISPLAY_NAME = 'Versioning Testing'
+ACCESS_KEY = 'LTA662PVVDTDWX6M2AB0'
+SECRET_KEY = 'pvtchqajgzqx5581t6qbddbkj0bgf3a69qdkjcea'
+BUCKET_NAME = 'versioning-bucket'
+DATA_POOL = 'default.rgw.buckets.data'
+
+def main():
+    """
+    execute versioning tests
+    """
+    create_user(USER, DISPLAY_NAME, ACCESS_KEY, SECRET_KEY)
+
+    # disable retries so injected server-side failures surface immediately
+    connection = boto_connect(ACCESS_KEY, SECRET_KEY, Config(retries = {
+        'total_max_attempts': 1,
+    }))
+
+    # pre-test cleanup
+    try:
+        bucket = connection.Bucket(BUCKET_NAME)
+        bucket.objects.all().delete()
+        bucket.object_versions.all().delete()
+        bucket.delete()
+    except botocore.exceptions.ClientError as e:
+        # a missing bucket just means there is nothing to clean up
+        if not e.response['Error']['Code'] == 'NoSuchBucket':
+            raise
+
+    bucket = connection.create_bucket(Bucket=BUCKET_NAME)
+    connection.BucketVersioning(BUCKET_NAME).enable()
+
+    # reproducer for bug from https://tracker.ceph.com/issues/59663
+    # TESTCASE 'verify that index entries and OLH objects are cleaned up after redundant deletes'
+    log.debug('TEST: verify that index entries and OLH objects are cleaned up after redundant deletes\n')
+    key = str(uuid.uuid4())
+    # deleting a non-existent key in a versioned bucket creates a delete marker
+    resp = bucket.Object(key).delete()
+    assert 'DeleteMarker' in resp, 'DeleteMarker key not present in response'
+    assert resp['DeleteMarker'], 'DeleteMarker value not True in response'
+    assert 'VersionId' in resp, 'VersionId key not present in response'
+    version_id = resp['VersionId']
+    # redundant delete, then remove the delete marker itself
+    bucket.Object(key).delete()
+    connection.ObjectVersion(bucket.name, key, version_id).delete()
+    # bucket index should now be empty
+    out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME}')
+    json_out = json.loads(out)
+    assert len(json_out) == 0, 'bucket index was not empty after all objects were deleted'
+
+    # grep exiting non-zero means no rados object matched the key,
+    # i.e. the OLH object was removed from the data pool
+    (_out, ret) = exec_cmd(f'rados -p {DATA_POOL} ls | grep {key}', check_retcode=False)
+    assert ret != 0, 'olh object was not cleaned up'
+
+    # TESTCASE 'verify that index entries and OLH objects are cleaned up after index linking error'
+    log.debug('TEST: verify that index entries and OLH objects are cleaned up after index linking error\n')
+    key = str(uuid.uuid4())
+    try:
+        # the option value (2) is used as the injected error code
+        exec_cmd('ceph config set client.rgw rgw_debug_inject_set_olh_err 2')
+        bucket.Object(key).delete()
+    finally:
+        exec_cmd('ceph config rm client.rgw rgw_debug_inject_set_olh_err')
+    out = exec_cmd(f'radosgw-admin bi list --bucket {BUCKET_NAME}')
+    json_out = json.loads(out)
+    assert len(json_out) == 0, 'bucket index was not empty after op failed'
+    (_out, ret) = exec_cmd(f'rados -p {DATA_POOL} ls | grep {key}', check_retcode=False)
+    assert ret != 0, 'olh object was not cleaned up'
+
+    # TESTCASE 'verify that original null object version is intact after failed olh upgrade'
+    log.debug('TEST: verify that original null object version is intact after failed olh upgrade\n')
+    connection.BucketVersioning(BUCKET_NAME).suspend()
+    key = str(uuid.uuid4())
+    # write a null version while versioning is suspended, then re-enable
+    put_resp = bucket.put_object(Key=key, Body=b"data")
+    connection.BucketVersioning(BUCKET_NAME).enable()
+    try:
+        exec_cmd('ceph config set client.rgw rgw_debug_inject_set_olh_err 2')
+        # expected to fail due to the above error injection
+        bucket.put_object(Key=key, Body=b"new data")
+    except Exception as e:
+        log.debug(e)
+    finally:
+        exec_cmd('ceph config rm client.rgw rgw_debug_inject_set_olh_err')
+    # the pre-existing null version must still be readable and unmodified
+    get_resp = bucket.Object(key).get()
+    assert put_resp.e_tag == get_resp['ETag'], 'get did not return null version with correct etag'
+
+    # Clean up
+    log.debug("Deleting bucket {}".format(BUCKET_NAME))
+    bucket.object_versions.all().delete()
+    bucket.delete()
+
+# guard the entry point so the module can be imported without
+# executing the tests (e.g. for reuse of its constants)
+if __name__ == '__main__':
+    main()
+    log.info("Completed bucket versioning tests")
- rgw
- rgw
min: 30
+- name: rgw_debug_inject_set_olh_err
+ type: uint
+ level: dev
+ desc: Whether to inject errors between rados olh modification initialization and
+ bucket index instance linking. The value determines the error code. This exists
+ for development and testing purposes to help simulate cases where bucket index
+ entries aren't cleaned up by the request thread after an error scenario.
+ default: 0
+ with_legacy: true
+ services:
+ - rgw
+- name: rgw_debug_inject_olh_cancel_modification_err
+ type: bool
+ level: dev
+ desc: Whether to inject an error to simulate a failure to cancel olh
+ modification. This exists for development and testing purposes.
+ default: false
+ with_legacy: true
+ services:
+ - rgw
- name: rgw_reshard_batch_size
type: uint
level: advanced
return rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, op, &outbl, null_yield);
}
+// Best-effort rollback of an in-flight OLH modification: remove this op's
+// pending xattr from the OLH head object and, when nothing else is pending
+// and the OLH info attr is unset, remove the now-orphaned OLH object itself.
+// All errors are logged and swallowed (void return) -- callers invoke this
+// on failure paths and must not fail harder because cleanup failed.
+void RGWRados::olh_cancel_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info,
+                                       RGWObjState& state, const rgw_obj& olh_obj,
+                                       const std::string& op_tag, optional_yield y)
+{
+  if (cct->_conf->rgw_debug_inject_olh_cancel_modification_err) {
+    // simulate the scenario where we fail to remove the pending xattr
+    return;
+  }
+
+  rgw_rados_ref ref;
+  int r = get_obj_head_ref(dpp, bucket_info, olh_obj, &ref);
+  if (r < 0) {
+    ldpp_dout(dpp, 0) << __func__ << " target_obj=" << olh_obj << " get_obj_head_ref() returned " << r << dendl;
+    return;
+  }
+  // pending attrs are named <prefix><op_tag>, one per in-flight op
+  string attr_name = RGW_ATTR_OLH_PENDING_PREFIX;
+  attr_name.append(op_tag);
+
+  // first remove the relevant pending prefix
+  ObjectWriteOperation op;
+  bucket_index_guard_olh_op(dpp, state, op);
+  op.rmxattr(attr_name.c_str());
+  r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, y);
+  if (r < 0) {
+    // ENOENT/ECANCELED are expected races (object already gone or the
+    // guard detected an olh tag change) -- don't log those
+    if (r != -ENOENT && r != -ECANCELED) {
+      ldpp_dout(dpp, 0) << __func__ << " target_obj=" << olh_obj << " rmxattr rgw_rados_operate() returned " << r << dendl;
+    }
+    return;
+  }
+
+  // only try removal when this state never saw an OLH info attr
+  if (auto iter = state.attrset.find(RGW_ATTR_OLH_INFO); iter == state.attrset.end()) {
+    // attempt to remove the OLH object if there are no pending ops,
+    // its olh info attr is empty, and its tag hasn't changed
+    ObjectWriteOperation rm_op;
+    bucket_index_guard_olh_op(dpp, state, rm_op);
+    rm_op.cmpxattr(RGW_ATTR_OLH_INFO, CEPH_OSD_CMPXATTR_OP_EQ, bufferlist());
+    cls_obj_check_prefix_exist(rm_op, RGW_ATTR_OLH_PENDING_PREFIX, true);
+    rm_op.remove();
+    r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &rm_op, y);
+  }
+  if (r < 0 && (r != -ENOENT && r != -ECANCELED)) {
+    ldpp_dout(dpp, 0) << __func__ << " target_obj=" << olh_obj << " olh rm rgw_rados_operate() returned " << r << dendl;
+  }
+}
+
int RGWRados::olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, string *op_tag)
{
ObjectWriteOperation op;
r = rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &rm_op, y);
if (r == -ECANCELED) {
return r; /* someone else made a modification in the meantime */
- } else {
- /*
- * only clear if was successful, otherwise we might clobber pending operations on this object
- */
- r = bucket_index_clear_olh(dpp, bucket_info, tag, obj);
- if (r < 0) {
- ldpp_dout(dpp, 0) << "ERROR: could not clear bucket index olh entries r=" << r << dendl;
- return r;
- }
+ }
+ /*
+ * only clear if was successful, otherwise we might clobber pending operations on this object
+ */
+ r = bucket_index_clear_olh(dpp, bucket_info, tag, obj);
+ if (r < 0) {
+ ldpp_dout(dpp, 0) << "ERROR: could not clear bucket index olh entries r=" << r << dendl;
+ return r;
}
return 0;
}
}
return ret;
}
- ret = bucket_index_link_olh(dpp, bucket_info, *state, target_obj,
- delete_marker, op_tag, meta, olh_epoch, unmod_since,
- high_precision_time, y, zones_trace, log_data_change);
+ if (cct->_conf->rgw_debug_inject_set_olh_err) {
+ // fail here to simulate the scenario of an unlinked object instance
+ ret = -cct->_conf->rgw_debug_inject_set_olh_err;
+ } else {
+ ret = bucket_index_link_olh(dpp, bucket_info, *state, target_obj,
+ delete_marker, op_tag, meta, olh_epoch, unmod_since,
+ high_precision_time, y, zones_trace, log_data_change);
+ }
if (ret < 0) {
ldpp_dout(dpp, 20) << "bucket_index_link_olh() target_obj=" << target_obj << " delete_marker=" << (int)delete_marker << " returned " << ret << dendl;
+ olh_cancel_modification(dpp, bucket_info, *state, olh_obj, op_tag, y);
if (ret == -ECANCELED) {
// the bucket index rejected the link_olh() due to olh tag mismatch;
// attempt to reconstruct olh head attributes based on the bucket index
ret = bucket_index_unlink_instance(dpp, bucket_info, target_obj, op_tag, olh_tag, olh_epoch, zones_trace);
if (ret < 0) {
+ olh_cancel_modification(dpp, bucket_info, *state, olh_obj, op_tag, y);
ldpp_dout(dpp, 20) << "bucket_index_unlink_instance() target_obj=" << target_obj << " returned " << ret << dendl;
if (ret == -ECANCELED) {
continue;
const DoutPrefixProvider *dpp);
void bucket_index_guard_olh_op(const DoutPrefixProvider *dpp, RGWObjState& olh_state, librados::ObjectOperation& op);
+ void olh_cancel_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, const std::string& op_tag, optional_yield y);
int olh_init_modification(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::string *op_tag);
int olh_init_modification_impl(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::string *op_tag);
int bucket_index_link_olh(const DoutPrefixProvider *dpp,