From: Yuval Lifshitz Date: Thu, 17 Jun 2021 13:08:32 +0000 (+0300) Subject: rgw/notifications: support metadata filter in COPY events X-Git-Tag: v16.2.6~141^2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=deb98966ce88b1b2b186d55827518e1c13f073ec;p=ceph.git rgw/notifications: support metadata filter in COPY events Fixes: https://tracker.ceph.com/issues/51261 Signed-off-by: Yuval Lifshitz (cherry picked from commit e7f30a1b278455567f1f1069f41c9a1c3ef335c2) Conflicts: src/rgw/rgw_notify.cc src/rgw/rgw_notify.h src/test/rgw/bucket_notification/test_bn.py Cherry-pick notes: - handle differences due to renaming of rgw::sal::RGWObject to rgw::sal::Object - handle differences due to move of test_ps_s3_metadata_on_master test from tests_ps.py to test_bn.py --- diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index a4ad062005e7..53fac8fd45c2 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -658,13 +658,14 @@ void tags_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValue } // populate event from request -void populate_event_from_request(const req_state *s, +void populate_event_from_request(const reservation_t& res, rgw::sal::RGWObject* obj, uint64_t size, const ceph::real_time& mtime, const std::string& etag, EventType event_type, - rgw_pubsub_s3_event& event) { + rgw_pubsub_s3_event& event) { + const auto s = res.s; event.eventTime = mtime; event.eventName = to_string(event_type); event.userIdentity = s->user->get_id().id; // user that triggered the change @@ -684,12 +685,14 @@ void populate_event_from_request(const req_state *s, std::back_inserter(event.object_sequencer)); set_event_id(event.id, etag, ts); event.bucket_id = s->bucket->get_bucket_id(); - // pass meta data - if (s->info.x_meta_map.empty()) { - // try to fetch the metadata from the attributes + // pass metadata + if (res.cached_metadata.empty()) { + // no metadata cached: + // either no metadata exist or no metadata filter was used + event.x_meta_map = s->info.x_meta_map; metadata_from_attributes(s, obj, event.x_meta_map); } else { - event.x_meta_map = s->info.x_meta_map; + event.x_meta_map = std::move(res.cached_metadata); } // pass tags if (s->tagset.get_tags().empty()) { @@ -701,29 +704,22 @@ void populate_event_from_request(const req_state *s, // opaque data will be filled from topic configuration } -bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj, - EventType event, const RGWObjTags* req_tags) { - if (!match(filter.events, event)) { +bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) { + if (!match(filter.events, event)) { return false; } + const auto obj = res.object; if (!match(filter.s3_filter.key_filter, obj->get_name())) { return false; } + const auto s = res.s; if (!filter.s3_filter.metadata_filter.kv.empty()) { // metadata filter exists - if (!s->info.x_meta_map.empty()) { - // metadata was cached in req_state - if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) { - return false; - } - } else { - // try to fetch the metadata from the attributes - KeyValueMap metadata; - metadata_from_attributes(s, obj, metadata); - if (!match(filter.s3_filter.metadata_filter, metadata)) { - return false; - } + res.cached_metadata = s->info.x_meta_map; + metadata_from_attributes(s, obj, res.cached_metadata); + if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) { + return false; } } @@ -767,7 +763,7 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type, for (const auto& bucket_topic : bucket_topics.topics) { const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second; const rgw_pubsub_topic& topic_cfg = topic_filter.topic; - if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) { + if (!notification_match(res, topic_filter, event_type, req_tags)) { // notification does not apply to req_state continue; } @@ -820,7 +816,7 @@ int publish_commit(rgw::sal::RGWObject* obj, continue; } event_entry_t event_entry; - populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event); + populate_event_from_request(res, obj, size, mtime, etag, event_type, event_entry.event); event_entry.event.configurationId = topic.configurationId; event_entry.event.opaque_data = topic.cfg.opaque_data; if (topic.cfg.dest.persistent) { diff --git a/src/rgw/rgw_notify.h b/src/rgw/rgw_notify.h index b7bfc28342ce..a5bdc4d5485d 100644 --- a/src/rgw/rgw_notify.h +++ b/src/rgw/rgw_notify.h @@ -58,6 +58,7 @@ struct reservation_t { const req_state* const s; size_t size; rgw::sal::RGWObject* const object; + KeyValueMap cached_metadata; reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RGWRadosStore* _store, const req_state* _s, rgw::sal::RGWObject* _object) : dpp(_dpp), store(_store), s(_s), object(_object) {} diff --git a/src/test/rgw/rgw_multi/tests_ps.py b/src/test/rgw/rgw_multi/tests_ps.py index fd6e73e868ad..5171401a6155 100644 --- a/src/test/rgw/rgw_multi/tests_ps.py +++ b/src/test/rgw/rgw_multi/tests_ps.py @@ -3936,40 +3936,49 @@ def test_ps_s3_metadata_on_master(): response, status = s3_notification_conf.set_config() assert_equal(status/100, 2) + expected_keys = [] # create objects in the bucket key_name = 'foo' key = bucket.new_key(key_name) key.set_metadata(meta_key, meta_value) key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa') + expected_keys.append(key_name) # create objects in the bucket using COPY - bucket.copy_key('copy_of_foo', bucket.name, key.name) + key_name = 'copy_of_foo' + bucket.copy_key(key_name, bucket.name, key.name) + expected_keys.append(key_name) + + # create another objects in the bucket using COPY + # but override the metadata value + key_name = 'another_copy_of_foo' + bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'}) # create objects in the bucket using multi-part upload fp = tempfile.NamedTemporaryFile(mode='w+b') - object_size = 1024 + chunk_size = 1024*1024*5 # 5MB + object_size = 10*chunk_size content = bytearray(os.urandom(object_size)) fp.write(content) fp.flush() fp.seek(0) - uploader = bucket.initiate_multipart_upload('multipart_foo', - metadata={meta_key: meta_value}) - uploader.upload_part_from_file(fp, 1) + key_name = 'multipart_foo' + uploader = bucket.initiate_multipart_upload(key_name, + metadata={meta_key: meta_value}) + for i in range(1,5): + uploader.upload_part_from_file(fp, i, size=chunk_size) + fp.seek(i*chunk_size) uploader.complete_upload() fp.close() + expected_keys.append(key_name) print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver - event_count = 0 - for event in receiver.get_and_reset_events(): - s3_event = event['Records'][0]['s3'] - assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key) - assert_equal(s3_event['object']['metadata'][0]['val'], meta_value) - event_count +=1 - - # only PUT and POST has the metadata value - assert_equal(event_count, 2) + events = receiver.get_and_reset_events() + assert_equal(len(events), 4) # PUT, COPY, Multipart start, Multipart End + for event in events: + assert(event['Records'][0]['s3']['object']['key'] in expected_keys) # delete objects for key in bucket.list(): @@ -3977,12 +3986,7 @@ def test_ps_s3_metadata_on_master(): print('wait for 5sec for the messages...') time.sleep(5) # check amqp receiver - event_count = 0 - for event in receiver.get_and_reset_events(): - s3_event = event['Records'][0]['s3'] - assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key) - assert_equal(s3_event['object']['metadata'][0]['val'], meta_value) - event_count +=1 + #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys)) # all 3 object has metadata when deleted assert_equal(event_count, 3)