From: Yuval Lifshitz Date: Thu, 17 Jun 2021 13:08:32 +0000 (+0300) Subject: rgw/notifications: support metadata filter in COPY events X-Git-Tag: v17.1.0~1502^2~1 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=e7f30a1b278455567f1f1069f41c9a1c3ef335c2;p=ceph.git rgw/notifications: support metadata filter in COPY events Fixes: https://tracker.ceph.com/issues/51261 Signed-off-by: Yuval Lifshitz --- diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index a32f1e7eeaee..1c5a89e02948 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -667,13 +667,14 @@ void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValueMap } // populate event from request -void populate_event_from_request(const req_state *s, +void populate_event_from_request(const reservation_t& res, rgw::sal::Object* obj, uint64_t size, const ceph::real_time& mtime, const std::string& etag, EventType event_type, - rgw_pubsub_s3_event& event) { + rgw_pubsub_s3_event& event) { + const auto s = res.s; event.eventTime = mtime; event.eventName = to_event_string(event_type); event.userIdentity = s->user->get_id().id; // user that triggered the change @@ -693,12 +694,14 @@ void populate_event_from_request(const req_state *s, std::back_inserter(event.object_sequencer)); set_event_id(event.id, etag, ts); event.bucket_id = s->bucket->get_bucket_id(); - // pass meta data - if (s->info.x_meta_map.empty()) { - // try to fetch the metadata from the attributes + // pass metadata + if (res.cached_metadata.empty()) { + // no metadata cached: + // either no metadata exist or no metadata filter was used + event.x_meta_map = s->info.x_meta_map; metadata_from_attributes(s, obj, event.x_meta_map); } else { - event.x_meta_map = s->info.x_meta_map; + event.x_meta_map = std::move(res.cached_metadata); } // pass tags if (s->tagset.get_tags().empty()) { @@ -710,29 +713,22 @@ void populate_event_from_request(const req_state *s, // opaque data will be filled from topic configuration } -bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::Object* obj, - EventType event, const RGWObjTags* req_tags) { +bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) { if (!match(filter.events, event)) { return false; } + const auto obj = res.object; if (!match(filter.s3_filter.key_filter, obj->get_name())) { return false; } + const auto s = res.s; if (!filter.s3_filter.metadata_filter.kv.empty()) { // metadata filter exists - if (!s->info.x_meta_map.empty()) { - // metadata was cached in req_state - if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) { - return false; - } - } else { - // try to fetch the metadata from the attributes - KeyValueMap metadata; - metadata_from_attributes(s, obj, metadata); - if (!match(filter.s3_filter.metadata_filter, metadata)) { - return false; - } + res.cached_metadata = s->info.x_meta_map; + metadata_from_attributes(s, obj, res.cached_metadata); + if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) { + return false; } } @@ -776,7 +772,7 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type, for (const auto& bucket_topic : bucket_topics.topics) { const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; const rgw_pubsub_topic& topic_cfg = topic_filter.topic; - if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) { + if (!notification_match(res, topic_filter, event_type, req_tags)) { // notification does not apply to req_state continue; } @@ -829,7 +825,7 @@ int publish_commit(rgw::sal::Object* obj, continue; } event_entry_t event_entry; - populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event); + populate_event_from_request(res, obj, size, mtime, etag, event_type, event_entry.event); event_entry.event.configurationId = topic.configurationId; event_entry.event.opaque_data = topic.cfg.opaque_data; if (topic.cfg.dest.persistent) { diff --git a/src/rgw/rgw_notify.h b/src/rgw/rgw_notify.h index ba5f96c1fd0e..899e2f3ad7b7 100644 --- a/src/rgw/rgw_notify.h +++ b/src/rgw/rgw_notify.h @@ -58,6 +58,7 @@ struct reservation_t { const req_state* const s; size_t size; rgw::sal::Object* const object; + KeyValueMap cached_metadata; reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, rgw::sal::Object* _object) : dpp(_dpp), store(_store), s(_s), object(_object) {} diff --git a/src/test/rgw/bucket_notification/test_bn.py b/src/test/rgw/bucket_notification/test_bn.py index 333b09d71551..1fb08f29b4ad 100644 --- a/src/test/rgw/bucket_notification/test_bn.py +++ b/src/test/rgw/bucket_notification/test_bn.py @@ -1730,10 +1730,14 @@ def test_ps_s3_metadata_on_master(): # create objects in the bucket using COPY bucket.copy_key('copy_of_foo', bucket.name, key.name) + + # create another objects in the bucket using COPY + # but override the metadata value + bucket.copy_key('another_copy_of_foo', bucket.name, key.name, metadata={meta_key: 'kaboom'}) # create objects in the bucket using multi-part upload fp = tempfile.NamedTemporaryFile(mode='w+b') - object_size = 1024 + object_size = 10*1024*1024 content = bytearray(os.urandom(object_size)) fp.write(content) fp.flush() @@ -1747,15 +1751,8 @@ def test_ps_s3_metadata_on_master(): print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver - event_count = 0 - for event in receiver.get_and_reset_events(): - s3_event = event['Records'][0]['s3'] - assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key) - assert_equal(s3_event['object']['metadata'][0]['val'], meta_value) - event_count +=1 + assert_equal(len(receiver.get_and_reset_events()), 3) - # only PUT and POST has the metadata value - assert_equal(event_count, 2) # delete objects for key in bucket.list(): @@ -1763,15 +1760,7 @@ def test_ps_s3_metadata_on_master(): print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver - event_count = 0 - for event in receiver.get_and_reset_events(): - s3_event = event['Records'][0]['s3'] - assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key) - assert_equal(s3_event['object']['metadata'][0]['val'], meta_value) - event_count +=1 - - # all 3 object has metadata when deleted - assert_equal(event_count, 3) + assert_equal(len(receiver.get_and_reset_events()), 3) # cleanup stop_amqp_receiver(receiver, task)