]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notifications: support metadata filter in CompleteMultipartUpload events 41945/head
authorYuval Lifshitz <ylifshit@redhat.com>
Sun, 20 Jun 2021 11:26:53 +0000 (14:26 +0300)
committerYuval Lifshitz <ylifshit@redhat.com>
Sun, 20 Jun 2021 11:26:53 +0000 (14:26 +0300)
Fixes: https://tracker.ceph.com/issues/51261
Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/rgw_notify.cc
src/rgw/rgw_notify.h
src/rgw/rgw_op.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_rados.cc
src/rgw/rgw_sal_rados.h
src/test/rgw/bucket_notification/test_bn.py

index 1c5a89e02948653ba774fa0edaf74b4699593910..6609eab2adbd3f43f7216923360d80a9628e670c 100644 (file)
@@ -684,7 +684,7 @@ void populate_event_from_request(const reservation_t& res,
   event.bucket_name = s->bucket_name;
   event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
   event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
-  event.object_key = obj->get_name();
+  event.object_key = res.object_name ? *res.object_name : obj->get_name();
   event.object_size = size;
   event.object_etag = etag;
   event.object_versionId = obj->get_instance();
@@ -718,7 +718,8 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte
     return false;
   }
   const auto obj = res.object;
-  if (!match(filter.s3_filter.key_filter, obj->get_name())) {
+  if (!match(filter.s3_filter.key_filter, 
+        res.object_name ? *res.object_name : obj->get_name())) {
     return false;
   }
 
index 899e2f3ad7b7d08d9fd41534fd9d9f963cca4134..bd0001c5991a0ed6b229c4f801867b444140569f 100644 (file)
@@ -58,10 +58,12 @@ struct reservation_t {
   const req_state* const s;
   size_t size;
   rgw::sal::Object* const object;
+  const std::string* const object_name;
   KeyValueMap cached_metadata;
 
-  reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, rgw::sal::Object* _object) :
-      dpp(_dpp), store(_store), s(_s), object(_object) {}
+  reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, 
+      rgw::sal::Object* _object, const std::string* _object_name) :
+      dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {}
 
   // dtor doing resource leak guarding
   // aborting the reservation if not already committed or aborted
index 93b98b04d40468ed5d0b964eb85e5c0d73b53eb2..51b070b7a2fd655d3d01f3322f3d0149b4050206 100644 (file)
@@ -6065,13 +6065,6 @@ void RGWCompleteMultipart::execute(optional_yield y)
 
   mp.init(s->object->get_name(), upload_id);
 
-  // make reservation for notification if needed
-  std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
-                               s, rgw::notify::ObjectCreatedCompleteMultipartUpload);
-  op_ret = res->publish_reserve(this);
-  if (op_ret < 0) {
-    return;
-  }
 
   meta_oid = mp.get_meta();
 
@@ -6123,6 +6116,14 @@ void RGWCompleteMultipart::execute(optional_yield y)
     return;
   }
   attrs = meta_obj->get_attrs();
+  
+  // make reservation for notification if needed
+  std::unique_ptr<rgw::sal::Notification> res = store->get_notification(meta_obj.get(),
+                               s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
+  op_ret = res->publish_reserve(this);
+  if (op_ret < 0) {
+    return;
+  }
 
   do {
     op_ret = list_multipart_parts(this, s, upload_id, meta_oid, max_parts,
index 52b5aaecf91a23c99e7175e04897ed3b1ea15d09..6f7165fc48647175990ab3ea4ee62da3ce7df030 100644 (file)
@@ -170,7 +170,8 @@ class Store {
     virtual int cluster_stat(RGWClusterStat& stats) = 0;
     virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
     virtual std::unique_ptr<Completions> get_completions(void) = 0;
-    virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type) = 0;
+    virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, 
+        rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0;
     virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) = 0;
     virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
               RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
index c77fee75a6189e2f7586f2e4766091627234d0a0..ca6574b8c9cf16456dfb94e080a1190474eded11 100644 (file)
@@ -932,9 +932,10 @@ std::unique_ptr<Completions> RadosStore::get_completions(void)
 
 std::unique_ptr<Notification> RadosStore::get_notification(rgw::sal::Object* obj,
                                                            struct req_state* s,
-                                                           rgw::notify::EventType event_type)
+                                                           rgw::notify::EventType event_type,
+                                                            const std::string* object_name)
 {
-  return std::unique_ptr<Notification>(new RadosNotification(s, this, obj, s, event_type));
+  return std::unique_ptr<Notification>(new RadosNotification(s, this, obj, s, event_type, object_name));
 }
 
 std::unique_ptr<GCChain> RadosStore::get_gc_chain(rgw::sal::Object* obj)
index 34aaefd8361de32ae143cbd7b74ed461dbca95a1..23cbc9942bc48b3e2613de6001682d0e69d32941 100644 (file)
@@ -402,7 +402,7 @@ class RadosStore : public Store {
     virtual int cluster_stat(RGWClusterStat& stats) override;
     virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
     virtual std::unique_ptr<Completions> get_completions(void) override;
-    virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type) override;
+    virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
     virtual std::unique_ptr<GCChain> get_gc_chain(rgw::sal::Object* obj) override;
     virtual std::unique_ptr<Writer> get_writer(Aio* aio, rgw::sal::Bucket* bucket,
               RGWObjectCtx& obj_ctx, std::unique_ptr<rgw::sal::Object> _head_obj,
@@ -537,7 +537,9 @@ class RadosNotification : public Notification {
   rgw::notify::reservation_t res;
 
   public:
-    RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s, rgw::notify::EventType _type) : Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj) { }
+    RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s, 
+        rgw::notify::EventType _type, const std::string* object_name=nullptr) : 
+      Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj, object_name) { }
     ~RadosNotification() = default;
 
     virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override;
index 1fb08f29b4ade8662d012b93d6eb6423f98bef77..54f4d99fcfcdddb89a5a1693441e0c283be9e769 100644 (file)
@@ -1722,37 +1722,49 @@ def test_ps_s3_metadata_on_master():
     response, status = s3_notification_conf.set_config()
     assert_equal(status/100, 2)
 
+    expected_keys = []
     # create objects in the bucket
     key_name = 'foo'
     key = bucket.new_key(key_name)
     key.set_metadata(meta_key, meta_value)
     key.set_contents_from_string('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa')
+    expected_keys.append(key_name)
 
     # create objects in the bucket using COPY
-    bucket.copy_key('copy_of_foo', bucket.name, key.name)
+    key_name = 'copy_of_foo'
+    bucket.copy_key(key_name, bucket.name, key.name)
+    expected_keys.append(key_name)
     
     # create another objects in the bucket using COPY
     # but override the metadata value
-    bucket.copy_key('another_copy_of_foo', bucket.name, key.name, metadata={meta_key: 'kaboom'})
+    key_name = 'another_copy_of_foo'
+    bucket.copy_key(key_name, bucket.name, key.name, metadata={meta_key: 'kaboom'})
 
     # create objects in the bucket using multi-part upload
     fp = tempfile.NamedTemporaryFile(mode='w+b')
-    object_size = 10*1024*1024
+    chunk_size = 1024*1024*5 # 5MB
+    object_size = 10*chunk_size
     content = bytearray(os.urandom(object_size))
     fp.write(content)
     fp.flush()
     fp.seek(0)
-    uploader = bucket.initiate_multipart_upload('multipart_foo',
+    key_name = 'multipart_foo'
+    uploader = bucket.initiate_multipart_upload(key_name,
             metadata={meta_key: meta_value})
-    uploader.upload_part_from_file(fp, 1)
+    for i in range(1,5):
+        uploader.upload_part_from_file(fp, i, size=chunk_size)
+        fp.seek(i*chunk_size)
     uploader.complete_upload()
     fp.close()
+    expected_keys.append(key_name)
 
     print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver
-    assert_equal(len(receiver.get_and_reset_events()), 3)
-
+    events = receiver.get_and_reset_events()
+    assert_equal(len(events), 4) # PUT, COPY, Multipart start, Multipart End
+    for event in events:
+        assert(event['Records'][0]['s3']['object']['key'] in expected_keys)
 
     # delete objects
     for key in bucket.list():
@@ -1760,7 +1772,7 @@ def test_ps_s3_metadata_on_master():
     print('wait for 5sec for the messages...')
     time.sleep(5)
     # check amqp receiver
-    assert_equal(len(receiver.get_and_reset_events()), 3)
+    #assert_equal(len(receiver.get_and_reset_events()), len(expected_keys))
 
     # cleanup
     stop_amqp_receiver(receiver, task)