]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgwlc: optionally support notifications on object expiration
authorMatt Benjamin <mbenjamin@redhat.com>
Tue, 26 Jan 2021 17:52:47 +0000 (12:52 -0500)
committerMatt Benjamin <mbenjamin@redhat.com>
Tue, 4 Jan 2022 14:26:09 +0000 (09:26 -0500)
Most of the work is to remove direct knowledge of req_state from
methods in rgw_notify.

I've chosen to create new notification types matching the different
expire actions (but not transition).  The new event types are not
nested under Delete.  Notifications are sent iff rgw_lc_notify is true
(default false).

Adjusted per comments in initial review, in particular, notification from
lifecycle is no longer conditional on a config setting, and constness
is restored.

Fixes: https://tracker.ceph.com/issues/49068
Signed-off-by: Matt Benjamin <mbenjamin@redhat.com>
12 files changed:
src/rgw/rgw_lc.cc
src/rgw/rgw_notify.cc
src/rgw/rgw_notify.h
src/rgw/rgw_notify_event_type.cc
src/rgw/rgw_notify_event_type.h
src/rgw/rgw_op.cc
src/rgw/rgw_sal.h
src/rgw/rgw_sal_dbstore.cc
src/rgw/rgw_sal_dbstore.h
src/rgw/rgw_sal_rados.cc
src/rgw/rgw_sal_rados.h
src/rgw/rgw_sync_module_pubsub_rest.cc

index c930863d6a08ede4e03fee60edc2ba5f5551eeb6..2f8d381ddf50e438c278f0a1feea6da1ee3f43f0 100644 (file)
 #include "rgw_zone.h"
 #include "rgw_string.h"
 #include "rgw_multi.h"
-#include "rgw_sal.h"
+#include "rgw_sal_rados.h"
 #include "rgw_rados.h"
 #include "rgw_lc_tier.h"
+#include "rgw_notify.h"
 
 // this seems safe to use, at least for now--arguably, we should
 // prefer header-only fmt, in general
@@ -565,7 +566,13 @@ struct lc_op_ctx {
 
 }; /* lc_op_ctx */
 
-static int remove_expired_obj(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed)
+
+static std::string lc_id = "rgw lifecycle";
+static std::string lc_req_id = "0";
+
+static int remove_expired_obj(
+  const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed,
+  rgw::notify::EventType event_type)
 {
   auto& store = oc.store;
   auto& bucket_info = oc.bucket->get_info();
@@ -590,16 +597,48 @@ static int remove_expired_obj(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool
   }
 
   obj = bucket->get_object(obj_key);
-  std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op(&oc.rctx);
-
-  del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status();
+  std::unique_ptr<rgw::sal::Object::DeleteOp> del_op
+    = obj->get_delete_op(&oc.rctx);
+  del_op->params.versioning_status
+    = obj->get_bucket()->get_info().versioning_status();
   del_op->params.obj_owner.set_id(rgw_user {meta.owner});
   del_op->params.obj_owner.set_name(meta.owner_display_name);
   del_op->params.bucket_owner.set_id(bucket_info.owner);
   del_op->params.unmod_since = meta.mtime;
   del_op->params.marker_version_id = version_id;
 
-  return del_op->delete_obj(dpp, null_yield);
+  std::unique_ptr<rgw::sal::Notification> notify
+    = store->get_notification(dpp, obj.get(), nullptr, &oc.rctx, event_type,
+                             bucket.get(), lc_id,
+                             const_cast<std::string&>(oc.bucket->get_tenant()),
+                             lc_req_id, null_yield);
+
+  /* can eliminate cast when reservation is lifted into Notification */
+  auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
+
+  ret = rgw::notify::publish_reserve(dpp, event_type, notify_res, nullptr);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1)
+      << "ERROR: notify reservation failed, deferring delete of object k="
+      << o.key
+      << dendl;
+    return ret;
+  }
+
+  ret =  del_op->delete_obj(dpp, null_yield);
+  if (ret < 0) {
+    ldpp_dout(dpp, 1) <<
+      "ERROR: publishing notification failed, with error: " << ret << dendl;
+  } else {
+      // send request to notification manager
+    (void) rgw::notify::publish_commit(
+      obj.get(), obj->get_obj_size(), ceph::real_clock::now(),
+      obj->get_attrs()[RGW_ATTR_ETAG].to_str(), version_id, event_type,
+      notify_res, dpp);
+  }
+
+  return ret;
+
 } /* remove_expired_obj */
 
 class LCOpAction {
@@ -1077,7 +1116,8 @@ public:
     auto& o = oc.o;
     int r;
     if (o.is_delete_marker()) {
-      r = remove_expired_obj(oc.dpp, oc, true);
+      r = remove_expired_obj(oc.dpp, oc, true,
+                            rgw::notify::ObjectDeleteMarkerExpiration);
       if (r < 0) {
        ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj "
                         << oc.bucket << ":" << o.key
@@ -1090,7 +1130,8 @@ public:
                       << " " << oc.wq->thr_name() << dendl;
     } else {
       /* ! o.is_delete_marker() */
-      r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned());
+      r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(),
+                            rgw::notify::ObjectExpiration);
       if (r < 0) {
        ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj "
                         << oc.bucket << ":" << o.key
@@ -1137,7 +1178,8 @@ public:
 
   int process(lc_op_ctx& oc) {
     auto& o = oc.o;
-    int r = remove_expired_obj(oc.dpp, oc, true);
+    int r = remove_expired_obj(oc.dpp, oc, true,
+                              rgw::notify::ObjectNoncurrentExpiration);
     if (r < 0) {
       ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) " 
                       << oc.bucket << ":" << o.key
@@ -1181,7 +1223,8 @@ public:
 
   int process(lc_op_ctx& oc) {
     auto& o = oc.o;
-    int r = remove_expired_obj(oc.dpp, oc, true);
+    int r = remove_expired_obj(oc.dpp, oc, true,
+                              rgw::notify::ObjectDeleteMarkerExpiration);
     if (r < 0) {
       ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
                       << oc.bucket << ":" << o.key
@@ -1279,11 +1322,11 @@ public:
     /* If bucket is versioned, create delete_marker for current version
      */
     if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
-        ret = remove_expired_obj(oc.dpp, oc, false);
-        ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch:  " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
+      ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
+      ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch:  " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
     } else {
-        ret = remove_expired_obj(oc.dpp, oc, true);
-        ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch:  " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
+      ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
+      ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch:  " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
     }
     return ret;
   }
index cd1bdddb1f7c864e8866996b8f6eaa6077090f19..0875987aa607fb200e0cd04eb519dc3f85632b40 100644 (file)
@@ -616,22 +616,25 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y) {
   return s_manager->remove_persistent_topic(topic_name, y);
 }
 
-rgw::sal::Object* get_object_with_atttributes(const req_state* s, rgw::sal::Object* obj) {
+rgw::sal::Object* get_object_with_atttributes(
+  const reservation_t& res, rgw::sal::Object* obj) {
   // in case of copy obj, the tags and metadata are taken from source
-  const auto src_obj = s->src_object ? s->src_object.get() : obj;
+  const auto src_obj = res.src_object ? res.src_object : obj;
   if (src_obj->get_attrs().empty()) {
     if (!src_obj->get_bucket()) {
-      src_obj->set_bucket(s->bucket.get());
+      src_obj->set_bucket(res.bucket);
     }
-    if (src_obj->get_obj_attrs(s->obj_ctx, s->yield, s) < 0) {
+    if (src_obj->get_obj_attrs(res.obj_ctx, res.yield, res.dpp) < 0) {
       return nullptr;
     }
   }
   return src_obj;
 }
 
-void metadata_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValueMap& metadata) {
-  const auto src_obj = get_object_with_atttributes(s, obj);
+static inline void metadata_from_attributes(
+  reservation_t& res, rgw::sal::Object* obj) {
+  auto& metadata = res.x_meta_map;
+  const auto src_obj = get_object_with_atttributes(res, obj);
   if (!src_obj) {
     return;
   }
@@ -646,8 +649,9 @@ void metadata_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValu
   }
 }
 
-void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
-  const auto src_obj = get_object_with_atttributes(s, obj);
+static inline void tags_from_attributes(
+  const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
+  const auto src_obj = get_object_with_atttributes(res, obj);
   if (!src_obj) {
     return;
   }
@@ -667,7 +671,7 @@ void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyMultiVal
 }
 
 // populate event from request
-void populate_event_from_request(const reservation_t& res, 
+static inline void populate_event(reservation_t& res,
         rgw::sal::Object* obj,
         uint64_t size,
         const ceph::real_time& mtime, 
@@ -675,16 +679,15 @@ void populate_event_from_request(const reservation_t& res,
         const std::string& version, 
         EventType event_type,
         rgw_pubsub_s3_event& event) {
-  const auto s = res.s;
   event.eventTime = mtime;
-  event.eventName = to_event_string(event_type);
-  event.userIdentity = s->user->get_id().id;    // user that triggered the change
-  event.x_amz_request_id = s->req_id;          // request ID of the original change
-  event.x_amz_id_2 = s->host_id;               // RGW on which the change was made
+  event.eventName = to_string(event_type);
+  event.userIdentity = res.user_id;    // user that triggered the change
+  event.x_amz_request_id = res.req_id; // request ID of the original change
+  event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
   // configurationId is filled from notification configuration
-  event.bucket_name = s->bucket_name;
-  event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
-  event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
+  event.bucket_name = res.bucket->get_name();
+  event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id;
+  event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key()));
   event.object_key = res.object_name ? *res.object_name : obj->get_name();
   event.object_size = size;
   event.object_etag = etag;
@@ -695,27 +698,30 @@ void populate_event_from_request(const reservation_t& res,
   boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
           std::back_inserter(event.object_sequencer));
   set_event_id(event.id, etag, ts);
-  event.bucket_id = s->bucket->get_bucket_id();
-  // pass metadata
-  if (res.cached_metadata.empty()) {
+  event.bucket_id = res.bucket->get_bucket_id();
+  // pass meta data
+  if (res.x_meta_map.empty()) {
     // no metadata cached:
     // either no metadata exist or no metadata filter was used
-    event.x_meta_map = s->info.x_meta_map;
-    metadata_from_attributes(s, obj, event.x_meta_map);
+    metadata_from_attributes(res, obj);
   } else {
-    event.x_meta_map = std::move(res.cached_metadata);
+      event.x_meta_map = res.x_meta_map;
   }
   // pass tags
-  if (s->tagset.get_tags().empty()) {
+  if (!res.tagset ||
+      (*res.tagset).get_tags().empty()) {
     // try to fetch the tags from the attributes
-    tags_from_attributes(s, obj, event.tags);
+    tags_from_attributes(res, obj, event.tags);
   } else {
-    event.tags = s->tagset.get_tags();
+    event.tags = (*res.tagset).get_tags();
   }
   // opaque data will be filled from topic configuration
 }
 
-bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) {
+static inline bool notification_match(reservation_t& res,
+                                     const rgw_pubsub_topic_filter& filter,
+                                     EventType event,
+                                     const RGWObjTags* req_tags) {
   if (!match(filter.events, event)) { 
     return false;
   }
@@ -725,12 +731,13 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte
     return false;
   }
 
-  const auto s = res.s;
   if (!filter.s3_filter.metadata_filter.kv.empty()) {
     // metadata filter exists
-    res.cached_metadata = s->info.x_meta_map;
-    metadata_from_attributes(s, obj, res.cached_metadata);
-    if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) {
+    if (res.s) {
+      res.x_meta_map = res.s->info.x_meta_map;
+    }
+    metadata_from_attributes(res, obj);
+    if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
       return false;
     }
   }
@@ -742,15 +749,15 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte
       if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
         return false;
       }
-    } else if (!s->tagset.get_tags().empty()) { 
+    } else if (res.tagset && !(*res.tagset).get_tags().empty()) {
       // tags were cached in req_state
-      if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
+      if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) {
         return false;
       }
     } else {
       // try to fetch tags from the attributes
       KeyMultiValueMap tags;
-      tags_from_attributes(s, obj, tags);
+      tags_from_attributes(res, obj, tags);
       if (!match(filter.s3_filter.tag_filter, tags)) {
         return false;
       }
@@ -760,12 +767,13 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte
   return true;
 }
 
-int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
-      reservation_t& res,
-      const RGWObjTags* req_tags)
+  int publish_reserve(const DoutPrefixProvider* dpp,
+                     EventType event_type,
+                     reservation_t& res,
+                     const RGWObjTags* req_tags)
 {
-  RGWPubSub ps(res.store, res.s->user->get_id().tenant);
-  RGWPubSub::Bucket ps_bucket(&ps, res.s->bucket->get_key());
+  RGWPubSub ps(res.store, res.user_tenant);
+  RGWPubSub::Bucket ps_bucket(&ps, res.bucket->get_key());
   rgw_pubsub_bucket_topics bucket_topics;
   auto rc = ps_bucket.get_topics(&bucket_topics);
   if (rc < 0) {
@@ -779,9 +787,9 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
       // notification does not apply to req_state
       continue;
     }
-    ldpp_dout(dpp, 20) << "INFO: notification: '" << topic_filter.s3_id << 
+    ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
         "' on topic: '" << topic_cfg.dest.arn_topic << 
-        "' and bucket: '" << res.s->bucket->get_name() << 
+        "' and bucket: '" << res.bucket->get_name() <<
         "' (unique topic: '" << topic_cfg.name <<
         "') apply to event of type: '" << to_string(event_type) << "'" << dendl;
 
@@ -795,17 +803,19 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
       int rval;
       const auto& queue_name = topic_cfg.dest.arn_topic;
       cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
-      auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), 
-          queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
+      auto ret = rgw_rados_operate(
+       res.dpp, res.store->getRados()->get_notif_pool_ctx(),
+       queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
       if (ret < 0) {
-        ldpp_dout(dpp, 1) << "ERROR: failed to reserve notification on queue: " << queue_name 
-          << ". error: " << ret << dendl;
+        ldpp_dout(res.dpp, 1) <<
+         "ERROR: failed to reserve notification on queue: "
+                             << queue_name << ". error: " << ret << dendl;
         // if no space is left in queue we ask client to slow down
         return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
       }
       ret = cls_2pc_queue_reserve_result(obl, res_id);
       if (ret < 0) {
-        ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
+        ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
         return ret;
       }
     }
@@ -815,86 +825,102 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
 }
 
 int publish_commit(rgw::sal::Object* obj,
-        uint64_t size,
-        const ceph::real_time& mtime, 
-        const std::string& etag, 
-        const std::string& version,
-        EventType event_type,
-        reservation_t& res,
-        const DoutPrefixProvider *dpp) 
+                  uint64_t size,
+                  const ceph::real_time& mtime,
+                  const std::string& etag,
+                  const std::string& version,
+                  EventType event_type,
+                  reservation_t& res,
+                  const DoutPrefixProvider* dpp)
 {
   for (auto& topic : res.topics) {
-    if (topic.cfg.dest.persistent && topic.res_id == cls_2pc_reservation::NO_ID) {
+    if (topic.cfg.dest.persistent &&
+       topic.res_id == cls_2pc_reservation::NO_ID) {
       // nothing to commit or already committed/aborted
       continue;
     }
     event_entry_t event_entry;
-    populate_event_from_request(res, obj, size, mtime, etag, version, event_type, event_entry.event);
+    populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
     event_entry.event.configurationId = topic.configurationId;
     event_entry.event.opaque_data = topic.cfg.opaque_data;
     if (topic.cfg.dest.persistent) { 
       event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
-      event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
+      event_entry.push_endpoint_args =
+       std::move(topic.cfg.dest.push_endpoint_args);
       event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);
       bufferlist bl;
       encode(event_entry, bl);
       const auto& queue_name = topic.cfg.dest.arn_topic;
       if (bl.length() > res.size) {
         // try to make a larger reservation, fail only if this is not possible
-        ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
-          " . trying to make a larger reservation on queue:" << queue_name << dendl;
+        ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
+                         << " exceeded reserved size: " << res.size
+                         <<
+          " . trying to make a larger reservation on queue:" << queue_name
+                         << dendl;
         // first cancel the existing reservation
         librados::ObjectWriteOperation op;
         cls_2pc_queue_abort(op, topic.res_id);
-        auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
-            topic.cfg.dest.arn_topic, &op,
-            res.s->yield);
+        auto ret = rgw_rados_operate(
+         dpp, res.store->getRados()->get_notif_pool_ctx(),
+         topic.cfg.dest.arn_topic, &op,
+         res.yield);
         if (ret < 0) {
-          ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id << 
+          ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
+                           << topic.res_id << 
             " when trying to make a larger reservation on queue: " << queue_name
-            << ". error: " << ret << dendl;
+                           << ". error: " << ret << dendl;
           return ret;
         }
         // now try to make a bigger one
-        bufferlist obl;
+       buffer::list obl;
         int rval;
         cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
-        ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), 
-          queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
+        ret = rgw_rados_operate(
+         dpp, res.store->getRados()->get_notif_pool_ctx(),
+          queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
         if (ret < 0) {
-          ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
-            << ". error: " << ret << dendl;
+          ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
+                           << queue_name
+                           << ". error: " << ret << dendl;
           return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
         }
         ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
         if (ret < 0) {
-          ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;
+          ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for "
+           "extra space. error: " << ret << dendl;
           return ret;
         }
       }
-      std::vector<bufferlist> bl_data_vec{std::move(bl)};
+      std::vector<buffer::list> bl_data_vec{std::move(bl)};
       librados::ObjectWriteOperation op;
       cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
-      const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
-            queue_name, &op,
-            res.s->yield);
+      const auto ret = rgw_rados_operate(
+       dpp, res.store->getRados()->get_notif_pool_ctx(),
+       queue_name, &op, res.yield);
       topic.res_id = cls_2pc_reservation::NO_ID;
       if (ret < 0) {
-        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
-          << ". error: " << ret << dendl;
+        ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
+                         << queue_name << ". error: " << ret
+                         << dendl;
         return ret;
       }
     } else {
       try {
         // TODO add endpoint LRU cache
-        const auto push_endpoint = RGWPubSubEndpoint::create(topic.cfg.dest.push_endpoint, 
-                topic.cfg.dest.arn_topic,
-                RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp), 
-                res.s->cct);
-        ldpp_dout(dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
-        const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
+        const auto push_endpoint = RGWPubSubEndpoint::create(
+         topic.cfg.dest.push_endpoint,
+         topic.cfg.dest.arn_topic,
+         RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
+         dpp->get_cct());
+        ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
+                              << topic.cfg.dest.push_endpoint << dendl;
+        const auto ret = push_endpoint->send_to_completion_async(
+         dpp->get_cct(), event_entry.event, res.yield);
         if (ret < 0) {
-          ldpp_dout(dpp, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
+          ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
+                           << topic.cfg.dest.push_endpoint
+                           << " failed. error: " << ret << dendl;
           if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
           return ret;
         }
@@ -910,20 +936,22 @@ int publish_commit(rgw::sal::Object* obj,
   return 0;
 }
 
-int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
+extern int publish_abort(reservation_t& res) {
   for (auto& topic : res.topics) {
-    if (!topic.cfg.dest.persistent || topic.res_id == cls_2pc_reservation::NO_ID) {
+    if (!topic.cfg.dest.persistent ||
+       topic.res_id == cls_2pc_reservation::NO_ID) {
       // nothing to abort or already committed/aborted
       continue;
     }
     const auto& queue_name = topic.cfg.dest.arn_topic;
     librados::ObjectWriteOperation op;
     cls_2pc_queue_abort(op, topic.res_id);
-    const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
-      queue_name, &op,
-      res.s->yield);
+    const auto ret = rgw_rados_operate(
+      res.dpp, res.store->getRados()->get_notif_pool_ctx(),
+      queue_name, &op, res.yield);
     if (ret < 0) {
-      ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id << 
+      ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
+                           << topic.res_id <<
         " from queue: " << queue_name << ". error: " << ret << dendl;
       return ret;
     }
@@ -932,9 +960,45 @@ int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
   return 0;
 }
 
-reservation_t::~reservation_t() {
-  publish_abort(dpp, *this);
-}
+reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
+                            rgw::sal::RadosStore* _store,
+                            req_state* _s,
+                            rgw::sal::Object* _object,
+                            rgw::sal::Object* _src_object,
+                            const std::string* _object_name) :
+  dpp(_s), store(_store), s(_s), size(0) /* XXX */, obj_ctx(_s->obj_ctx),
+  object(_object), src_object(_src_object), bucket(_s->bucket.get()),
+  object_name(_object_name),
+  tagset(_s->tagset),
+  x_meta_map(_s->info.x_meta_map),
+  user_id(_s->user->get_id().id),
+  user_tenant(_s->user->get_id().tenant),
+  req_id(_s->req_id),
+  yield(_s->yield)
+{}
+
+reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
+                            rgw::sal::RadosStore* _store,
+                            RGWObjectCtx* _obj_ctx,
+                            rgw::sal::Object* _object,
+                            rgw::sal::Object* _src_object,
+                            rgw::sal::Bucket* _bucket,
+                            std::string& _user_id,
+                            std::string& _user_tenant,
+                            std::string& _req_id,
+                            optional_yield y) :
+    dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
+    obj_ctx(_obj_ctx),
+    object(_object), src_object(_src_object), bucket(_bucket),
+    object_name(nullptr),
+    user_id(_user_id),
+    user_tenant(_user_tenant),
+    req_id(_req_id),
+    yield(y)
+{}
 
+reservation_t::~reservation_t() {
+  publish_abort(*this);
 }
 
+} // namespace rgw::notify
index 5139f736644aaa36a5847b295067aa90d50b3f3c..6470e26783388557d7418ecda2af289de450effd 100644 (file)
@@ -43,27 +43,52 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y);
 // then used to commit or abort the reservation
 struct reservation_t {
   struct topic_t {
-    topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, cls_2pc_reservation::id_t _res_id) :
-        configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
+    topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
+           const cls_2pc_reservation::id_t _res_id) :
+      configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
 
-    const std::string configurationId;
-    const rgw_pubsub_topic cfg;
+    std::string configurationId;
+    rgw_pubsub_topic cfg;
     // res_id is reset after topic is committed/aborted
     cls_2pc_reservation::id_t res_id;
   };
 
-  const DoutPrefixProvider *dpp;
+  const DoutPrefixProviderdpp;
   std::vector<topic_t> topics;
   rgw::sal::RadosStore* const store;
   const req_state* const s;
   size_t size;
+  RGWObjectCtx* obj_ctx;
   rgw::sal::Object* const object;
+  rgw::sal::Object* const src_object; // may differ from object
+  rgw::sal::Bucket* const bucket;
   const std::string* const object_name;
-  KeyValueMap cached_metadata;
-
-  reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, 
-      rgw::sal::Object* _object, const std::string* _object_name) :
-      dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {}
+  boost::optional<RGWObjTags&> tagset;
+  meta_map_t x_meta_map; // metadata cached by value
+  std::string user_id;
+  std::string user_tenant;
+  std::string req_id;
+  optional_yield yield;
+
+  /* ctor for rgw_op callers */
+  reservation_t(const DoutPrefixProvider* _dpp,
+               rgw::sal::RadosStore* _store,
+               req_state* _s,
+               rgw::sal::Object* _object,
+               rgw::sal::Object* _src_object,
+               const std::string* _object_name);
+
+  /* ctor for non-request caller (e.g., lifecycle) */
+  reservation_t(const DoutPrefixProvider* _dpp,
+               rgw::sal::RadosStore* _store,
+               RGWObjectCtx* _obj_ctx,
+               rgw::sal::Object* _object,
+               rgw::sal::Object* _src_object,
+               rgw::sal::Bucket* _bucket,
+               std::string& _user_id,
+               std::string& _user_tenant,
+               std::string& _req_id,
+               optional_yield y);
 
   // dtor doing resource leak guarding
   // aborting the reservation if not already committed or aborted
@@ -71,10 +96,10 @@ struct reservation_t {
 };
 
 // create a reservation on the 2-phase-commit queue
-int publish_reserve(const DoutPrefixProvider *dpp, 
-        EventType event_type,
-        reservation_t& reservation,
-        const RGWObjTags* req_tags);
+  int publish_reserve(const DoutPrefixProvider *dpp,
+                     EventType event_type,
+                     reservation_t& reservation,
+                     const RGWObjTags* req_tags);
 
 // commit the reservation to the queue
 int publish_commit(rgw::sal::Object* obj,
index 4af9a32f7e3b36eb6e26028a4c685781bcdc0482..fd1dc8538f09aaea7d6394e1fad89fdb7e76f3b9 100644 (file)
@@ -8,43 +8,53 @@ namespace rgw::notify {
 
   std::string to_string(EventType t) {
     switch (t) {
-      case ObjectCreated:
-        return "s3:ObjectCreated:*";
-      case ObjectCreatedPut:
-        return "s3:ObjectCreated:Put";
-      case ObjectCreatedPost:
-        return "s3:ObjectCreated:Post";
-      case ObjectCreatedCopy:
-        return "s3:ObjectCreated:Copy";
-      case ObjectCreatedCompleteMultipartUpload:
-        return "s3:ObjectCreated:CompleteMultipartUpload";
-      case ObjectRemoved:
-        return "s3:ObjectRemoved:*";
-      case ObjectRemovedDelete:
-        return "s3:ObjectRemoved:Delete";
-      case ObjectRemovedDeleteMarkerCreated:
-        return "s3:ObjectRemoved:DeleteMarkerCreated";
-      case UnknownEvent:
-        return "s3:UnknownEvet";
+    case ObjectCreated:
+      return "s3:ObjectCreated:*";
+    case ObjectCreatedPut:
+      return "s3:ObjectCreated:Put";
+    case ObjectCreatedPost:
+      return "s3:ObjectCreated:Post";
+    case ObjectCreatedCopy:
+      return "s3:ObjectCreated:Copy";
+    case ObjectCreatedCompleteMultipartUpload:
+      return "s3:ObjectCreated:CompleteMultipartUpload";
+    case ObjectRemoved:
+      return "s3:ObjectRemoved:*";
+    case ObjectRemovedDelete:
+      return "s3:ObjectRemoved:Delete";
+    case ObjectRemovedDeleteMarkerCreated:
+      return "s3:ObjectRemoved:DeleteMarkerCreated";
+    case ObjectExpiration:
+      return "s3:ObjectLifecycle:Expiration";
+    case ObjectNoncurrentExpiration:
+      return "s3:ObjectLifecycle:NoncurrentExpiration";
+    case ObjectDeleteMarkerExpiration:
+      return "s3:ObjectLifecycle:DeleteMarkerExpiration";
+    case UnknownEvent:
+        return "s3:UnknownEvent";
     }
     return "s3:UnknownEvent";
   }
 
   std::string to_ceph_string(EventType t) {
     switch (t) {
-      case ObjectCreated:
-      case ObjectCreatedPut:
-      case ObjectCreatedPost:
-      case ObjectCreatedCopy:
-      case ObjectCreatedCompleteMultipartUpload:
-        return "OBJECT_CREATE";
-      case ObjectRemovedDelete:
-        return "OBJECT_DELETE";
-      case ObjectRemovedDeleteMarkerCreated:
-        return "DELETE_MARKER_CREATE";
-      case ObjectRemoved:
-      case UnknownEvent:
-        return "UNKNOWN_EVENT";
+    case ObjectCreated:
+    case ObjectCreatedPut:
+    case ObjectCreatedPost:
+    case ObjectCreatedCopy:
+    case ObjectCreatedCompleteMultipartUpload:
+      return "OBJECT_CREATE";
+    case ObjectRemovedDelete:
+      return "OBJECT_DELETE";
+    case ObjectRemovedDeleteMarkerCreated:
+      return "DELETE_MARKER_CREATE";
+    case ObjectExpiration:
+    case ObjectNoncurrentExpiration:
+    case ObjectDeleteMarkerExpiration:
+      return "OBJECT_EXPIRATION";
+    case ObjectRemoved:
+    case UnknownEvent:
+      return "UNKNOWN_EVENT";
     }
     return "UNKNOWN_EVENT";
   }
index f255bfd744eb23971d1153d26dcc6efcba904fe8..89e3e2545500a9af20f7a23088fea817595af486 100644 (file)
@@ -15,7 +15,11 @@ namespace rgw::notify {
     ObjectRemoved                        = 0xF0,
     ObjectRemovedDelete                  = 0x10,
     ObjectRemovedDeleteMarkerCreated     = 0x20,
-    UnknownEvent                         = 0x100
+    // lifecycle events (RGW extension)
+    ObjectExpiration                     = 0x40,
+    ObjectNoncurrentExpiration           = 0x80,
+    ObjectDeleteMarkerExpiration         = 0x100,
+    UnknownEvent                         = 0x200
   };
 
   using EventTypeList = std::vector<EventType>;
index 5c03d205af641998a62991aadf0492bc0eadda6f..fcda234ab1253b495765bb3e2fea7f6d96df3a46 100644 (file)
@@ -3904,8 +3904,10 @@ void RGWPutObj::execute(optional_yield y)
   }
 
   // make reservation for notification if needed
-  std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
-                                               s, rgw::notify::ObjectCreatedPut);
+  std::unique_ptr<rgw::sal::Notification> res
+                    = store->get_notification(
+                      s->object.get(), s->src_object.get(), s,
+                      rgw::notify::ObjectCreatedPut);
   if(!multipart) {
     op_ret = res->publish_reserve(this, obj_tags.get());
     if (op_ret < 0) {
@@ -4304,7 +4306,8 @@ void RGWPostObj::execute(optional_yield y)
   }
 
   // make reservation for notification if needed
-  std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(), s, rgw::notify::ObjectCreatedPost);
+  std::unique_ptr<rgw::sal::Notification> res
+    = store->get_notification(s->object.get(), s->src_object.get(), s, rgw::notify::ObjectCreatedPost);
   op_ret = res->publish_reserve(this);
   if (op_ret < 0) {
     return;
@@ -4985,10 +4988,13 @@ void RGWDeleteObj::execute(optional_yield y)
 
     // make reservation for notification if needed
     const auto versioned_object = s->bucket->versioning_enabled();
-    const auto event_type = versioned_object && s->object->get_instance().empty() ? 
-        rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete;
-    std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
-                                                                       s, event_type);
+    const auto event_type = versioned_object &&
+      s->object->get_instance().empty() ?
+      rgw::notify::ObjectRemovedDeleteMarkerCreated :
+      rgw::notify::ObjectRemovedDelete;
+    std::unique_ptr<rgw::sal::Notification> res
+      = store->get_notification(s->object.get(), s->src_object.get(), s,
+                               event_type);
     op_ret = res->publish_reserve(this);
     if (op_ret < 0) {
       return;
@@ -5388,8 +5394,10 @@ void RGWCopyObj::execute(optional_yield y)
     return;
 
   // make reservation for notification if needed
-  std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
-                                               s, rgw::notify::ObjectCreatedCopy);
+  std::unique_ptr<rgw::sal::Notification> res
+                                  = store->get_notification(
+                                    s->object.get(), s->src_object.get(),
+                                    s, rgw::notify::ObjectCreatedCopy);
   op_ret = res->publish_reserve(this);
   if (op_ret < 0) {
     return;
@@ -6349,8 +6357,8 @@ void RGWCompleteMultipart::execute(optional_yield y)
   
 
   // make reservation for notification if needed
-  std::unique_ptr<rgw::sal::Notification> res = store->get_notification(meta_obj.get(),
-                               s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
+  std::unique_ptr<rgw::sal::Notification> res
+    = store->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
   op_ret = res->publish_reserve(this);
   if (op_ret < 0) {
     return;
@@ -6915,10 +6923,11 @@ void RGWDeleteMultiObj::execute(optional_yield y)
 
     // make reservation for notification if needed
     const auto versioned_object = s->bucket->versioning_enabled();
-    const auto event_type = versioned_object && obj->get_instance().empty() ? 
-        rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete;
-    std::unique_ptr<rgw::sal::Notification> res = store->get_notification(obj.get(),
-                                                                       s, event_type);
+    const auto event_type = versioned_object && obj->get_instance().empty() ?
+      rgw::notify::ObjectRemovedDeleteMarkerCreated :
+      rgw::notify::ObjectRemovedDelete;
+    std::unique_ptr<rgw::sal::Notification> res
+      = store->get_notification(obj.get(), s->src_object.get(), s, event_type);
     op_ret = res->publish_reserve(this);
     if (op_ret < 0) {
       send_partial_response(*iter, false, "", op_ret);
index 5bb3aff8e4c3fda00bd3e84ad0405eef5f345b87..6fd4c9c0f0bb697336a2baf9d2e6775de5c9c0a5 100644 (file)
@@ -284,10 +284,18 @@ class Store {
     virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
     /** Get a @a Completions object.  Used for Async I/O tracking */
     virtual std::unique_ptr<Completions> get_completions(void) = 0;
-    /** Get a @a Notification object.  Used to communicate with non-RGW daemons, such as
-     * management/tracking software */
-    virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, 
+
+     /** Get a @a Notification object.  Used to communicate with non-RGW daemons, such as
+      * management/tracking software */
+    /** RGWOp variant */
+    virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, 
         rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0;
+    /** No-req_state variant (e.g., rgwlc) */
+    virtual std::unique_ptr<Notification> get_notification(
+    const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
+    rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant,
+    std::string& _req_id, optional_yield y) = 0;
+
     /** Get access to the lifecycle management thread */
     virtual RGWLC* get_rgwlc(void) = 0;
     /** Get access to the coroutine registry.  Used to create new coroutine managers */
@@ -1319,10 +1327,14 @@ public:
 class Notification {
 protected:
   Object* obj;
+  Object* src_obj;
   rgw::notify::EventType event_type;
 
   public:
-    Notification(Object* _obj, rgw::notify::EventType _type) : obj(_obj), event_type(_type) {}
+    Notification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
+      : obj(_obj), src_obj(_src_obj), event_type(_type)
+    {}
+
     virtual ~Notification() = default;
 
     /** Indicate the start of the event associated with this notification */
index 537f96694ebd44842f5fc7545bfd1a77223d8fff..a81fc94685b18ce64da9bcadceddb60ed9f7e3d4 100644 (file)
@@ -1716,11 +1716,21 @@ namespace rgw::sal {
     return new LCDBSerializer(store, oid, lock_name, cookie);
   }
 
-  std::unique_ptr<Notification> DBStore::get_notification(rgw::sal::Object* obj,
-      struct req_state* s,
-      rgw::notify::EventType event_type, const std::string* object_name)
+  std::unique_ptr<Notification> DBStore::get_notification(
+    rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
+    rgw::notify::EventType event_type, const std::string* object_name)
   {
-    return std::make_unique<DBNotification>(obj, event_type);
+    return std::make_unique<DBNotification>(obj, src_obj, event_type);
+  }
+
+  std::unique_ptr<Notification> DBStore::get_notification(
+    const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
+    rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
+    rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
+    std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
+    optional_yield y)
+  {
+    return std::make_unique<DBNotification>(obj, src_obj, event_type);
   }
 
   RGWLC* DBStore::get_rgwlc(void) {
index da77e68af9cfc6de6c1eca34e8241797e2c4b6d1..30da100d1084a9a3992bafeb1e9f35677952adce 100644 (file)
@@ -59,11 +59,9 @@ public:
 
 class DBNotification : public Notification {
 protected:
-  Object* obj;
-  rgw::notify::EventType event_type;
-
   public:
-    DBNotification(Object* _obj, rgw::notify::EventType _type) : Notification(_obj, _type), obj(_obj), event_type(_type) {}
+  DBNotification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
+    : Notification(_obj, _src_obj, _type) {}
     ~DBNotification() = default;
 
     virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;}
@@ -715,11 +713,20 @@ public:
       virtual int cluster_stat(RGWClusterStat& stats) override;
       virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
       virtual std::unique_ptr<Completions> get_completions(void) override;
-      virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, 
-          rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
+
+  virtual std::unique_ptr<Notification> get_notification(
+    rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
+    rgw::notify::EventType event_type, const std::string* object_name) override;
+
+  virtual std::unique_ptr<Notification> get_notification(
+    const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
+    rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
+    rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
+    std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
+    optional_yield y) override;
+    
       virtual RGWLC* get_rgwlc(void) override;
       virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
-
       virtual int log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info) override;
       virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
       virtual int register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type,
index 128fb984351dc21ecec2b45cbc2fe4517cba609a..85cf83b8f05be9a22d84c1948d5aa654e4aa74c2 100644 (file)
@@ -1141,12 +1141,15 @@ std::unique_ptr<Completions> RadosStore::get_completions(void)
   return std::make_unique<RadosCompletions>();
 }
 
-std::unique_ptr<Notification> RadosStore::get_notification(rgw::sal::Object* obj,
-                                                           struct req_state* s,
-                                                           rgw::notify::EventType event_type,
-                                                            const std::string* object_name)
+std::unique_ptr<Notification> RadosStore::get_notification(
+  rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name)
 {
-  return std::make_unique<RadosNotification>(s, this, obj, s, event_type, object_name);
+  return std::make_unique<RadosNotification>(s, this, obj, src_obj, s, event_type, object_name);
+}
+
+std::unique_ptr<Notification> RadosStore::get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y)
+{
+  return std::make_unique<RadosNotification>(dpp, this, obj, src_obj, rctx, event_type, _bucket, _user_id, _user_tenant, _req_id, y);
 }
 
 int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)
index 2b4dfb5169ce2629d955bb1b2678f6ae23a180f5..34ed1aeecf4ee6ea867774d66981c70294f67aa0 100644 (file)
@@ -396,7 +396,12 @@ class RadosStore : public Store {
     virtual int cluster_stat(RGWClusterStat& stats) override;
     virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
     virtual std::unique_ptr<Completions> get_completions(void) override;
-    virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
+
+    // op variant
+    virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
+
+    // non-op variant (e.g., rgwlc)
+    virtual std::unique_ptr<Notification> get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) override;
     virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
     virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
 
@@ -608,14 +613,26 @@ public:
 
 class RadosNotification : public Notification {
   RadosStore* store;
+  /* XXX it feels incorrect to me that rgw::notify::reservation_t is
+   * currently RADOS-specific; instead, I think notification types such as
+   * reservation_t should be generally visible, whereas the internal
+   * notification behavior should be made portable (e.g., notification
+   * to non-RADOS message sinks) */
   rgw::notify::reservation_t res;
 
   public:
-    RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s, 
-        rgw::notify::EventType _type, const std::string* object_name=nullptr) :
-      Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj, object_name) { }
+    RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, req_state* _s, rgw::notify::EventType _type, const std::string* object_name=nullptr) :
+      Notification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, _s, _obj, _src_obj, object_name) { }
+
+    RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, RGWObjectCtx* rctx, rgw::notify::EventType _type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) :
+      Notification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, rctx, _obj, _src_obj, _bucket, _user_id, _user_tenant, _req_id, y) {}
+
     ~RadosNotification() = default;
 
+    rgw::notify::reservation_t& get_reservation(void) {
+      return res;
+    }
+
     virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override;
     virtual int publish_commit(const DoutPrefixProvider* dpp, uint64_t size,
                               const ceph::real_time& mtime, const std::string& etag, const std::string& version) override;
index 1067465f1228eb641c417adfa7010ab8d68e51ea..b663cb15e2cb445f195649830648f5d7a159bde0 100644 (file)
@@ -359,7 +359,8 @@ private:
     std::string events_str = s->info.args.get("events", &exists);
     if (!exists) {
       // if no events are provided, we notify on all of them
-      events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
+      events_str =
+       "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE,OBJECT_EXPIRATION";
     }
     rgw::notify::from_string_list(events_str, events);
     if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {