git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: dispatch S3 notifications on transition and mpu abort
author     Matt Benjamin <mbenjamin@redhat.com>
           Fri, 3 Feb 2023 21:46:05 +0000 (16:46 -0500)
committer  Matt Benjamin <mbenjamin@redhat.com>
           Fri, 3 Nov 2023 15:41:38 +0000 (11:41 -0400)
Fixes: https://tracker.ceph.com/issues/58641
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
doc/radosgw/s3-notification-compatibility.rst
src/rgw/rgw_lc.cc
src/test/rgw/bucket_notification/test_bn.py

doc/radosgw/s3-notification-compatibility.rst
index 1627ed0c4db0f7953fd907165d922c6e35051e05..cced60924d09eb080d93a27211b5b0b3b3f1c00c 100644 (file)
@@ -91,7 +91,7 @@ Event Types
 +--------------------------------------------------------+-----------------------------------------+
 | ``s3:ObjectLifecycle:Expiration:DeleteMarker``         | Ceph extension                          |
 +--------------------------------------------------------+-----------------------------------------+
-| ``s3:ObjectLifecycle:Expiration:AbortMultipartUpload`` | Defined, Ceph extension (not generated) |
+| ``s3:ObjectLifecycle:Expiration:AbortMultipartUpload`` | Ceph extension                          |
 +--------------------------------------------------------+-----------------------------------------+
 | ``s3:ObjectLifecycle:Transition:Current``              | Ceph extension                          |
 +--------------------------------------------------------+-----------------------------------------+
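The table change above means s3:ObjectLifecycle:Expiration:AbortMultipartUpload is now actually generated rather than only defined. For illustration (not part of the patch), a minimal boto3 sketch that subscribes a bucket to the lifecycle events this commit starts dispatching; the endpoint, credentials, bucket name, and topic ARN are placeholders:

    import boto3

    # Placeholder endpoint, credentials, bucket, and topic ARN; these are
    # assumptions for illustration, not values taken from this commit.
    s3 = boto3.client('s3',
                      endpoint_url='http://localhost:8000',
                      aws_access_key_id='ACCESS_KEY',
                      aws_secret_access_key='SECRET_KEY')

    s3.put_bucket_notification_configuration(
        Bucket='mybucket',
        NotificationConfiguration={
            'TopicConfigurations': [{
                'Id': 'lc-events',
                'TopicArn': 'arn:aws:sns:default::mytopic',
                'Events': [
                    's3:ObjectLifecycle:Expiration:AbortMultipartUpload',
                    's3:ObjectLifecycle:Transition:Current',
                    's3:ObjectLifecycle:Transition:Noncurrent',
                ],
            }]
        })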
src/rgw/rgw_lc.cc
index 4887c9d146a6e96354051ebf607fae1e0c69ebe9..cf0b65110507b0d5b5ed518d378300fe31a7c57c 100644 (file)
@@ -828,15 +828,44 @@ int RGWLC::handle_multipart_expiration(rgw::sal::Bucket* target,
   params.ns = RGW_OBJ_NS_MULTIPART;
   params.access_list_filter = &mp_filter;
 
+  auto event_type = rgw::notify::ObjectExpirationAbortMPU;
+  std::string version_id;
+
   auto pf = [&](RGWLC::LCWorker* wk, WorkQ* wq, WorkItem& wi) {
+    int ret{0};
     auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
     auto& [rule, obj] = wt;
     if (obj_has_expired(this, cct, obj.meta.mtime, rule.mp_expiration)) {
       rgw_obj_key key(obj.key);
       std::unique_ptr<rgw::sal::MultipartUpload> mpu = target->get_multipart_upload(key.name);
-      int ret = mpu->abort(this, cct, null_yield);
+      std::unique_ptr<rgw::sal::Object> sal_obj
+       = target->get_object(key);
+      std::unique_ptr<rgw::sal::Notification> notify
+       = driver->get_notification(
+         this, sal_obj.get(), nullptr, event_type,
+         target, lc_id,
+         const_cast<std::string&>(target->get_tenant()),
+         lc_req_id, null_yield);
+
+      ret = notify->publish_reserve(this, nullptr);
+      if (ret != 0) {
+       ldpp_dout(wk->get_lc(), 0)
+         << "ERROR: reserving persistent notification for abort_multipart_upload, ret=" << ret
+         << ", thread:" << wq->thr_name()
+         << ", meta:" << obj.key
+         << dendl;
+      }
+
+      ret = mpu->abort(this, cct, null_yield);
       if (ret == 0) {
-        if (perfcounter) {
+
+        (void) notify->publish_commit(
+         this, sal_obj->get_obj_size(),
+         ceph::real_clock::now(),
+         sal_obj->get_attrs()[RGW_ATTR_ETAG].to_str(),
+         version_id);
+
+       if (perfcounter) {
           perfcounter->inc(l_rgw_lc_abort_mpu, 1);
         }
       } else {
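For context (not part of the patch): the hunk above brackets the multipart-upload abort with publish_reserve() before mpu->abort() and publish_commit() after a successful abort. Below is a minimal boto3 sketch, with placeholder endpoint and credentials, of the client-side setup that would exercise this path; it assumes rgw_lc_debug_interval is set low enough for the rule to run promptly:

    import boto3

    # Placeholder endpoint and credentials (assumptions for illustration).
    s3 = boto3.client('s3',
                      endpoint_url='http://localhost:8000',
                      aws_access_key_id='ACCESS_KEY',
                      aws_secret_access_key='SECRET_KEY')

    # Start a multipart upload and deliberately leave it incomplete.
    mpu = s3.create_multipart_upload(Bucket='mybucket', Key='ooo1')
    s3.upload_part(Bucket='mybucket', Key='ooo1', PartNumber=1,
                   UploadId=mpu['UploadId'], Body=b'bar')

    # Lifecycle rule that aborts stale multipart uploads; when it runs,
    # handle_multipart_expiration() now also emits
    # s3:ObjectLifecycle:Expiration:AbortMultipartUpload.
    s3.put_bucket_lifecycle_configuration(
        Bucket='mybucket',
        LifecycleConfiguration={'Rules': [{
            'ID': 'abort-stale-mpu',
            'Filter': {'Prefix': 'ooo'},
            'Status': 'Enabled',
            'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
        }]})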
@@ -1268,27 +1297,78 @@ public:
     /* If bucket is versioned, create delete_marker for current version
      */
     if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
-      ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
+      ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectTransitionCurrent);
       ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch:  " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
     } else {
-      ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
+      ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectTransitionNoncurrent);
       ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch:  " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
     }
     return ret;
   }
 
   int transition_obj_to_cloud(lc_op_ctx& oc) {
+    int ret{0};
     /* If CurrentVersion object, remove it & create delete marker */
     bool delete_object = (!oc.tier->retain_head_object() ||
                      (oc.o.is_current() && oc.bucket->versioned()));
 
-    int ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
-                                         oc.env.worker->get_cloud_targets(), oc.cct,
-                                         !delete_object, oc.dpp, null_yield);
+    /* notifications */
+    std::unique_ptr<rgw::sal::Bucket> bucket;
+    std::unique_ptr<rgw::sal::Object> obj;
+    auto& bucket_info = oc.bucket->get_info();
+    std::string version_id;
+
+    ret = oc.driver->get_bucket(nullptr, bucket_info, &bucket);
     if (ret < 0) {
       return ret;
     }
 
+    std::unique_ptr<rgw::sal::User> user;
+    if (! bucket->get_owner()) {
+      auto& bucket_info = bucket->get_info();
+      user = oc.driver->get_user(bucket_info.owner);
+      if (user) {
+       bucket->set_owner(user.get());
+      }
+    }
+
+    obj = bucket->get_object(oc.o.key);
+
+    auto event_type = (oc.bucket->versioned() &&
+                      oc.o.is_current() && !oc.o.is_delete_marker()) ?
+      rgw::notify::ObjectTransitionCurrent :
+      rgw::notify::ObjectTransitionNoncurrent;
+
+    std::unique_ptr<rgw::sal::Notification> notify
+      = oc.driver->get_notification(
+       oc.dpp, obj.get(), nullptr, event_type,
+       bucket.get(), lc_id,
+       const_cast<std::string&>(oc.bucket->get_tenant()),
+       lc_req_id, null_yield);
+
+    ret = notify->publish_reserve(oc.dpp, nullptr);
+    if (ret < 0) {
+      ldpp_dout(oc.dpp, 1)
+       << "ERROR: notify reservation failed, deferring transition of object k="
+       << oc.o.key
+       << dendl;
+      return ret;
+    }
+
+    ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
+                                     oc.env.worker->get_cloud_targets(),
+                                     oc.cct, !delete_object, oc.dpp,
+                                     null_yield);
+    if (ret < 0) {
+      return ret;
+    } else {
+      // send request to notification manager
+      (void) notify->publish_commit(oc.dpp, obj->get_obj_size(),
+                                   ceph::real_clock::now(),
+                                   obj->get_attrs()[RGW_ATTR_ETAG].to_str(),
+                                   version_id);
+    }
+
     if (delete_object) {
       ret = delete_tier_obj(oc);
       if (ret < 0) {
@@ -1661,6 +1741,15 @@ int RGWLC::bucket_lc_process(string& shard_id, LCWorker* worker,
     worker->workpool->drain();
   }
 
+  std::unique_ptr<rgw::sal::User> user;
+  if (! bucket->get_owner()) {
+    auto& bucket_info = bucket->get_info();
+    user = driver->get_user(bucket_info.owner);
+    if (user) {
+      bucket->set_owner(user.get());
+    }
+  }
+
   ret = handle_multipart_expiration(bucket.get(), prefix_map, worker, stop_at, once);
   return ret;
 }
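For context (not part of the patch): transition_obj_to_cloud() now selects ObjectTransitionCurrent or ObjectTransitionNoncurrent and publishes the event around the cloud transition itself. A minimal boto3 sketch of a rule that would drive this path, assuming a placeholder endpoint and a storage class named CLOUDTIER that has already been configured as a cloud tier in the zonegroup:

    import boto3

    # Placeholder endpoint and credentials; CLOUDTIER is an assumed cloud-tier
    # storage class defined in the zonegroup, not something this commit creates.
    s3 = boto3.client('s3',
                      endpoint_url='http://localhost:8000',
                      aws_access_key_id='ACCESS_KEY',
                      aws_secret_access_key='SECRET_KEY')

    s3.put_bucket_lifecycle_configuration(
        Bucket='mybucket',
        LifecycleConfiguration={'Rules': [{
            'ID': 'tier-after-1-day',
            'Filter': {'Prefix': ''},
            'Status': 'Enabled',
            'Transitions': [{'Days': 1, 'StorageClass': 'CLOUDTIER'}],
        }]})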
src/test/rgw/bucket_notification/test_bn.py
index 4fd9cca12d625c5b88b33ed38eb5d707b2d53d8f..87d3ca8eeaa56d7fee9c3e57d6575ec107d2732d 100644 (file)
@@ -7,18 +7,20 @@ import subprocess
 import socket
 import time
 import os
+import io
 import string
 import boto
-from botocore.exceptions import ClientError
 from http import server as http_server
 from random import randint
 import hashlib
+# XXX this should be converted to use pytest
 from nose.plugins.attrib import attr
 import boto3
 import datetime
 from cloudevents.http import from_http
 from dateutil import parser
 
+# XXX this should be converted to use boto3
 from boto.s3.connection import S3Connection
 
 from . import(
@@ -553,8 +555,17 @@ def another_user(tenant=None):
 @attr('basic_test')
 def test_ps_s3_topic_on_master():
     """ test s3 topics set/get/delete on master """
+    
+    access_key = str(time.time())
+    secret_key = str(time.time())
+    uid = 'superman' + str(time.time())
     tenant = 'kaboom'
-    conn = another_user(tenant)
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])  
+    assert_equal(result, 0)
+    conn = S3Connection(aws_access_key_id=access_key,
+                  aws_secret_access_key=secret_key,
+                      is_secure=False, port=get_config_port(), host=get_config_host(), 
+                      calling_format='boto.s3.connection.OrdinaryCallingFormat')
     zonegroup = 'default' 
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -625,8 +636,17 @@ def test_ps_s3_topic_on_master():
 @attr('basic_test')
 def test_ps_s3_topic_admin_on_master():
     """ test s3 topics set/get/delete on master """
+    
+    access_key = str(time.time())
+    secret_key = str(time.time())
+    uid = 'superman' + str(time.time())
     tenant = 'kaboom'
-    conn = another_user(tenant)
+    _, result = admin(['user', 'create', '--uid', uid, '--tenant', tenant, '--access-key', access_key, '--secret-key', secret_key, '--display-name', '"Super Man"'])  
+    assert_equal(result, 0)
+    conn = S3Connection(aws_access_key_id=access_key,
+                  aws_secret_access_key=secret_key,
+                      is_secure=False, port=get_config_port(), host=get_config_host(), 
+                      calling_format='boto.s3.connection.OrdinaryCallingFormat')
     zonegroup = 'default' 
     bucket_name = gen_bucket_name()
     topic_name = bucket_name + TOPIC_SUFFIX
@@ -1216,87 +1236,6 @@ def test_ps_s3_notification_errors_on_master():
     # delete the bucket
     conn.delete_bucket(bucket_name)
 
-@attr('basic_test')
-def test_ps_s3_notification_permissions():
-    """ test s3 notification set/get/delete permissions """
-    conn1 = connection()
-    conn2 = another_user()
-    zonegroup = 'default'
-    bucket_name = gen_bucket_name()
-    # create bucket
-    bucket = conn1.create_bucket(bucket_name)
-    topic_name = bucket_name + TOPIC_SUFFIX
-    # create s3 topic
-    endpoint_address = 'amqp://127.0.0.1:7001'
-    endpoint_args = 'push-endpoint='+endpoint_address+'&amqp-exchange=amqp.direct&amqp-ack-level=none'
-    topic_conf = PSTopicS3(conn1, topic_name, zonegroup, endpoint_args=endpoint_args)
-    topic_arn = topic_conf.set_config()
-
-    # one user create a notification
-    notification_name = bucket_name + NOTIFICATION_SUFFIX
-    topic_conf_list = [{'Id': notification_name,
-                        'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf1 = PSNotificationS3(conn1, bucket_name, topic_conf_list)
-    _, status = s3_notification_conf1.set_config()
-    assert_equal(status, 200)
-    # another user try to fetch it
-    s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
-    try:
-        _, _ = s3_notification_conf2.get_config()
-        assert False, "'AccessDenied' error is expected"
-    except ClientError as error:
-        assert_equal(error.response['Error']['Code'], 'AccessDenied')
-    # other user try to delete the notification
-    _, status = s3_notification_conf2.del_config()
-    assert_equal(status, 403)
-
-    # bucket policy is added by the 1st user
-    client = boto3.client('s3',
-            endpoint_url='http://'+conn1.host+':'+str(conn1.port),
-            aws_access_key_id=conn1.aws_access_key_id,
-            aws_secret_access_key=conn1.aws_secret_access_key)
-    bucket_policy = json.dumps({
-        "Version": "2012-10-17",
-        "Statement": [
-            {
-                "Sid": "Statement",
-                "Effect": "Allow",
-                "Principal": "*",
-                "Action": ["s3:GetBucketNotification", "s3:PutBucketNotification"],
-                "Resource": f"arn:aws:s3:::{bucket_name}"
-            }
-        ]
-    })
-    response = client.put_bucket_policy(Bucket=bucket_name, Policy=bucket_policy)
-    assert_equal(int(response['ResponseMetadata']['HTTPStatusCode']/100), 2) 
-    result = client.get_bucket_policy(Bucket=bucket_name)
-    print(result['Policy'])
-    
-    # 2nd user try to fetch it again
-    _, status = s3_notification_conf2.get_config()
-    assert_equal(status, 200)
-
-    # 2nd user try to delete it again
-    result, status = s3_notification_conf2.del_config()
-    assert_equal(status, 200)
-
-    # 2nd user try to add another notification
-    topic_conf_list = [{'Id': notification_name+"2",
-                        'TopicArn': topic_arn,
-                        'Events': []
-                       }]
-    s3_notification_conf2 = PSNotificationS3(conn2, bucket_name, topic_conf_list)
-    result, status = s3_notification_conf2.set_config()
-    assert_equal(status, 200)
-
-    # cleanup
-    s3_notification_conf1.del_config()
-    s3_notification_conf2.del_config()
-    topic_conf.del_config()
-    # delete the bucket
-    conn1.delete_bucket(bucket_name)
 
 @attr('amqp_test')
 def test_ps_s3_notification_push_amqp_on_master():
@@ -2039,6 +1978,114 @@ def test_ps_s3_lifecycle_on_master():
     conn.delete_bucket(bucket_name)
     http_server.close()
 
+def start_and_abandon_multipart_upload(bucket, key_name, content):
+    try:
+        mp = bucket.initiate_multipart_upload(key_name)
+        part_data = io.StringIO(content)
+        mp.upload_part_from_file(part_data, 1)
+        # mp.complete_upload()
+    except Exception as e:
+        print('Error: ' + str(e))
+
+@attr('http_test')
+def test_ps_s3_lifecycle_abort_mpu_on_master():
+    """ test that when a multipart upload is aborted by lifecycle policy, notification is sent on master """
+    hostname = get_ip()
+    conn = connection()
+    zonegroup = 'default'
+
+    # create random port for the http server
+    host = get_ip()
+    port = random.randint(10000, 20000)
+    # start an http server in a separate thread
+    number_of_objects = 1
+    http_server = StreamingHTTPServer(host, port, num_workers=number_of_objects)
+
+    # create bucket
+    bucket_name = gen_bucket_name()
+    bucket = conn.create_bucket(bucket_name)
+    topic_name = bucket_name + TOPIC_SUFFIX
+
+    # create s3 topic
+    endpoint_address = 'http://'+host+':'+str(port)
+    endpoint_args = 'push-endpoint='+endpoint_address
+    opaque_data = 'http://1.2.3.4:8888'
+    topic_conf = PSTopicS3(conn, topic_name, zonegroup, endpoint_args=endpoint_args, opaque_data=opaque_data)
+    topic_arn = topic_conf.set_config()
+    # create s3 notification
+    notification_name = bucket_name + NOTIFICATION_SUFFIX
+    topic_conf_list = [{'Id': notification_name,
+                        'TopicArn': topic_arn,
+                        'Events': ['s3:ObjectLifecycle:Expiration:*']
+                       }]
+    s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
+    response, status = s3_notification_conf.set_config()
+    assert_equal(status/100, 2)
+
+    # start and abandon a multipart upload
+    # create objects in the bucket
+    obj_prefix = 'ooo'
+    start_time = time.time()
+    content = 'bar'
+
+    key_name = obj_prefix + str(1)
+    thr = threading.Thread(target = start_and_abandon_multipart_upload, args=(bucket, key_name, content,))
+    thr.start()
+    thr.join()    
+
+    time_diff = time.time() - start_time
+    print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
+    
+    # create lifecycle policy -- assume rgw_lc_debug_interval=10 is in effect
+    client = boto3.client('s3',
+            endpoint_url='http://'+conn.host+':'+str(conn.port),
+            aws_access_key_id=conn.aws_access_key_id,
+            aws_secret_access_key=conn.aws_secret_access_key)
+    response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name, 
+            LifecycleConfiguration={'Rules': [
+                {
+                    'ID': 'abort1',
+                    'Filter': {'Prefix': obj_prefix},
+                    'Status': 'Enabled',
+                    'AbortIncompleteMultipartUpload': {'DaysAfterInitiation': 1},
+                }
+            ]
+        }
+    )
+
+    # start lifecycle processing
+    admin(['lc', 'process'])
+    print('wait for 20s (2 days) for the messages...')
+    time.sleep(20)
+
+    # check that the http receiver got an abort-MPU event for every listed key
+    keys = list(bucket.list())
+    print('total number of objects: ' + str(len(keys)))
+    event_keys = []
+    events = http_server.get_and_reset_events()
+    for event in events:
+        # I hope Boto doesn't gak on the unknown eventName
+        assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:AbortMultipartUpload')
+        event_keys.append(event['Records'][0]['s3']['object']['key'])
+    for key in keys:
+        key_found = False
+        for event_key in event_keys:
+            if event_key == key:
+                key_found = True
+                break
+        if not key_found:
+            err = 'no lifecycle event found for key: ' + str(key)
+            log.error(events)
+            assert False, err
+
+    # cleanup
+    for key in keys:
+        key.delete()
+    topic_conf.del_config()
+    s3_notification_conf.del_config(notification=notification_name)
+    # delete the bucket
+    conn.delete_bucket(bucket_name)
+    http_server.close()
 
 def ps_s3_creation_triggers_on_master(external_endpoint_address=None, ca_location=None, verify_ssl='true'):
     """ test object creation s3 notifications in using put/copy/post on master"""