From: Matt Benjamin <mbenjamin@redhat.com>
Date: Fri, 3 Feb 2023 21:46:05 +0000 (-0500)
Subject: rgwlc: dispatch S3 notifications on transition and mpu abort
X-Git-Tag: v19.0.0~159^2~8
X-Git-Url: http://git.apps.os.sepia.ceph.com/?a=commitdiff_plain;h=fc45a32734d72e2e7b569242581cb2adcd0738bf;p=ceph.git

rgwlc: dispatch S3 notifications on transition and mpu abort

Fixes: https://tracker.ceph.com/issues/58641
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
---

diff --git a/doc/radosgw/s3-notification-compatibility.rst b/doc/radosgw/s3-notification-compatibility.rst
index 1627ed0c4db0f..cced60924d09e 100644
--- a/doc/radosgw/s3-notification-compatibility.rst
+++ b/doc/radosgw/s3-notification-compatibility.rst
@@ -91,7 +91,7 @@ Event Types
 +--------------------------------------------------------+-----------------------------------------+
 | ``s3:ObjectLifecycle:Expiration:DeleteMarker``         | Ceph extension                          |
 +--------------------------------------------------------+-----------------------------------------+
-| ``s3:ObjectLifecycle:Expiration:AbortMultipartUpload`` | Defined, Ceph extension (not generated) |
+| ``s3:ObjectLifecycle:Expiration:AbortMultipartUpload`` | Ceph extension                          |
 +--------------------------------------------------------+-----------------------------------------+
 | ``s3:ObjectLifecycle:Transition:Current``              | Ceph extension                          |
 +--------------------------------------------------------+-----------------------------------------+
diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc
index 4887c9d146a6e..cf0b65110507b 100644
--- a/src/rgw/rgw_lc.cc
+++ b/src/rgw/rgw_lc.cc
@@ -828,15 +828,44 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
   params.ns = RGW_OBJ_NS_MULTIPART;
   params.access_list_filter = &mp_filter;
 
+  auto event_type = rgw::notify::ObjectExpirationAbortMPU;
+  std::string version_id;
+
   auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
+    int ret{0};
     auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
     auto& [rule, obj] = wt;
     if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
       rgw_obj_key key(obj.key);
       std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name);
-      int ret = mpu->abort(this, cct, null_yield);
+      std::unique_ptr<rgw::sal::Object> sal_obj
+        = target->get_object(key);
+      std::unique_ptr<rgw::sal::Notification> notify
+        = driver->get_notification(
+            this, sal_obj.get(), nullptr, event_type,
+            target, lc_id,
+            const_cast<std::string&>(target->get_tenant()),
+            lc_req_id, null_yield);
+
+      ret = notify->publish_reserve(this, nullptr);
+      if (ret != 0) {
+        ldpp_dout(wk->get_lc(), 0)
+          << "ERROR: reserving persistent notification for abort_multipart_upload, ret=" << ret
+          << ", thread:" << wq->thr_name()
+          << ", meta:" << obj.key
+          << dendl;
+      }
+
+      ret = mpu->abort(this, cct, null_yield);
       if (ret == 0) {
-        if (perfcounter) {
+
+        (void) notify->publish_commit(
+            this, sal_obj->get_obj_size(),
+            ceph::real_clock::now(),
+            sal_obj->get_attrs()[RGW_ATTR_ETAG].to_str(),
+            version_id);
+
+        if (perfcounter) {
           perfcounter->inc(l_rgw_lc_abort_mpu, 1);
         }
       } else {
@@ -1268,27 +1297,78 @@ public:
     /* If bucket is versioned, create delete_marker for current version */
     if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
-      ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
+      ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectTransitionCurrent);
       ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key
                             << ") current & not delete_marker"
                             << " versioned_epoch: " << oc.o.versioned_epoch
                             << "flags: " << oc.o.flags << dendl;
     } else {
-      ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
+      ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectTransitionNoncurrent);
       ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key
                             << ") not current "
                             << "versioned_epoch: " << oc.o.versioned_epoch
                             << "flags: " << oc.o.flags << dendl;
     }
     return ret;
   }
 
   int transition_obj_to_cloud(lc_op_ctx& oc) {
+    int ret{0};
     /* If CurrentVersion object, remove it & create delete marker */
     bool delete_object = (!oc.tier->retain_head_object() ||
                           (oc.o.is_current() && oc.bucket->versioned()));
 
-    int ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
-                                          oc.env.worker->get_cloud_targets(), oc.cct,
-                                          !delete_object, oc.dpp, null_yield);
+    /* notifications */
+    std::unique_ptr<rgw::sal::Bucket> bucket;
+    std::unique_ptr<rgw::sal::Object> obj;
+    auto& bucket_info = oc.bucket->get_info();
+    std::string version_id;
+
+    ret = oc.driver->get_bucket(nullptr, bucket_info, &bucket);
     if (ret < 0) {
       return ret;
     }
+    std::unique_ptr<rgw::sal::User> user;
+    if (! bucket->get_owner()) {
+      auto& bucket_info = bucket->get_info();
+      user = oc.driver->get_user(bucket_info.owner);
+      if (user) {
+        bucket->set_owner(user.get());
+      }
+    }
+
+    obj = bucket->get_object(oc.o.key);
+
+    auto event_type = (oc.bucket->versioned() &&
+                       oc.o.is_current() && !oc.o.is_delete_marker()) ?
+      rgw::notify::ObjectTransitionCurrent :
+      rgw::notify::ObjectTransitionNoncurrent;
+
+    std::unique_ptr<rgw::sal::Notification> notify
+      = oc.driver->get_notification(
+          oc.dpp, obj.get(), nullptr, event_type,
+          bucket.get(), lc_id,
+          const_cast<std::string&>(oc.bucket->get_tenant()),
+          lc_req_id, null_yield);
+
+    ret = notify->publish_reserve(oc.dpp, nullptr);
+    if (ret < 0) {
+      ldpp_dout(oc.dpp, 1)
+        << "ERROR: notify reservation failed, deferring transition of object k="
+        << oc.o.key
+        << dendl;
+      return ret;
+    }
+
+    ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
+                                      oc.env.worker->get_cloud_targets(),
+                                      oc.cct, !delete_object, oc.dpp,
+                                      null_yield);
+    if (ret < 0) {
+      return ret;
+    } else {
+      // send request to notification manager
+      (void) notify->publish_commit(oc.dpp, obj->get_obj_size(),
+                                    ceph::real_clock::now(),
+                                    obj->get_attrs()[RGW_ATTR_ETAG].to_str(),
+                                    version_id);
+    }
+
     if (delete_object) {
       ret = delete_tier_obj(oc);
       if (ret < 0) {
@@ -1661,6 +1741,15 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
     worker->workpool->drain();
   }
 
+  std::unique_ptr<rgw::sal::User> user;
+  if (! bucket->get_owner()) {
+    auto& bucket_info = bucket->get_info();
+    user = driver->get_user(bucket_info.owner);
+    if (user) {
+      bucket->set_owner(user.get());
+    }
+  }
+
   ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
   return ret;
 }
diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py
index 4fd9cca12d625..87d3ca8eeaa56 100644
--- a/src/test/rgw/bucket_notification/test_bn.py
+++ b/src/test/rgw/bucket_notification/test_bn.py
@@ -7,18 +7,20 @@ import subprocess
 import socket
 import time
 import os
+import io
 import string
 import boto
-from botocore.exceptions import ClientError
 from http import server as http_server
 from random import randint
 import hashlib
+# XXX this should be converted to use pytest
 from nose.plugins.attrib import attr
 import boto3
 import datetime
 from cloudevents.http import from_http
 from dateutil import parser
+# XXX this should be converted to use boto3
 from boto.s3.connection import S3Connection
 
 from . import(
@@ -553,8 +555,17 @@ def another_user(tenant=None):
 @attr('basic_test')
 def test_ps_s3_topic_on_master():
     """ test s3 topics set/get/delete on master """
+
+    access_key = str(time.time())
+    secret_key = str(time.time())
+    uid = 'superman' + str(time.time())
     tenant = 'kaboom'
-    conn = another_user(tenant)
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+    assert_equal(result, 0)
+    conn = S3Connection(aws_access_key_id=access_key,
+                        aws_secret_access_key=secret_key,
+                        is_secure=False, port=get_config_port(), host=get_config_host(),
+                        calling_format='boto.s3.connection.OrdinaryCallingFormat')
     zonegroup = 'default'
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -625,8 +636,17 @@ def test_ps_s3_topic_on_master():
 @attr('basic_test')
 def test_ps_s3_topic_admin_on_master():
     """ test s3 topics set/get/delete on master """
+
+    access_key = str(time.time())
+    secret_key = str(time.time())
+    uid = 'superman' + str(time.time())
     tenant = 'kaboom'
-    conn = another_user(tenant)
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+    assert_equal(result, 0)
+    conn = S3Connection(aws_access_key_id=access_key,
+                        aws_secret_access_key=secret_key,
+                        is_secure=False, port=get_config_port(), host=get_config_host(),
+                        calling_format='boto.s3.connection.OrdinaryCallingFormat')
     zonegroup = 'default'
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -1216,87 +1236,6 @@ def test_ps_s3_notification_errors_on_master():
     # delete the bucket
     conn.delete_bucket(bucket_name)
 
-@attr('basic_test')
-def test_ps_s3_notification_permissions():
-    """ test s3 notification set/get/delete permissions """
-    conn1 = connection()
-    conn2 = another_user()
-    zonegroup = 'default'
-    bucket_name = gen_bucket_name()
-    # create bucket
-    bucket = conn1.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    # create s3 topic
-    endpoint_address = 'amqp://127.0.0.1:7001'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-
-    # one user create a notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf1 = PSNotificationS3(conn1, bucket_name, topic_conf_list)
-    _, status = s3_notification_conf1.set_config()
-    assert_equal(status, 200)
-    # another user try to fetch it
-    s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
-    try:
-        _, _ = s3_notification_conf2.get_config()
-        assert False, "'AccessDenied' error is expected"
-    except ClientError as error:
-        assert_equal(error.response['Error']['Code'], 'AccessDenied')
-    # other user try to delete the notification
-    _, status = s3_notification_conf2.del_config()
-    assert_equal(status, 403)
-
-    # bucket policy is added by the 1st user
-    client = boto3.client('s3',
-                          endpoint_url='http://'+conn1.host+':'+str(conn1.port),
-                          aws_access_key_id=conn1.aws_access_key_id,
-                          aws_secret_access_key=conn1.aws_secret_access_key)
-    bucket_policy = json.dumps({
-        "Version": "2012-10-17",
-        "Statement": [
-            {
-                "Sid": "Statement",
-                "Effect": "Allow",
-                "Principal": "*",
-                "Action": ["s3:GetBucketNotification", "s3:PutBucketNotification"],
["s3:GetBucketNotification", "s3:PutBucketNotification"], - "Resource": f"arn:aws:s3:::{bucket_name}" - } - ] - }) - response = client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy) - assert_equal(int(response['ResponseMetadata']['HTTPStatusCode']/100), 2) - result = client.get_bucket_policy(Bucket=bucket_name) - print(result['Policy']) - - # 2nd user try to fetch it again - _, status = s3_notification_conf2.get_config() - assert_equal(status, 200) - - # 2nd user try to delete it again - result, status = s3_notification_conf2.del_config() - assert_equal(status, 200) - - # 2nd user try to add another notification - topic_conf_list = [{'Id': notification_name+"2", - 'TopicArn': topic_arn, - 'Events': [] - }] - s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list) - result, status = s3_notification_conf2.set_config() - assert_equal(status, 200) - - # cleanup - s3_notification_conf1.del_config() - s3_notification_conf2.del_config() - topic_conf.del_config() - # delete the bucket - conn1.delete_bucket(bucket_name) @attr('amqp_test') def test_ps_s3_notification_push_amqp_on_master(): @@ -2039,6 +1978,114 @@ def test_ps_s3_lifecycle_on_master(): conn.delete_bucket(bucket_name) http_server.close() +def start_and_abandon_multipart_upload(bucket, key_name, content): + try: + mp = bucket.initiate_multipart_upload(key_name) + part_data = io.StringIO(content) + mp.upload_part_from_file(part_data, 1) + # mp.complete_upload() + except Exception as e: + print('Error: ' + str(e)) + +@attr('http_test') +def test_ps_s3_lifecycle_abort_mpu_on_master(): + """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """ + hostname = get_ip() + conn = connection() + zonegroup = 'default' + + # create random port for the http server + host = get_ip() + port = random.randint(10000, 20000) + # start an http server in a separate thread + number_of_objects = 1 + http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects) + + # create bucket + bucket_name = gen_bucket_name() + bucket = conn.create_bucket(bucket_name) + topic_name = bucket_name + TOPIC_SUFFIX + + # create s3 topic + endpoint_address = 'http://'+host+':'+str(port) + endpoint_args = 'push-endpoint='+endpoint_address + opaque_data = 'http://1.2.3.4:8888' + topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data) + topic_arn = topic_conf.set_config() + # create s3 notification + notification_name = bucket_name + NOTIFICATION_SUFFIX + topic_conf_list = [{'Id': notification_name, + 'TopicArn': topic_arn, + 'Events': ['s3:ObjectLifecycle:Expiration:*'] + }] + s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list) + response, status = s3_notification_conf.set_config() + assert_equal(status/100, 2) + + # start and abandon a multpart upload + # create objects in the bucket + obj_prefix = 'ooo' + start_time = time.time() + content = 'bar' + + key_name = obj_prefix + str(1) + thr = threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,)) + thr.start() + thr.join() + + time_diff = time.time() - start_time + print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds') + + # create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect + client = boto3.client('s3', + endpoint_url='http://'+conn.host+':'+str(conn.port), + aws_access_key_id=conn.aws_access_key_id, + 
+                          aws_secret_access_key=conn.aws_secret_access_key)
+    response = client.put_bucket_lifecycle_configuration(
+        Bucket=bucket_name,
+        LifecycleConfiguration={'Rules': [
+            {
+                'ID': 'abort1',
+                'Filter': {'Prefix': obj_prefix},
+                'Status': 'Enabled',
+                'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
+            }
+        ]}
+    )
+
+    # start lifecycle processing
+    admin(['lc', 'process'])
+    print('wait for 20s (2 "days" at rgw_lc_debug_interval=10) for the messages...')
+    time.sleep(20)
+
+    # check that the http receiver got an abort event for each key
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    event_keys = []
+    events = http_server.get_and_reset_events()
+    for event in events:
+        # I hope Boto doesn't gak on the unknown eventName
+        assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMultipartUpload')
+        event_keys.append(event['Records'][0]['s3']['object']['key'])
+    for key in keys:
+        key_found = False
+        for event_key in event_keys:
+            if event_key == key.name:
+                key_found = True
+                break
+        if not key_found:
+            err = 'no lifecycle event found for key: ' + str(key)
+            log.error(events)
+            assert False, err
+
+    # cleanup
+    for key in keys:
+        key.delete()
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
 
 def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
     """ test object creation s3 notifications in using put/copy/post on master"""
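
For reference, the lifecycle events enabled by this change can be subscribed to
from any S3 client. Below is a minimal boto3 sketch, not part of the patch: the
endpoint, credentials, bucket name, and topic ARN are illustrative placeholders,
and the topic is assumed to already exist. The Ceph extension event names pass
through boto3 because botocore does not validate enum values client-side.

    import boto3

    # Placeholder endpoint and credentials -- substitute your RGW values.
    client = boto3.client('s3',
                          endpoint_url='http://localhost:8000',
                          aws_access_key_id='ACCESS_KEY',
                          aws_secret_access_key='SECRET_KEY')

    # Subscribe an existing topic (placeholder ARN) to the events this
    # commit starts dispatching: transition of current/noncurrent versions
    # and lifecycle abort of incomplete multipart uploads.
    client.put_bucket_notification_configuration(
        Bucket='mybucket',
        NotificationConfiguration={'TopicConfigurations': [{
            'Id': 'lc-events',
            'TopicArn': 'arn:aws:sns:default::mytopic',
            'Events': ['s3:ObjectLifecycle:Transition:Current',
                       's3:ObjectLifecycle:Transition:Noncurrent',
                       's3:ObjectLifecycle:Expiration:AbortMultipartUpload'],
        }]})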