params.ns = RGW_OBJ_NS_MULTIPART;
params.access_list_filter = &mp_filter;
+ auto event_type = rgw::notify::ObjectExpirationAbortMPU;
+ std::string version_id;
+
auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
+ int ret{0};
auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
auto& [rule, obj] = wt;
if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
rgw_obj_key key(obj.key);
std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name);
- int ret = mpu->abort(this, cct, null_yield);
+ std::unique_ptr<rgw::sal::Object> sal_obj
+ = target->get_object(key);
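+ // build a notification handle for this MPU abort (event type ObjectExpirationAbortMPU)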
+ std::unique_ptr<rgw::sal::Notification> notify
+ = driver->get_notification(
+ this, sal_obj.get(), nullptr, event_type,
+ target, lc_id,
+ const_cast<std::string&>(target->get_tenant()),
+ lc_req_id, null_yield);
+
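+ // reserve the notification before aborting; a failed reservation is logged but does not block the abort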
+ ret = notify->publish_reserve(this, nullptr);
+ if (ret != 0) {
+ ldpp_dout(wk->get_lc(), 0)
+ << "ERROR: reserving persistent notification for abort_multipart_upload, ret=" << ret
+ << ", thread:" << wq->thr_name()
+ << ", meta:" << obj.key
+ << dendl;
+ }
+
+ ret = mpu->abort(this, cct, null_yield);
if (ret == 0) {
- if (perfcounter) {
+
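+ // the abort succeeded, so commit the reserved notification; the result is intentionally ignored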
+ (void) notify->publish_commit(
+ this, sal_obj->get_obj_size(),
+ ceph::real_clock::now(),
+ sal_obj->get_attrs()[RGW_ATTR_ETAG].to_str(),
+ version_id);
+
+ if (perfcounter) {
perfcounter->inc(l_rgw_lc_abort_mpu, 1);
}
} else {
/* If bucket is versioned, create delete_marker for current version
*/
if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
- ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
+ ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectTransitionCurrent);
ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
} else {
- ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
+ ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectTransitionNoncurrent);
ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
}
return ret;
}
int transition_obj_to_cloud(lc_op_ctx& oc) {
+ int ret{0};
/* If CurrentVersion object, remove it & create delete marker */
bool delete_object = (!oc.tier->retain_head_object() ||
(oc.o.is_current() && oc.bucket->versioned()));
- int ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
- oc.env.worker->get_cloud_targets(), oc.cct,
- !delete_object, oc.dpp, null_yield);
+ /* notifications */
+ std::unique_ptr<rgw::sal::Bucket> bucket;
+ std::unique_ptr<rgw::sal::Object> obj;
+ auto& bucket_info = oc.bucket->get_info();
+ std::string version_id;
+
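+ // look up fresh bucket and object handles so a notification can be built for the transition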
+ ret = oc.driver->get_bucket(nullptr, bucket_info, &bucket);
if (ret < 0) {
return ret;
}
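+ // resolve and set the bucket owner if it is not already populated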
+ std::unique_ptr<rgw::sal::User> user;
+ if (! bucket->get_owner()) {
+ auto& bucket_info = bucket->get_info();
+ user = oc.driver->get_user(bucket_info.owner);
+ if (user) {
+ bucket->set_owner(user.get());
+ }
+ }
+
+ obj = bucket->get_object(oc.o.key);
+
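+ // current, non-delete-marker versions of versioned buckets emit ObjectTransitionCurrent; everything else emits ObjectTransitionNoncurrent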
+ auto event_type = (oc.bucket->versioned() &&
+ oc.o.is_current() && !oc.o.is_delete_marker()) ?
+ rgw::notify::ObjectTransitionCurrent :
+ rgw::notify::ObjectTransitionNoncurrent;
+
+ std::unique_ptr<rgw::sal::Notification> notify
+ = oc.driver->get_notification(
+ oc.dpp, obj.get(), nullptr, event_type,
+ bucket.get(), lc_id,
+ const_cast<std::string&>(oc.bucket->get_tenant()),
+ lc_req_id, null_yield);
+
+ ret = notify->publish_reserve(oc.dpp, nullptr);
+ if (ret < 0) {
+ ldpp_dout(oc.dpp, 1)
+ << "ERROR: notify reservation failed, deferring transition of object k="
+ << oc.o.key
+ << dendl;
+ return ret;
+ }
+
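+ // the reservation succeeded, so perform the actual transition to the cloud tier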
+ ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
+ oc.env.worker->get_cloud_targets(),
+ oc.cct, !delete_object, oc.dpp,
+ null_yield);
+ if (ret < 0) {
+ return ret;
+ } else {
+ // send request to notification manager
+ (void) notify->publish_commit(oc.dpp, obj->get_obj_size(),
+ ceph::real_clock::now(),
+ obj->get_attrs()[RGW_ATTR_ETAG].to_str(),
+ version_id);
+ }
+
if (delete_object) {
ret = delete_tier_obj(oc);
if (ret < 0) {
worker->workpool->drain();
}
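+ // resolve and set the bucket owner if it is not already populated, as in transition_obj_to_cloud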
+ std::unique_ptr<rgw::sal::User> user;
+ if (! bucket->get_owner()) {
+ auto& bucket_info = bucket->get_info();
+ user = driver->get_user(bucket_info.owner);
+ if (user) {
+ bucket->set_owner(user.get());
+ }
+ }
+
ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
return ret;
}
import socket
import time
import os
+import io
import string
import boto
-from botocore.exceptions import ClientError
from http import server as http_server
from random import randint
import hashlib
+# XXX this should be converted to use pytest
from nose.plugins.attrib import attr
import boto3
import datetime
from cloudevents.http import from_http
from dateutil import parser
+# XXX this should be converted to use boto3
from boto.s3.connection import S3Connection
from . import(
@attr('basic_test')
def test_ps_s3_topic_on_master():
""" test s3 topics set/get/delete on master """
+
+ access_key = str(time.time())
+ secret_key = str(time.time())
+ uid = 'superman' + str(time.time())
tenant = 'kaboom'
- conn = another_user(tenant)
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+ assert_equal(result, 0)
+ conn = S3Connection(aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ is_secure=False, port=get_config_port(), host=get_config_host(),
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
zonegroup = 'default'
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
@attr('basic_test')
def test_ps_s3_topic_admin_on_master():
""" test s3 topics set/get/delete on master """
+
+ access_key = str(time.time())
+ secret_key = str(time.time())
+ uid = 'superman' + str(time.time())
tenant = 'kaboom'
- conn = another_user(tenant)
+ _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])
+ assert_equal(result, 0)
+ conn = S3Connection(aws_access_key_id=access_key,
+ aws_secret_access_key=secret_key,
+ is_secure=False, port=get_config_port(), host=get_config_host(),
+ calling_format='boto.s3.connection.OrdinaryCallingFormat')
zonegroup = 'default'
bucket_name = gen_bucket_name()
topic_name = bucket_name + TOPIC_SUFFIX
# delete the bucket
conn.delete_bucket(bucket_name)
-@attr('basic_test')
-def test_ps_s3_notification_permissions():
- """ test s3 notification set/get/delete permissions """
- conn1 = connection()
- conn2 = another_user()
- zonegroup = 'default'
- bucket_name = gen_bucket_name()
- # create bucket
- bucket = conn1.create_bucket(bucket_name)
- topic_name = bucket_name + TOPIC_SUFFIX
- # create s3 topic
- endpoint_address = 'amqp://127.0.0.1:7001'
- endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
- topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
- topic_arn = topic_conf.set_config()
-
- # one user create a notification
- notification_name = bucket_name + NOTIFICATION_SUFFIX
- topic_conf_list = [{'Id': notification_name,
- 'TopicArn': topic_arn,
- 'Events': []
- }]
- s3_notification_conf1 = PSNotificationS3(conn1, bucket_name, topic_conf_list)
- _, status = s3_notification_conf1.set_config()
- assert_equal(status, 200)
- # another user try to fetch it
- s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
- try:
- _, _ = s3_notification_conf2.get_config()
- assert False, "'AccessDenied' error is expected"
- except ClientError as error:
- assert_equal(error.response['Error']['Code'], 'AccessDenied')
- # other user try to delete the notification
- _, status = s3_notification_conf2.del_config()
- assert_equal(status, 403)
-
- # bucket policy is added by the 1st user
- client = boto3.client('s3',
- endpoint_url='http://'+conn1.host+':'+str(conn1.port),
- aws_access_key_id=conn1.aws_access_key_id,
- aws_secret_access_key=conn1.aws_secret_access_key)
- bucket_policy = json.dumps({
- "Version": "2012-10-17",
- "Statement": [
- {
- "Sid": "Statement",
- "Effect": "Allow",
- "Principal": "*",
- "Action": ["s3:GetBucketNotification", "s3:PutBucketNotification"],
- "Resource": f"arn:aws:s3:::{bucket_name}"
- }
- ]
- })
- response = client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)
- assert_equal(int(response['ResponseMetadata']['HTTPStatusCode']/100), 2)
- result = client.get_bucket_policy(Bucket=bucket_name)
- print(result['Policy'])
-
- # 2nd user try to fetch it again
- _, status = s3_notification_conf2.get_config()
- assert_equal(status, 200)
-
- # 2nd user try to delete it again
- result, status = s3_notification_conf2.del_config()
- assert_equal(status, 200)
-
- # 2nd user try to add another notification
- topic_conf_list = [{'Id': notification_name+"2",
- 'TopicArn': topic_arn,
- 'Events': []
- }]
- s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
- result, status = s3_notification_conf2.set_config()
- assert_equal(status, 200)
-
- # cleanup
- s3_notification_conf1.del_config()
- s3_notification_conf2.del_config()
- topic_conf.del_config()
- # delete the bucket
- conn1.delete_bucket(bucket_name)
@attr('amqp_test')
def test_ps_s3_notification_push_amqp_on_master():
conn.delete_bucket(bucket_name)
http_server.close()
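+# start a multipart upload and deliberately leave it incomplete so a lifecycle rule can abort it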
+def start_and_abandon_multipart_upload(bucket, key_name, content):
+ try:
+ mp = bucket.initiate_multipart_upload(key_name)
+ part_data = io.StringIO(content)
+ mp.upload_part_from_file(part_data, 1)
+ # mp.complete_upload()
+ except Exception as e:
+ print('Error: ' + str(e))
+
+@attr('http_test')
+def test_ps_s3_lifecycle_abort_mpu_on_master():
+ """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
+ hostname = get_ip()
+ conn = connection()
+ zonegroup = 'default'
+
+ # create random port for the http server
+ host = get_ip()
+ port = random.randint(10000, 20000)
+ # start an http server in a separate thread
+ number_of_objects = 1
+ http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+ # create bucket
+ bucket_name = gen_bucket_name()
+ bucket = conn.create_bucket(bucket_name)
+ topic_name = bucket_name + TOPIC_SUFFIX
+
+ # create s3 topic
+ endpoint_address = 'http://'+host+':'+str(port)
+ endpoint_args = 'push-endpoint='+endpoint_address
+ opaque_data = 'http://1.2.3.4:8888'
+ topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+ topic_arn = topic_conf.set_config()
+ # create s3 notification
+ notification_name = bucket_name + NOTIFICATION_SUFFIX
+ topic_conf_list = [{'Id': notification_name,
+ 'TopicArn': topic_arn,
+ 'Events': ['s3:ObjectLifecycle:Expiration:*']
+ }]
+ s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+ response, status = s3_notification_conf.set_config()
+ assert_equal(status/100, 2)
+
+ # start and abandon a multipart upload in the bucket
+ obj_prefix = 'ooo'
+ start_time = time.time()
+ content = 'bar'
+
+ key_name = obj_prefix + str(1)
+ thr = threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,))
+ thr.start()
+ thr.join()
+
+ time_diff = time.time() - start_time
+ print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+
+ # create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect
+ client = boto3.client('s3',
+ endpoint_url='http://'+conn.host+':'+str(conn.port),
+ aws_access_key_id=conn.aws_access_key_id,
+ aws_secret_access_key=conn.aws_secret_access_key)
+ response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
+ LifecycleConfiguration={'Rules': [
+ {
+ 'ID': 'abort1',
+ 'Filter': {'Prefix': obj_prefix},
+ 'Status': 'Enabled',
+ 'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
+ }
+ ]
+ }
+ )
+
+ # start lifecycle processing
+ admin(['lc', 'process'])
+ print('wait for 20s (2 days) for the messages...')
+ time.sleep(20)
+
+ # verify that the http receiver got the expected abort-MPU events
+ keys = list(bucket.list())
+ print('total number of objects: ' + str(len(keys)))
+ event_keys = []
+ events = http_server.get_and_reset_events()
+ for event in events:
+ # eventName is an RGW-specific extension, not a standard S3 event
+ assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMultipartUpload')
+ event_keys.append(event['Records'][0]['s3']['object']['key'])
+ for key in keys:
+ key_found = False
+ for event_key in event_keys:
+ if event_key == key:
+ key_found = True
+ break
+ if not key_found:
+ err = 'no lifecycle event found for key: ' + str(key)
+ log.error(events)
+ assert False, err
+
+ # cleanup
+ for key in keys:
+ key.delete()
+ topic_conf.del_config()
+ s3_notification_conf.del_config(notification=notification_name)
+ # delete the bucket
+ conn.delete_bucket(bucket_name)
+ http_server.close()
def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
""" test object creation s3 notifications in using put/copy/post on master"""