]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: support metadata filter in COPY events
authorYuval Lifshitz <ylifshit@redhat.com>
Thu, 17 Jun 2021 13:08:32 +0000 (16:08 +0300)
committerYuval Lifshitz <ylifshit@redhat.com>
Thu, 17 Jun 2021 13:08:32 +0000 (16:08 +0300)
Fixes: https://tracker.ceph.com/issues/51261
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/rgw_notify.cc
src/rgw/rgw_notify.h
src/test/rgw/bucket_notification/test_bn.py

index a32f1e7eeaeeceb8b6f7d2a371b546b494de915f..1c5a89e02948653ba774fa0edaf74b4699593910 100644 (file)
@@ -667,13 +667,14 @@ void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValueMap
 }
 
 // populate event from request
-void populate_event_from_request(const req_state *s, 
+void populate_event_from_request(const reservation_t& res, 
         rgw::sal::Object* obj,
         uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
-        rgw_pubsub_s3_event& event) { 
+        rgw_pubsub_s3_event& event) {
+  const auto s = res.s;
   event.eventTime = mtime;
   event.eventName = to_event_string(event_type);
   event.userIdentity = s->user->get_id().id;    // user that triggered the change
@@ -693,12 +694,14 @@ void populate_event_from_request(const req_state *s,
           std::back_inserter(event.object_sequencer));
   set_event_id(event.id, etag, ts);
   event.bucket_id = s->bucket->get_bucket_id();
-  // pass meta data
-  if (s->info.x_meta_map.empty()) {
-    // try to fetch the metadata from the attributes
+  // pass metadata
+  if (res.cached_metadata.empty()) {
+    // no metadata cached:
+    // either no metadata exist or no metadata filter was used
+    event.x_meta_map = s->info.x_meta_map;
     metadata_from_attributes(s, obj, event.x_meta_map);
   } else {
-    event.x_meta_map = s->info.x_meta_map;
+    event.x_meta_map = std::move(res.cached_metadata);
   }
   // pass tags
   if (s->tagset.get_tags().empty()) {
@@ -710,29 +713,22 @@ void populate_event_from_request(const req_state *s,
   // opaque data will be filled from topic configuration
 }
 
-bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::Object* obj,
-    EventType event, const RGWObjTags* req_tags) {
+bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) {
   if (!match(filter.events, event)) { 
     return false;
   }
+  const auto obj = res.object;
   if (!match(filter.s3_filter.key_filter, obj->get_name())) {
     return false;
   }
 
+  const auto s = res.s;
   if (!filter.s3_filter.metadata_filter.kv.empty()) {
     // metadata filter exists
-    if (!s->info.x_meta_map.empty()) {
-      // metadata was cached in req_state
-      if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
-        return false;
-      }
-    } else {
-      // try to fetch the metadata from the attributes
-      KeyValueMap metadata;
-      metadata_from_attributes(s, obj, metadata);
-      if (!match(filter.s3_filter.metadata_filter, metadata)) {
-        return false;
-      }
+    res.cached_metadata = s->info.x_meta_map;
+    metadata_from_attributes(s, obj, res.cached_metadata);
+    if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) {
+      return false;
     }
   }
 
@@ -776,7 +772,7 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
   for (const auto& bucket_topic : bucket_topics.topics) {
     const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
     const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
-    if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
+    if (!notification_match(res, topic_filter, event_type, req_tags)) {
       // notification does not apply to req_state
       continue;
     }
@@ -829,7 +825,7 @@ int publish_commit(rgw::sal::Object* obj,
       continue;
     }
     event_entry_t event_entry;
-    populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
+    populate_event_from_request(res, obj, size, mtime, etag, event_type, event_entry.event);
     event_entry.event.configurationId = topic.configurationId;
     event_entry.event.opaque_data = topic.cfg.opaque_data;
     if (topic.cfg.dest.persistent) { 
index ba5f96c1fd0efcaedd12f873399c6c5cfae734d3..899e2f3ad7b7d08d9fd41534fd9d9f963cca4134 100644 (file)
@@ -58,6 +58,7 @@ struct reservation_t {
   const req_state* const s;
   size_t size;
   rgw::sal::Object* const object;
+  KeyValueMap cached_metadata;
 
   reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, rgw::sal::Object* _object) :
       dpp(_dpp), store(_store), s(_s), object(_object) {}
index 333b09d71551f2462ff1a797341f5190a933e1df..1fb08f29b4ade8662d012b93d6eb6423f98bef77 100644 (file)
@@ -1730,10 +1730,14 @@ def test_ps_s3_metadata_on_master():
 
     # create objects in the bucket using COPY
     bucket.copy_key('copy_of_foo', bucket.name, key.name)
+    
+    # create another objects in the bucket using COPY
+    # but override the metadata value
+    bucket.copy_key('another_copy_of_foo', bucket.name, key.name, metadata={meta_key: 'kaboom'})
 
     # create objects in the bucket using multi-part upload
     fp = tempfile.NamedTemporaryFile(mode='w+b')
-    object_size = 1024
+    object_size = 10*1024*1024
     content = bytearray(os.urandom(object_size))
     fp.write(content)
     fp.flush()
@@ -1747,15 +1751,8 @@ def test_ps_s3_metadata_on_master():
     print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver
-    event_count = 0
-    for event in receiver.get_and_reset_events():
-        s3_event = event['Records'][0]['s3']
-        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
-        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
-        event_count +=1
+    assert_equal(len(receiver.get_and_reset_events()), 3)
 
-    # only PUT and POST has the metadata value
-    assert_equal(event_count, 2)
 
     # delete objects
     for key in bucket.list():
@@ -1763,15 +1760,7 @@ def test_ps_s3_metadata_on_master():
     print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver
-    event_count = 0
-    for event in receiver.get_and_reset_events():
-        s3_event = event['Records'][0]['s3']
-        assert_equal(s3_event['object']['metadata'][0]['key'], meta_prefix+meta_key)
-        assert_equal(s3_event['object']['metadata'][0]['val'], meta_value)
-        event_count +=1
-
-    # all 3 object has metadata when deleted
-    assert_equal(event_count, 3)
+    assert_equal(len(receiver.get_and_reset_events()), 3)
 
     # cleanup
     stop_amqp_receiver(receiver, task)