From 93b9f0fb77ca5ed0b5c89d45229732850e0a0c49 Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sun, 20 Jun 2021 14:26:53 +0300 Subject: [PATCH] rgw/notifications: support metadata filter in CompleteMultipartUpload events Fixes: https://tracker.ceph.com/issues/51261 Signed-off-by: Yuval Lifshitz --- src/rgw/rgw_notify.cc | 5 ++-- src/rgw/rgw_notify.h | 6 +++-- src/rgw/rgw_op.cc | 15 +++++------ src/rgw/rgw_sal.h | 3 ++- src/rgw/rgw_sal_rados.cc | 5 ++-- src/rgw/rgw_sal_rados.h | 6 +++-- src/test/rgw/bucket_notification/test_bn.py | 28 +++++++++++++++------ 7 files changed, 44 insertions(+), 24 deletions(-) diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index 1c5a89e0294..6609eab2adb 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -684,7 +684,7 @@ void populate_event_from_request(const reservation_t& res, event.bucket_name = s->bucket_name; event.bucket_ownerIdentity = s->bucket_owner.get_id().id; event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key())); - event.object_key = obj->get_name(); + event.object_key = res.object_name ? *res.object_name : obj->get_name(); event.object_size = size; event.object_etag = etag; event.object_versionId = obj->get_instance(); @@ -718,7 +718,8 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte return false; } const auto obj = res.object; - if (!match(filter.s3_filter.key_filter, obj->get_name())) { + if (!match(filter.s3_filter.key_filter, + res.object_name ? *res.object_name : obj->get_name())) { return false; } diff --git a/src/rgw/rgw_notify.h b/src/rgw/rgw_notify.h index 899e2f3ad7b..bd0001c5991 100644 --- a/src/rgw/rgw_notify.h +++ b/src/rgw/rgw_notify.h @@ -58,10 +58,12 @@ struct reservation_t { const req_state* const s; size_t size; rgw::sal::Object* const object; + const std::string* const object_name; KeyValueMap cached_metadata; - reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, rgw::sal::Object* _object) : - dpp(_dpp), store(_store), s(_s), object(_object) {} + reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, + rgw::sal::Object* _object, const std::string* _object_name) : + dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {} // dtor doing resource leak guarding // aborting the reservation if not already committed or aborted diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 93b98b04d40..51b070b7a2f 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -6065,13 +6065,6 @@ void RGWCompleteMultipart::execute(optional_yield y) mp.init(s->object->get_name(), upload_id); - // make reservation for notification if needed - std::unique_ptr res = store->get_notification(s->object.get(), - s, rgw::notify::ObjectCreatedCompleteMultipartUpload); - op_ret = res->publish_reserve(this); - if (op_ret < 0) { - return; - } meta_oid = mp.get_meta(); @@ -6123,6 +6116,14 @@ void RGWCompleteMultipart::execute(optional_yield y) return; } attrs = meta_obj->get_attrs(); + + // make reservation for notification if needed + std::unique_ptr res = store->get_notification(meta_obj.get(), + s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name()); + op_ret = res->publish_reserve(this); + if (op_ret < 0) { + return; + } do { op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts, diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 52b5aaecf91..6f7165fc486 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -170,7 +170,8 @@ class Store { virtual int cluster_stat(RGWClusterStat& stats) = 0; virtual std::unique_ptr get_lifecycle(void) = 0; virtual std::unique_ptr get_completions(void) = 0; - virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type) = 0; + virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, + rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0; virtual std::unique_ptr get_gc_chain(rgw::sal::Object* obj) = 0; virtual std::unique_ptr get_writer(Aio* aio, rgw::sal::Bucket* bucket, RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, diff --git a/src/rgw/rgw_sal_rados.cc b/src/rgw/rgw_sal_rados.cc index c77fee75a61..ca6574b8c9c 100644 --- a/src/rgw/rgw_sal_rados.cc +++ b/src/rgw/rgw_sal_rados.cc @@ -932,9 +932,10 @@ std::unique_ptr RadosStore::get_completions(void) std::unique_ptr RadosStore::get_notification(rgw::sal::Object* obj, struct req_state* s, - rgw::notify::EventType event_type) + rgw::notify::EventType event_type, + const std::string* object_name) { - return std::unique_ptr(new RadosNotification(s, this, obj, s, event_type)); + return std::unique_ptr(new RadosNotification(s, this, obj, s, event_type, object_name)); } std::unique_ptr RadosStore::get_gc_chain(rgw::sal::Object* obj) diff --git a/src/rgw/rgw_sal_rados.h b/src/rgw/rgw_sal_rados.h index 34aaefd8361..23cbc9942bc 100644 --- a/src/rgw/rgw_sal_rados.h +++ b/src/rgw/rgw_sal_rados.h @@ -402,7 +402,7 @@ class RadosStore : public Store { virtual int cluster_stat(RGWClusterStat& stats) override; virtual std::unique_ptr get_lifecycle(void) override; virtual std::unique_ptr get_completions(void) override; - virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type) override; + virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override; virtual std::unique_ptr get_gc_chain(rgw::sal::Object* obj) override; virtual std::unique_ptr get_writer(Aio* aio, rgw::sal::Bucket* bucket, RGWObjectCtx& obj_ctx, std::unique_ptr _head_obj, @@ -537,7 +537,9 @@ class RadosNotification : public Notification { rgw::notify::reservation_t res; public: - RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s, rgw::notify::EventType _type) : Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj) { } + RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s, + rgw::notify::EventType _type, const std::string* object_name=nullptr) : + Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj, object_name) { } ~RadosNotification() = default; virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override; diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 1fb08f29b4a..54f4d99fcfc 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -1722,37 +1722,49 @@ def test_ps_s3_metadata_on_master(): response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) + expected_keys = [] # create objects in the bucket key_name = 'foo' key = bucket.new_key(key_name) key.set_metadata(meta_key, meta_value) key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + expected_keys.append(key_name) # create objects in the bucket using COPY - bucket.copy_key('copy_of_foo', bucket.name, key.name) + key_name = 'copy_of_foo' + bucket.copy_key(key_name, bucket.name, key.name) + expected_keys.append(key_name) # create another objects in the bucket using COPY # but override the metadata value - bucket.copy_key('another_copy_of_foo', bucket.name, key.name, metadata={meta_key: 'kaboom'}) + key_name = 'another_copy_of_foo' + bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'}) # create objects in the bucket using multi-part upload fp = tempfile.NamedTemporaryFile(mode='w+b') - object_size = 10*1024*1024 + chunk_size = 1024*1024*5 # 5MB + object_size = 10*chunk_size content = bytearray(os.urandom(object_size)) fp.write(content) fp.flush() fp.seek(0) - uploader = bucket.initiate_multipart_upload('multipart_foo', + key_name = 'multipart_foo' + uploader = bucket.initiate_multipart_upload(key_name, metadata={meta_key: meta_value}) - uploader.upload_part_from_file(fp, 1) + for i in range(1,5): + uploader.upload_part_from_file(fp, i, size=chunk_size) + fp.seek(i*chunk_size) uploader.complete_upload() fp.close() + expected_keys.append(key_name) print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver - assert_equal(len(receiver.get_and_reset_events()), 3) - + events = receiver.get_and_reset_events() + assert_equal(len(events), 4) # PUT, COPY, Multipart start, Multipart End + for event in events: + assert(event['Records'][0]['s3']['object']['key'] in expected_keys) # delete objects for key in bucket.list(): @@ -1760,7 +1772,7 @@ def test_ps_s3_metadata_on_master(): print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver - assert_equal(len(receiver.get_and_reset_events()), 3) + #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys)) # cleanup stop_amqp_receiver(receiver, task) -- 2.39.5