git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw/notification: use the term "event" for notifications 38246/head
author     Yuval Lifshitz <ylifshit@redhat.com>
           Sun, 6 Dec 2020 14:26:05 +0000 (16:26 +0200)
committer  Yuval Lifshitz <ylifshit@redhat.com>
           Sun, 6 Dec 2020 14:26:05 +0000 (16:26 +0200)
as well as some other naming changes for clarification

Signed-off-by: Yuval Lifshitz <ylifshit@redhat.com>
src/rgw/rgw_notify.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_pubsub_push.cc
src/rgw/rgw_pubsub_push.h
src/rgw/rgw_sync_module_pubsub.cc

index 4f41aa420c5e25cf3733b24ec9b1432e5f801c08..29b0c44093ad42718e1337aab0b40a1c053ce42f 100644 (file)
 
 namespace rgw::notify {
 
-struct record_with_endpoint_t {
-  rgw_pubsub_s3_record record;
+struct event_entry_t {
+  rgw_pubsub_s3_event event;
   std::string push_endpoint;
   std::string push_endpoint_args;
   std::string arn_topic;
   
   void encode(bufferlist& bl) const {
     ENCODE_START(1, 1, bl);
-    encode(record, bl);
+    encode(event, bl);
     encode(push_endpoint, bl);
     encode(push_endpoint_args, bl);
     encode(arn_topic, bl);
@@ -35,14 +35,14 @@ struct record_with_endpoint_t {
 
   void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
-    decode(record, bl);
+    decode(event, bl);
     decode(push_endpoint, bl);
     decode(push_endpoint_args, bl);
     decode(arn_topic, bl);
     DECODE_FINISH(bl);
   }
 };
-WRITE_CLASS_ENCODER(record_with_endpoint_t)
+WRITE_CLASS_ENCODER(event_entry_t)
 
 using queues_t = std::set<std::string>;
 
@@ -149,35 +149,35 @@ class Manager {
   // processing of a specific entry
   // return whether processing was successfull (true) or not (false)
   bool process_entry(const cls_queue_entry& entry, spawn::yield_context yield) {
-    record_with_endpoint_t record_with_endpoint;
+    event_entry_t event_entry;
     auto iter = entry.data.cbegin();
     try {
-      decode(record_with_endpoint, iter);
+      decode(event_entry, iter);
     } catch (buffer::error& err) {
       ldout(cct, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
       return false;
     }
     try {
       // TODO move endpoint creation to queue level
-      const auto push_endpoint = RGWPubSubEndpoint::create(record_with_endpoint.push_endpoint, record_with_endpoint.arn_topic,
-          RGWHTTPArgs(record_with_endpoint.push_endpoint_args), 
+      const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
+          RGWHTTPArgs(event_entry.push_endpoint_args), 
           cct);
-      ldout(cct, 20) << "INFO: push endpoint created: " << record_with_endpoint.push_endpoint <<
+      ldout(cct, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
         " for entry: " << entry.marker << dendl;
-      const auto ret = push_endpoint->send_to_completion_async(cct, record_with_endpoint.record, optional_yield(io_context, yield));
+      const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
       if (ret < 0) {
-        ldout(cct, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << record_with_endpoint.push_endpoint 
+        ldout(cct, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
           << " failed. error: " << ret << " (will retry)" << dendl;
         return false;
       } else {
-        ldout(cct, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << record_with_endpoint.push_endpoint 
+        ldout(cct, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint 
           << " ok" <<  dendl;
         if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
         return true;
       }
     } catch (const RGWPubSubEndpoint::configuration_error& e) {
       ldout(cct, 5) << "WARNING: failed to create push endpoint: " 
-          << record_with_endpoint.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
+          << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
       return false;
     }
   }
@@ -647,56 +647,56 @@ void tags_from_attributes(const req_state* s, rgw::sal::RGWObject* obj, KeyValue
   }
 }
 
-// populate record from request
-void populate_record_from_request(const req_state *s, 
+// populate event from request
+void populate_event_from_request(const req_state *s, 
         rgw::sal::RGWObject* obj,
         uint64_t size,
         const ceph::real_time& mtime, 
         const std::string& etag, 
         EventType event_type,
-        rgw_pubsub_s3_record& record) { 
-  record.eventTime = mtime;
-  record.eventName = to_string(event_type);
-  record.userIdentity = s->user->get_id().id;    // user that triggered the change
-  record.x_amz_request_id = s->req_id;          // request ID of the original change
-  record.x_amz_id_2 = s->host_id;               // RGW on which the change was made
+        rgw_pubsub_s3_event& event) { 
+  event.eventTime = mtime;
+  event.eventName = to_string(event_type);
+  event.userIdentity = s->user->get_id().id;    // user that triggered the change
+  event.x_amz_request_id = s->req_id;          // request ID of the original change
+  event.x_amz_id_2 = s->host_id;               // RGW on which the change was made
   // configurationId is filled from notification configuration
-  record.bucket_name = s->bucket_name;
-  record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
-  record.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
-  record.object_key = obj->get_name();
-  record.object_size = size;
-  record.object_etag = etag;
-  record.object_versionId = obj->get_instance();
+  event.bucket_name = s->bucket_name;
+  event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
+  event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
+  event.object_key = obj->get_name();
+  event.object_size = size;
+  event.object_etag = etag;
+  event.object_versionId = obj->get_instance();
   // use timestamp as per key sequence id (hex encoded)
   const utime_t ts(real_clock::now());
   boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
-          std::back_inserter(record.object_sequencer));
-  set_event_id(record.id, etag, ts);
-  record.bucket_id = s->bucket->get_bucket_id();
+          std::back_inserter(event.object_sequencer));
+  set_event_id(event.id, etag, ts);
+  event.bucket_id = s->bucket->get_bucket_id();
   // pass meta data
   if (s->info.x_meta_map.empty()) {
     // try to fetch the metadata from the attributes
-    metadata_from_attributes(s, obj, record.x_meta_map);
+    metadata_from_attributes(s, obj, event.x_meta_map);
   } else {
-    record.x_meta_map = s->info.x_meta_map;
+    event.x_meta_map = s->info.x_meta_map;
   }
   // pass tags
   if (s->tagset.get_tags().empty()) {
     // try to fetch the tags from the attributes
-    tags_from_attributes(s, obj, record.tags);
+    tags_from_attributes(s, obj, event.tags);
   } else {
-    record.tags = s->tagset.get_tags();
+    event.tags = s->tagset.get_tags();
   }
   // opaque data will be filled from topic configuration
 }
 
-bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj, 
+bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj, 
     EventType event, const RGWObjTags* req_tags) {
-  if (!::match(filter.events, event)) { 
+  if (!match(filter.events, event)) { 
     return false;
   }
-  if (!::match(filter.s3_filter.key_filter, obj->get_name())) {
+  if (!match(filter.s3_filter.key_filter, obj->get_name())) {
     return false;
   }
 
@@ -704,14 +704,14 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::
     // metadata filter exists
     if (!s->info.x_meta_map.empty()) {
       // metadata was cached in req_state
-      if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
+      if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
         return false;
       }
     } else {
       // try to fetch the metadata from the attributes
       KeyValueMap metadata;
       metadata_from_attributes(s, obj, metadata);
-      if (!::match(filter.s3_filter.metadata_filter, metadata)) {
+      if (!match(filter.s3_filter.metadata_filter, metadata)) {
         return false;
       }
     }
@@ -721,19 +721,19 @@ bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::
     // tag filter exists
     if (req_tags) {
       // tags in the request
-      if (!::match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
+      if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
         return false;
       }
     } else if (!s->tagset.get_tags().empty()) { 
       // tags were cached in req_state
-      if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
+      if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
         return false;
       }
     } else {
       // try to fetch tags from the attributes
       KeyValueMap tags;
       tags_from_attributes(s, obj, tags);
-      if (!::match(filter.s3_filter.tag_filter, tags)) {
+      if (!match(filter.s3_filter.tag_filter, tags)) {
         return false;
       }
     }
@@ -757,8 +757,8 @@ int publish_reserve(EventType event_type,
   for (const auto& bucket_topic : bucket_topics.topics) {
     const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
     const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
-    if (!match(topic_filter, res.s, res.object, event_type, req_tags)) {
-      // topic does not apply to req_state
+    if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
+      // notification does not apply to req_state
       continue;
     }
     ldout(res.s->cct, 20) << "INFO: notification: '" << topic_filter.s3_id << 
@@ -808,16 +808,16 @@ int publish_commit(rgw::sal::RGWObject* obj,
       // nothing to commit or already committed/aborted
       continue;
     }
-    record_with_endpoint_t record_with_endpoint;
-    populate_record_from_request(res.s, obj, size, mtime, etag, event_type, record_with_endpoint.record);
-    record_with_endpoint.record.configurationId = topic.configurationId;
-    record_with_endpoint.record.opaque_data = topic.cfg.opaque_data;
+    event_entry_t event_entry;
+    populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
+    event_entry.event.configurationId = topic.configurationId;
+    event_entry.event.opaque_data = topic.cfg.opaque_data;
     if (topic.cfg.dest.persistent) { 
-      record_with_endpoint.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
-      record_with_endpoint.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
-      record_with_endpoint.arn_topic = std::move(topic.cfg.dest.arn_topic);
+      event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
+      event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
+      event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);
       bufferlist bl;
-      encode(record_with_endpoint, bl);
+      encode(event_entry, bl);
       const auto& queue_name = topic.cfg.dest.arn_topic;
       if (bl.length() > res.size) {
         // try to make a larger reservation, fail only if this is not possible
@@ -872,7 +872,7 @@ int publish_commit(rgw::sal::RGWObject* obj,
                 RGWHTTPArgs(topic.cfg.dest.push_endpoint_args), 
                 res.s->cct);
         ldout(res.s->cct, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
-        const auto ret = push_endpoint->send_to_completion_async(res.s->cct, record_with_endpoint.record, res.s->yield);
+        const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
         if (ret < 0) {
           ldout(res.s->cct, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
           if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
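
Aside (illustrative, not part of the commit): the renamed event_entry_t follows Ceph's standard encode/decode pattern, so the persistent-queue round trip that publish_commit() writes and process_entry() reads can be pictured with a short sketch. It assumes event_entry_t is visible to the caller (in the tree it is local to rgw_notify.cc) and that the usual Ceph buffer headers are on the include path.

// illustrative sketch only, not code from the commit
#include "include/buffer.h"            // ceph::bufferlist (assumed include path)

bool roundtrip(const rgw::notify::event_entry_t& in, rgw::notify::event_entry_t& out) {
  ceph::bufferlist bl;
  encode(in, bl);                      // generated by WRITE_CLASS_ENCODER(event_entry_t)
  auto iter = bl.cbegin();
  try {
    decode(out, iter);                 // DECODE_START(1, bl) checks the encoded struct version
  } catch (const ceph::buffer::error& err) {
    // same failure mode process_entry() logs before returning false
    return false;
  }
  return true;
}
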
index d9c850b23985b28da6b5d2c847fddd26988a70aa..adb419deceb8dc733434431410beb6339b086b82 100644 (file)
@@ -236,7 +236,7 @@ void rgw_pubsub_s3_notifications::dump_xml(Formatter *f) const {
   do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
 }
 
-void rgw_pubsub_s3_record::dump(Formatter *f) const {
+void rgw_pubsub_s3_event::dump(Formatter *f) const {
   encode_json("eventVersion", eventVersion, f);
   encode_json("eventSource", eventSource, f);
   encode_json("awsRegion", awsRegion, f);
@@ -935,5 +935,5 @@ void RGWPubSub::SubWithEvents<EventType>::dump(Formatter* f) const {
 // explicit instantiation for the only two possible types
 // no need to move implementation to header
 template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
-template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_record>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>;
 
index 13b1daa6488e0c93fa09de7fd3b827d01cd4ad43..038caa3a835cb4bb424497519c6e396fc24e1da5 100644 (file)
@@ -205,7 +205,7 @@ struct rgw_pubsub_s3_notifications {
 ]
 }*/
 
-struct rgw_pubsub_s3_record {
+struct rgw_pubsub_s3_event {
   constexpr static const char* const json_type_plural = "Records";
   std::string eventVersion = "2.2";
   // aws:s3
@@ -323,7 +323,7 @@ struct rgw_pubsub_s3_record {
 
   void dump(Formatter *f) const;
 };
-WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
+WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
 
 struct rgw_pubsub_event {
   constexpr static const char* const json_type_plural = "events";
@@ -357,7 +357,7 @@ struct rgw_pubsub_event {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_event)
 
-// settign a unique ID for an event/record based on object hash and timestamp
+// settign a unique ID for an event based on object hash and timestamp
 void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
 
 struct rgw_pubsub_sub_dest {
@@ -737,7 +737,7 @@ public:
     if (conf.s3_id.empty()) {
       return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
     }
-    return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
+    return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub);
   }
 
   void get_meta_obj(rgw_raw_obj *obj) const;
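
Aside (illustrative, not part of the commit): json_type_plural ("Records") together with dump(Formatter*) is what the push endpoints rely on, via json_format_pubsub_event(), to emit the S3-compatible payload. A minimal sketch of that serialization, assuming Ceph's JSONFormatter and encode_json helpers and the include paths shown:

#include <sstream>
#include <string>
#include "common/Formatter.h"          // ceph::JSONFormatter (assumed include path)
#include "common/ceph_json.h"          // encode_json

// render an rgw_pubsub_s3_event as {"Records":[{...}]}; a sketch, not the
// tree's json_format_pubsub_event()
std::string to_records_json(const rgw_pubsub_s3_event& event) {
  std::stringstream ss;
  ceph::JSONFormatter f;
  f.open_object_section("");                                      // {
  f.open_array_section(rgw_pubsub_s3_event::json_type_plural);    //   "Records": [
  encode_json("", event, &f);                                     //     event.dump() output
  f.close_section();                                              //   ]
  f.close_section();                                              // }
  f.flush(ss);
  return ss.str();
}
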
index 5e2e26bcbdac08366f8e7a4316ce8a73c663189a..c6fbe325527ef8ffd6859e27c8fe58ae73090a3c 100644 (file)
@@ -131,14 +131,14 @@ public:
     return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
   }
 
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
-    return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
+  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
+    return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
   }
 
-  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
     bufferlist read_bl;
     RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
-    const auto post_data = json_format_pubsub_event(record);
+    const auto post_data = json_format_pubsub_event(event);
     request.set_post_data(post_data);
     request.set_send_length(post_data.length());
     if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
@@ -312,12 +312,12 @@ public:
     }
   }
   
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
     ceph_assert(conn);
     if (ack_level == ack_level_t::None) {
-      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
     } else {
-      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
     }
   }
 
@@ -375,17 +375,17 @@ public:
     }
   };
 
-  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
     ceph_assert(conn);
     if (ack_level == ack_level_t::None) {
-      return amqp::publish(conn, topic, json_format_pubsub_event(record));
+      return amqp::publish(conn, topic, json_format_pubsub_event(event));
     } else {
       // TODO: currently broker and routable are the same - this will require different flags but the same mechanism
       // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
       auto w = std::unique_ptr<Waiter>(new Waiter);
       const auto rc = amqp::publish_with_confirm(conn, 
         topic,
-        json_format_pubsub_event(record),
+        json_format_pubsub_event(event),
         std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
       if (rc < 0) {
         // failed to publish, does not wait for reply
@@ -582,12 +582,12 @@ public:
     }
   }
   
-  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+  RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
     ceph_assert(conn);
     if (ack_level == ack_level_t::None) {
-      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+      return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
     } else {
-      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+      return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
     }
   }
 
@@ -645,16 +645,16 @@ public:
     }
   };
 
-  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+  int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
     ceph_assert(conn);
     if (ack_level == ack_level_t::None) {
-      return kafka::publish(conn, topic, json_format_pubsub_event(record));
+      return kafka::publish(conn, topic, json_format_pubsub_event(event));
     } else {
       // note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
       auto w = std::unique_ptr<Waiter>(new Waiter);
       const auto rc = kafka::publish_with_confirm(conn, 
         topic,
-        json_format_pubsub_event(record),
+        json_format_pubsub_event(event),
         std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
       if (rc < 0) {
         // failed to publish, does not wait for reply
index 5a2f752c876b1a1d9216ff594fc4fd250a8f30a3..6cb3db74b332a12bb4ab48b4f802e3021db4a951 100644 (file)
@@ -14,7 +14,7 @@ class RGWDataSyncEnv;
 class RGWCoroutine;
 class RGWHTTPArgs;
 struct rgw_pubsub_event;
-struct rgw_pubsub_s3_record;
+struct rgw_pubsub_s3_event;
 
 // endpoint base class all endpoint  - types should derive from it
 class RGWPubSubEndpoint {
@@ -37,11 +37,11 @@ public:
 
   // this method is used in order to send notification (S3 compliant) and wait for completion 
   // in async manner via a coroutine when invoked in the data sync environment
-  virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) = 0;
+  virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) = 0;
 
   // this method is used in order to send notification (S3 compliant) and wait for completion 
   // in async manner via a coroutine when invoked in the frontend environment
-  virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) = 0;
+  virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0;
 
   // present as string
   virtual std::string to_str() const { return ""; }
index c1610747c39024b0fe8b9ad0d5fb4d9e999d8e34..68ac72156ad9a07554e72dc1a487a0bfb7a7753a 100644 (file)
@@ -264,38 +264,38 @@ static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
   encode_json("info", oevent, &e->info);
 }
 
-static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
+static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket,
                        const rgw_user& owner,
                        const rgw_obj_key& key,
                        const ceph::real_time& mtime,
-                       const std::vector<std::pair<std::string, std::string> > *attrs,
+                       const std::vector<std::pair<std::string, std::string>>* attrs,
                        rgw::notify::EventType event_type,
-                       EventRef<rgw_pubsub_s3_record> *record) {
-  *record = std::make_shared<rgw_pubsub_s3_record>();
+                       EventRef<rgw_pubsub_s3_event>* event) {
+  *event = std::make_shared<rgw_pubsub_s3_event>();
 
-  EventRef<rgw_pubsub_s3_record>& r = *record;
-  r->eventTime = mtime;
-  r->eventName = rgw::notify::to_string(event_type);
+  EventRef<rgw_pubsub_s3_event>& e = *event;
+  e->eventTime = mtime;
+  e->eventName = rgw::notify::to_string(event_type);
   // userIdentity: not supported in sync module
   // x_amz_request_id: not supported in sync module
   // x_amz_id_2: not supported in sync module
   // configurationId is filled from subscription configuration
-  r->bucket_name = bucket.name;
-  r->bucket_ownerIdentity = owner.to_str();
-  r->bucket_arn = to_string(rgw::ARN(bucket));
-  r->bucket_id = bucket.bucket_id; // rgw extension
-  r->object_key = key.name;
+  e->bucket_name = bucket.name;
+  e->bucket_ownerIdentity = owner.to_str();
+  e->bucket_arn = to_string(rgw::ARN(bucket));
+  e->bucket_id = bucket.bucket_id; // rgw extension
+  e->object_key = key.name;
   // object_size not supported in sync module
   objstore_event oevent(bucket, key, mtime, attrs);
-  r->object_etag = oevent.get_hash();
-  r->object_versionId = key.instance;
+  e->object_etag = oevent.get_hash();
+  e->object_versionId = key.instance;
  
   // use timestamp as per key sequence id (hex encoded)
   const utime_t ts(real_clock::now());
   boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
-          std::back_inserter(r->object_sequencer));
+          std::back_inserter(e->object_sequencer));
  
-  set_event_id(r->id, r->object_etag, ts);
+  set_event_id(e->id, e->object_etag, ts);
 }
 
 class PSManager;
@@ -1022,11 +1022,10 @@ class RGWPSHandleObjEventCR : public RGWCoroutine {
   const PSEnvRef env;
   const rgw_user owner;
   const EventRef<rgw_pubsub_event> event;
-  const EventRef<rgw_pubsub_s3_record> record;
+  const EventRef<rgw_pubsub_s3_event> s3_event;
   const TopicsRef topics;
   bool has_subscriptions;
   bool event_handled;
-  bool sub_conf_found;
   PSSubscriptionRef sub;
   std::vector<PSTopicConfigRef>::const_iterator titer;
   std::set<std::string>::const_iterator siter;
@@ -1036,13 +1035,13 @@ public:
                       const PSEnvRef _env,
                       const rgw_user& _owner,
                       const EventRef<rgw_pubsub_event>& _event,
-                      const EventRef<rgw_pubsub_s3_record>& _record,
+                      const EventRef<rgw_pubsub_s3_event>& _s3_event,
                       const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
                                           sc(_sc),
                                           env(_env),
                                           owner(_owner),
                                           event(_event),
-                                          record(_record),
+                                          s3_event(_s3_event),
                                           topics(_topics),
                                           has_subscriptions(false),
                                           event_handled(false) {}
@@ -1106,23 +1105,23 @@ public:
             } 
           } else {
             // subscription was made by S3 compatible API
-            ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
-            record->configurationId = sub->sub_conf->s3_id;
-            record->opaque_data = (*titer)->opaque_data;
-            yield call(PSSubscription::store_event_cr(sc, sub, record));
+            ldout(sc->cct, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+            s3_event->configurationId = sub->sub_conf->s3_id;
+            s3_event->opaque_data = (*titer)->opaque_data;
+            yield call(PSSubscription::store_event_cr(sc, sub, s3_event));
             if (retcode < 0) {
               if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
-              ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
+              ldout(sc->cct, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
             } else {
               if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
               event_handled = true;
             }
             if (sub->sub_conf->push_endpoint) {
-                ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
-              yield call(PSSubscription::push_event_cr(sc, sub, record));
+                ldout(sc->cct, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+              yield call(PSSubscription::push_event_cr(sc, sub, s3_event));
               if (retcode < 0) {
                 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
-                ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
+                ldout(sc->cct, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
               } else {
                 if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
                 event_handled = true;
@@ -1152,7 +1151,7 @@ class RGWPSHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
   PSEnvRef env;
   std::optional<uint64_t> versioned_epoch;
   EventRef<rgw_pubsub_event> event;
-  EventRef<rgw_pubsub_s3_record> record;
+  EventRef<rgw_pubsub_s3_event> s3_event;
   TopicsRef topics;
 public:
   RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
@@ -1179,20 +1178,20 @@ public:
           }
           attrs.push_back(std::make_pair(k, attr.second));
         } 
-        // at this point we don't know whether we need the ceph event or S3 record
+        // at this point we don't know whether we need the ceph event or S3 event
         // this is why both are created here, once we have information about the 
         // subscription, we will store/push only the relevant ones
         make_event_ref(sc->cct,
                        sync_pipe.info.source_bs.bucket, key,
                        mtime, &attrs,
                        rgw::notify::ObjectCreated, &event);
-        make_s3_record_ref(sc->cct,
+        make_s3_event_ref(sc->cct,
                        sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
                        mtime, &attrs,
-                       rgw::notify::ObjectCreated, &record);
+                       rgw::notify::ObjectCreated, &s3_event);
       }
 
-      yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics));
+      yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
@@ -1278,7 +1277,7 @@ class RGWPSGenericObjEventCBCR : public RGWCoroutine {
   ceph::real_time mtime;
   rgw::notify::EventType event_type;
   EventRef<rgw_pubsub_event> event;
-  EventRef<rgw_pubsub_s3_record> record;
+  EventRef<rgw_pubsub_s3_event> s3_event;
   TopicsRef topics;
 public:
   RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
@@ -1304,18 +1303,18 @@ public:
         ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
         return set_cr_done();
       }
-      // at this point we don't know whether we need the ceph event or S3 record
+      // at this point we don't know whether we need the ceph event or S3 event
       // this is why both are created here, once we have information about the 
       // subscription, we will store/push only the relevant ones
       make_event_ref(sc->cct,
                      bucket, key,
                      mtime, nullptr,
                      event_type, &event);
-      make_s3_record_ref(sc->cct,
+      make_s3_event_ref(sc->cct,
                      bucket, owner, key,
                      mtime, nullptr,
-                     event_type, &record);
-      yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics));
+                     event_type, &s3_event);
+      yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics));
       if (retcode < 0) {
         return set_cr_error(retcode);
       }
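
Aside (illustrative, not part of the commit): both populate_event_from_request() and make_s3_event_ref() build the per-key sequencer the same way: the raw bytes of a utime_t captured at publish time are hex-encoded into object_sequencer, and the same timestamp feeds set_event_id(). A standalone sketch of that idiom (include paths are assumptions):

#include <iterator>
#include <string>
#include <boost/algorithm/hex.hpp>
#include "include/utime.h"             // utime_t (assumed include path)
#include "common/ceph_time.h"          // ceph::real_clock

// hex-encode the raw utime_t bytes, mirroring the event-population code above
std::string make_object_sequencer() {
  const utime_t ts(ceph::real_clock::now());
  std::string sequencer;
  boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
                        std::back_inserter(sequencer));
  return sequencer;
}
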