From 35a4eb4410394a0014648dda7df92642f3b536d3 Mon Sep 17 00:00:00 2001 From: Matt Benjamin Date: Tue, 26 Jan 2021 12:52:47 -0500 Subject: [PATCH] rgwlc: optionally support notifications on object expiration Most of the work is to remove direct knowledge of req_state from methods in rgw_notify. I've chosen to create new notification types matching the different expire actions (but not transition). The new event types are not nested under Delete. Notifications are sent iff rgw_lc_notify is true (default false). Adjusted per comments in initial review, in particular, notification from lifecycle is no longer conditional on a config setting, and constness is restored. Fixes: https://tracker.ceph.com/issues/49068 Signed-off-by: Matt Benjamin --- src/rgw/rgw_lc.cc | 71 +++++-- src/rgw/rgw_notify.cc | 246 ++++++++++++++++--------- src/rgw/rgw_notify.h | 53 ++++-- src/rgw/rgw_notify_event_type.cc | 72 ++++---- src/rgw/rgw_notify_event_type.h | 6 +- src/rgw/rgw_op.cc | 39 ++-- src/rgw/rgw_sal.h | 20 +- src/rgw/rgw_sal_dbstore.cc | 18 +- src/rgw/rgw_sal_dbstore.h | 21 ++- src/rgw/rgw_sal_rados.cc | 13 +- src/rgw/rgw_sal_rados.h | 25 ++- src/rgw/rgw_sync_module_pubsub_rest.cc | 3 +- 12 files changed, 396 insertions(+), 191 deletions(-) diff --git a/src/rgw/rgw_lc.cc b/src/rgw/rgw_lc.cc index c930863d6a08e..2f8d381ddf50e 100644 --- a/src/rgw/rgw_lc.cc +++ b/src/rgw/rgw_lc.cc @@ -26,9 +26,10 @@ #include "rgw_zone.h" #include "rgw_string.h" #include "rgw_multi.h" -#include "rgw_sal.h" +#include "rgw_sal_rados.h" #include "rgw_rados.h" #include "rgw_lc_tier.h" +#include "rgw_notify.h" // this seems safe to use, at least for now--arguably, we should // prefer header-only fmt, in general @@ -565,7 +566,13 @@ struct lc_op_ctx { }; /* lc_op_ctx */ -static int remove_expired_obj(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed) + +static std::string lc_id = "rgw lifecycle"; +static std::string lc_req_id = "0"; + +static int remove_expired_obj( + const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed, + rgw::notify::EventType event_type) { auto& store = oc.store; auto& bucket_info = oc.bucket->get_info(); @@ -590,16 +597,48 @@ static int remove_expired_obj(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool } obj = bucket->get_object(obj_key); - std::unique_ptr del_op = obj->get_delete_op(&oc.rctx); - - del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status(); + std::unique_ptr del_op + = obj->get_delete_op(&oc.rctx); + del_op->params.versioning_status + = obj->get_bucket()->get_info().versioning_status(); del_op->params.obj_owner.set_id(rgw_user {meta.owner}); del_op->params.obj_owner.set_name(meta.owner_display_name); del_op->params.bucket_owner.set_id(bucket_info.owner); del_op->params.unmod_since = meta.mtime; del_op->params.marker_version_id = version_id; - return del_op->delete_obj(dpp, null_yield); + std::unique_ptr notify + = store->get_notification(dpp, obj.get(), nullptr, &oc.rctx, event_type, + bucket.get(), lc_id, + const_cast(oc.bucket->get_tenant()), + lc_req_id, null_yield); + + /* can eliminate cast when reservation is lifted into Notification */ + auto notify_res = static_cast(notify.get())->get_reservation(); + + ret = rgw::notify::publish_reserve(dpp, event_type, notify_res, nullptr); + if (ret < 0) { + ldpp_dout(dpp, 1) + << "ERROR: notify reservation failed, deferring delete of object k=" + << o.key + << dendl; + return ret; + } + + ret = del_op->delete_obj(dpp, null_yield); + if (ret < 0) { + ldpp_dout(dpp, 1) << + "ERROR: publishing notification failed, with error: " << ret << dendl; + } else { + // send request to notification manager + (void) rgw::notify::publish_commit( + obj.get(), obj->get_obj_size(), ceph::real_clock::now(), + obj->get_attrs()[RGW_ATTR_ETAG].to_str(), version_id, event_type, + notify_res, dpp); + } + + return ret; + } /* remove_expired_obj */ class LCOpAction { @@ -1077,7 +1116,8 @@ public: auto& o = oc.o; int r; if (o.is_delete_marker()) { - r = remove_expired_obj(oc.dpp, oc, true); + r = remove_expired_obj(oc.dpp, oc, true, + rgw::notify::ObjectDeleteMarkerExpiration); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj " << oc.bucket << ":" << o.key @@ -1090,7 +1130,8 @@ public: << " " << oc.wq->thr_name() << dendl; } else { /* ! o.is_delete_marker() */ - r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned()); + r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(), + rgw::notify::ObjectExpiration); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj " << oc.bucket << ":" << o.key @@ -1137,7 +1178,8 @@ public: int process(lc_op_ctx& oc) { auto& o = oc.o; - int r = remove_expired_obj(oc.dpp, oc, true); + int r = remove_expired_obj(oc.dpp, oc, true, + rgw::notify::ObjectNoncurrentExpiration); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) " << oc.bucket << ":" << o.key @@ -1181,7 +1223,8 @@ public: int process(lc_op_ctx& oc) { auto& o = oc.o; - int r = remove_expired_obj(oc.dpp, oc, true); + int r = remove_expired_obj(oc.dpp, oc, true, + rgw::notify::ObjectDeleteMarkerExpiration); if (r < 0) { ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) " << oc.bucket << ":" << o.key @@ -1279,11 +1322,11 @@ public: /* If bucket is versioned, create delete_marker for current version */ if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) { - ret = remove_expired_obj(oc.dpp, oc, false); - ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; + ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration); + ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; } else { - ret = remove_expired_obj(oc.dpp, oc, true); - ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; + ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration); + ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl; } return ret; } diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index cd1bdddb1f7c8..0875987aa607f 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -616,22 +616,25 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y) { return s_manager->remove_persistent_topic(topic_name, y); } -rgw::sal::Object* get_object_with_atttributes(const req_state* s, rgw::sal::Object* obj) { +rgw::sal::Object* get_object_with_atttributes( + const reservation_t& res, rgw::sal::Object* obj) { // in case of copy obj, the tags and metadata are taken from source - const auto src_obj = s->src_object ? s->src_object.get() : obj; + const auto src_obj = res.src_object ? res.src_object : obj; if (src_obj->get_attrs().empty()) { if (!src_obj->get_bucket()) { - src_obj->set_bucket(s->bucket.get()); + src_obj->set_bucket(res.bucket); } - if (src_obj->get_obj_attrs(s->obj_ctx, s->yield, s) < 0) { + if (src_obj->get_obj_attrs(res.obj_ctx, res.yield, res.dpp) < 0) { return nullptr; } } return src_obj; } -void metadata_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValueMap& metadata) { - const auto src_obj = get_object_with_atttributes(s, obj); +static inline void metadata_from_attributes( + reservation_t& res, rgw::sal::Object* obj) { + auto& metadata = res.x_meta_map; + const auto src_obj = get_object_with_atttributes(res, obj); if (!src_obj) { return; } @@ -646,8 +649,9 @@ void metadata_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValu } } -void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyMultiValueMap& tags) { - const auto src_obj = get_object_with_atttributes(s, obj); +static inline void tags_from_attributes( + const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) { + const auto src_obj = get_object_with_atttributes(res, obj); if (!src_obj) { return; } @@ -667,7 +671,7 @@ void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyMultiVal } // populate event from request -void populate_event_from_request(const reservation_t& res, +static inline void populate_event(reservation_t& res, rgw::sal::Object* obj, uint64_t size, const ceph::real_time& mtime, @@ -675,16 +679,15 @@ void populate_event_from_request(const reservation_t& res, const std::string& version, EventType event_type, rgw_pubsub_s3_event& event) { - const auto s = res.s; event.eventTime = mtime; - event.eventName = to_event_string(event_type); - event.userIdentity = s->user->get_id().id; // user that triggered the change - event.x_amz_request_id = s->req_id; // request ID of the original change - event.x_amz_id_2 = s->host_id; // RGW on which the change was made + event.eventName = to_string(event_type); + event.userIdentity = res.user_id; // user that triggered the change + event.x_amz_request_id = res.req_id; // request ID of the original change + event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made // configurationId is filled from notification configuration - event.bucket_name = s->bucket_name; - event.bucket_ownerIdentity = s->bucket_owner.get_id().id; - event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key())); + event.bucket_name = res.bucket->get_name(); + event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id; + event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key())); event.object_key = res.object_name ? *res.object_name : obj->get_name(); event.object_size = size; event.object_etag = etag; @@ -695,27 +698,30 @@ void populate_event_from_request(const reservation_t& res, boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), std::back_inserter(event.object_sequencer)); set_event_id(event.id, etag, ts); - event.bucket_id = s->bucket->get_bucket_id(); - // pass metadata - if (res.cached_metadata.empty()) { + event.bucket_id = res.bucket->get_bucket_id(); + // pass meta data + if (res.x_meta_map.empty()) { // no metadata cached: // either no metadata exist or no metadata filter was used - event.x_meta_map = s->info.x_meta_map; - metadata_from_attributes(s, obj, event.x_meta_map); + metadata_from_attributes(res, obj); } else { - event.x_meta_map = std::move(res.cached_metadata); + event.x_meta_map = res.x_meta_map; } // pass tags - if (s->tagset.get_tags().empty()) { + if (!res.tagset || + (*res.tagset).get_tags().empty()) { // try to fetch the tags from the attributes - tags_from_attributes(s, obj, event.tags); + tags_from_attributes(res, obj, event.tags); } else { - event.tags = s->tagset.get_tags(); + event.tags = (*res.tagset).get_tags(); } // opaque data will be filled from topic configuration } -bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) { +static inline bool notification_match(reservation_t& res, + const rgw_pubsub_topic_filter& filter, + EventType event, + const RGWObjTags* req_tags) { if (!match(filter.events, event)) { return false; } @@ -725,12 +731,13 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte return false; } - const auto s = res.s; if (!filter.s3_filter.metadata_filter.kv.empty()) { // metadata filter exists - res.cached_metadata = s->info.x_meta_map; - metadata_from_attributes(s, obj, res.cached_metadata); - if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) { + if (res.s) { + res.x_meta_map = res.s->info.x_meta_map; + } + metadata_from_attributes(res, obj); + if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) { return false; } } @@ -742,15 +749,15 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) { return false; } - } else if (!s->tagset.get_tags().empty()) { + } else if (res.tagset && !(*res.tagset).get_tags().empty()) { // tags were cached in req_state - if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) { + if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) { return false; } } else { // try to fetch tags from the attributes KeyMultiValueMap tags; - tags_from_attributes(s, obj, tags); + tags_from_attributes(res, obj, tags); if (!match(filter.s3_filter.tag_filter, tags)) { return false; } @@ -760,12 +767,13 @@ bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filte return true; } -int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type, - reservation_t& res, - const RGWObjTags* req_tags) + int publish_reserve(const DoutPrefixProvider* dpp, + EventType event_type, + reservation_t& res, + const RGWObjTags* req_tags) { - RGWPubSub ps(res.store, res.s->user->get_id().tenant); - RGWPubSub::Bucket ps_bucket(&ps, res.s->bucket->get_key()); + RGWPubSub ps(res.store, res.user_tenant); + RGWPubSub::Bucket ps_bucket(&ps, res.bucket->get_key()); rgw_pubsub_bucket_topics bucket_topics; auto rc = ps_bucket.get_topics(&bucket_topics); if (rc < 0) { @@ -779,9 +787,9 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type, // notification does not apply to req_state continue; } - ldpp_dout(dpp, 20) << "INFO: notification: '" << topic_filter.s3_id << + ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id << "' on topic: '" << topic_cfg.dest.arn_topic << - "' and bucket: '" << res.s->bucket->get_name() << + "' and bucket: '" << res.bucket->get_name() << "' (unique topic: '" << topic_cfg.name << "') apply to event of type: '" << to_string(event_type) << "'" << dendl; @@ -795,17 +803,19 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type, int rval; const auto& queue_name = topic_cfg.dest.arn_topic; cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval); - auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), - queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC); + auto ret = rgw_rados_operate( + res.dpp, res.store->getRados()->get_notif_pool_ctx(), + queue_name, &op, res.yield, librados::OPERATION_RETURNVEC); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to reserve notification on queue: " << queue_name - << ". error: " << ret << dendl; + ldpp_dout(res.dpp, 1) << + "ERROR: failed to reserve notification on queue: " + << queue_name << ". error: " << ret << dendl; // if no space is left in queue we ask client to slow down return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret; } ret = cls_2pc_queue_reserve_result(obl, res_id); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl; + ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl; return ret; } } @@ -815,86 +825,102 @@ int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type, } int publish_commit(rgw::sal::Object* obj, - uint64_t size, - const ceph::real_time& mtime, - const std::string& etag, - const std::string& version, - EventType event_type, - reservation_t& res, - const DoutPrefixProvider *dpp) + uint64_t size, + const ceph::real_time& mtime, + const std::string& etag, + const std::string& version, + EventType event_type, + reservation_t& res, + const DoutPrefixProvider* dpp) { for (auto& topic : res.topics) { - if (topic.cfg.dest.persistent && topic.res_id == cls_2pc_reservation::NO_ID) { + if (topic.cfg.dest.persistent && + topic.res_id == cls_2pc_reservation::NO_ID) { // nothing to commit or already committed/aborted continue; } event_entry_t event_entry; - populate_event_from_request(res, obj, size, mtime, etag, version, event_type, event_entry.event); + populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event); event_entry.event.configurationId = topic.configurationId; event_entry.event.opaque_data = topic.cfg.opaque_data; if (topic.cfg.dest.persistent) { event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint); - event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args); + event_entry.push_endpoint_args = + std::move(topic.cfg.dest.push_endpoint_args); event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic); bufferlist bl; encode(event_entry, bl); const auto& queue_name = topic.cfg.dest.arn_topic; if (bl.length() > res.size) { // try to make a larger reservation, fail only if this is not possible - ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size << - " . trying to make a larger reservation on queue:" << queue_name << dendl; + ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() + << " exceeded reserved size: " << res.size + << + " . trying to make a larger reservation on queue:" << queue_name + << dendl; // first cancel the existing reservation librados::ObjectWriteOperation op; cls_2pc_queue_abort(op, topic.res_id); - auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), - topic.cfg.dest.arn_topic, &op, - res.s->yield); + auto ret = rgw_rados_operate( + dpp, res.store->getRados()->get_notif_pool_ctx(), + topic.cfg.dest.arn_topic, &op, + res.yield); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id << + ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " + << topic.res_id << " when trying to make a larger reservation on queue: " << queue_name - << ". error: " << ret << dendl; + << ". error: " << ret << dendl; return ret; } // now try to make a bigger one - bufferlist obl; + buffer::list obl; int rval; cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval); - ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), - queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC); + ret = rgw_rados_operate( + dpp, res.store->getRados()->get_notif_pool_ctx(), + queue_name, &op, res.yield, librados::OPERATION_RETURNVEC); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name - << ". error: " << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " + << queue_name + << ". error: " << ret << dendl; return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret; } ret = cls_2pc_queue_reserve_result(obl, topic.res_id); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for " + "extra space. error: " << ret << dendl; return ret; } } - std::vector bl_data_vec{std::move(bl)}; + std::vector bl_data_vec{std::move(bl)}; librados::ObjectWriteOperation op; cls_2pc_queue_commit(op, bl_data_vec, topic.res_id); - const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), - queue_name, &op, - res.s->yield); + const auto ret = rgw_rados_operate( + dpp, res.store->getRados()->get_notif_pool_ctx(), + queue_name, &op, res.yield); topic.res_id = cls_2pc_reservation::NO_ID; if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " << queue_name - << ". error: " << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " + << queue_name << ". error: " << ret + << dendl; return ret; } } else { try { // TODO add endpoint LRU cache - const auto push_endpoint = RGWPubSubEndpoint::create(topic.cfg.dest.push_endpoint, - topic.cfg.dest.arn_topic, - RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp), - res.s->cct); - ldpp_dout(dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl; - const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield); + const auto push_endpoint = RGWPubSubEndpoint::create( + topic.cfg.dest.push_endpoint, + topic.cfg.dest.arn_topic, + RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp), + dpp->get_cct()); + ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: " + << topic.cfg.dest.push_endpoint << dendl; + const auto ret = push_endpoint->send_to_completion_async( + dpp->get_cct(), event_entry.event, res.yield); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl; + ldpp_dout(dpp, 1) << "ERROR: push to endpoint " + << topic.cfg.dest.push_endpoint + << " failed. error: " << ret << dendl; if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed); return ret; } @@ -910,20 +936,22 @@ int publish_commit(rgw::sal::Object* obj, return 0; } -int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) { +extern int publish_abort(reservation_t& res) { for (auto& topic : res.topics) { - if (!topic.cfg.dest.persistent || topic.res_id == cls_2pc_reservation::NO_ID) { + if (!topic.cfg.dest.persistent || + topic.res_id == cls_2pc_reservation::NO_ID) { // nothing to abort or already committed/aborted continue; } const auto& queue_name = topic.cfg.dest.arn_topic; librados::ObjectWriteOperation op; cls_2pc_queue_abort(op, topic.res_id); - const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(), - queue_name, &op, - res.s->yield); + const auto ret = rgw_rados_operate( + res.dpp, res.store->getRados()->get_notif_pool_ctx(), + queue_name, &op, res.yield); if (ret < 0) { - ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id << + ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: " + << topic.res_id << " from queue: " << queue_name << ". error: " << ret << dendl; return ret; } @@ -932,9 +960,45 @@ int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) { return 0; } -reservation_t::~reservation_t() { - publish_abort(dpp, *this); -} +reservation_t::reservation_t(const DoutPrefixProvider* _dpp, + rgw::sal::RadosStore* _store, + req_state* _s, + rgw::sal::Object* _object, + rgw::sal::Object* _src_object, + const std::string* _object_name) : + dpp(_s), store(_store), s(_s), size(0) /* XXX */, obj_ctx(_s->obj_ctx), + object(_object), src_object(_src_object), bucket(_s->bucket.get()), + object_name(_object_name), + tagset(_s->tagset), + x_meta_map(_s->info.x_meta_map), + user_id(_s->user->get_id().id), + user_tenant(_s->user->get_id().tenant), + req_id(_s->req_id), + yield(_s->yield) +{} + +reservation_t::reservation_t(const DoutPrefixProvider* _dpp, + rgw::sal::RadosStore* _store, + RGWObjectCtx* _obj_ctx, + rgw::sal::Object* _object, + rgw::sal::Object* _src_object, + rgw::sal::Bucket* _bucket, + std::string& _user_id, + std::string& _user_tenant, + std::string& _req_id, + optional_yield y) : + dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */, + obj_ctx(_obj_ctx), + object(_object), src_object(_src_object), bucket(_bucket), + object_name(nullptr), + user_id(_user_id), + user_tenant(_user_tenant), + req_id(_req_id), + yield(y) +{} +reservation_t::~reservation_t() { + publish_abort(*this); } +} // namespace rgw::notify diff --git a/src/rgw/rgw_notify.h b/src/rgw/rgw_notify.h index 5139f736644aa..6470e26783388 100644 --- a/src/rgw/rgw_notify.h +++ b/src/rgw/rgw_notify.h @@ -43,27 +43,52 @@ int remove_persistent_topic(const std::string& topic_name, optional_yield y); // then used to commit or abort the reservation struct reservation_t { struct topic_t { - topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, cls_2pc_reservation::id_t _res_id) : - configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {} + topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, + const cls_2pc_reservation::id_t _res_id) : + configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {} - const std::string configurationId; - const rgw_pubsub_topic cfg; + std::string configurationId; + rgw_pubsub_topic cfg; // res_id is reset after topic is committed/aborted cls_2pc_reservation::id_t res_id; }; - const DoutPrefixProvider *dpp; + const DoutPrefixProvider* dpp; std::vector topics; rgw::sal::RadosStore* const store; const req_state* const s; size_t size; + RGWObjectCtx* obj_ctx; rgw::sal::Object* const object; + rgw::sal::Object* const src_object; // may differ from object + rgw::sal::Bucket* const bucket; const std::string* const object_name; - KeyValueMap cached_metadata; - - reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s, - rgw::sal::Object* _object, const std::string* _object_name) : - dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {} + boost::optional tagset; + meta_map_t x_meta_map; // metadata cached by value + std::string user_id; + std::string user_tenant; + std::string req_id; + optional_yield yield; + + /* ctor for rgw_op callers */ + reservation_t(const DoutPrefixProvider* _dpp, + rgw::sal::RadosStore* _store, + req_state* _s, + rgw::sal::Object* _object, + rgw::sal::Object* _src_object, + const std::string* _object_name); + + /* ctor for non-request caller (e.g., lifecycle) */ + reservation_t(const DoutPrefixProvider* _dpp, + rgw::sal::RadosStore* _store, + RGWObjectCtx* _obj_ctx, + rgw::sal::Object* _object, + rgw::sal::Object* _src_object, + rgw::sal::Bucket* _bucket, + std::string& _user_id, + std::string& _user_tenant, + std::string& _req_id, + optional_yield y); // dtor doing resource leak guarding // aborting the reservation if not already committed or aborted @@ -71,10 +96,10 @@ struct reservation_t { }; // create a reservation on the 2-phase-commit queue -int publish_reserve(const DoutPrefixProvider *dpp, - EventType event_type, - reservation_t& reservation, - const RGWObjTags* req_tags); + int publish_reserve(const DoutPrefixProvider *dpp, + EventType event_type, + reservation_t& reservation, + const RGWObjTags* req_tags); // commit the reservation to the queue int publish_commit(rgw::sal::Object* obj, diff --git a/src/rgw/rgw_notify_event_type.cc b/src/rgw/rgw_notify_event_type.cc index 4af9a32f7e3b3..fd1dc8538f09a 100644 --- a/src/rgw/rgw_notify_event_type.cc +++ b/src/rgw/rgw_notify_event_type.cc @@ -8,43 +8,53 @@ namespace rgw::notify { std::string to_string(EventType t) { switch (t) { - case ObjectCreated: - return "s3:ObjectCreated:*"; - case ObjectCreatedPut: - return "s3:ObjectCreated:Put"; - case ObjectCreatedPost: - return "s3:ObjectCreated:Post"; - case ObjectCreatedCopy: - return "s3:ObjectCreated:Copy"; - case ObjectCreatedCompleteMultipartUpload: - return "s3:ObjectCreated:CompleteMultipartUpload"; - case ObjectRemoved: - return "s3:ObjectRemoved:*"; - case ObjectRemovedDelete: - return "s3:ObjectRemoved:Delete"; - case ObjectRemovedDeleteMarkerCreated: - return "s3:ObjectRemoved:DeleteMarkerCreated"; - case UnknownEvent: - return "s3:UnknownEvet"; + case ObjectCreated: + return "s3:ObjectCreated:*"; + case ObjectCreatedPut: + return "s3:ObjectCreated:Put"; + case ObjectCreatedPost: + return "s3:ObjectCreated:Post"; + case ObjectCreatedCopy: + return "s3:ObjectCreated:Copy"; + case ObjectCreatedCompleteMultipartUpload: + return "s3:ObjectCreated:CompleteMultipartUpload"; + case ObjectRemoved: + return "s3:ObjectRemoved:*"; + case ObjectRemovedDelete: + return "s3:ObjectRemoved:Delete"; + case ObjectRemovedDeleteMarkerCreated: + return "s3:ObjectRemoved:DeleteMarkerCreated"; + case ObjectExpiration: + return "s3:ObjectLifecycle:Expiration"; + case ObjectNoncurrentExpiration: + return "s3:ObjectLifecycle:NoncurrentExpiration"; + case ObjectDeleteMarkerExpiration: + return "s3:ObjectLifecycle:DeleteMarkerExpiration"; + case UnknownEvent: + return "s3:UnknownEvent"; } return "s3:UnknownEvent"; } std::string to_ceph_string(EventType t) { switch (t) { - case ObjectCreated: - case ObjectCreatedPut: - case ObjectCreatedPost: - case ObjectCreatedCopy: - case ObjectCreatedCompleteMultipartUpload: - return "OBJECT_CREATE"; - case ObjectRemovedDelete: - return "OBJECT_DELETE"; - case ObjectRemovedDeleteMarkerCreated: - return "DELETE_MARKER_CREATE"; - case ObjectRemoved: - case UnknownEvent: - return "UNKNOWN_EVENT"; + case ObjectCreated: + case ObjectCreatedPut: + case ObjectCreatedPost: + case ObjectCreatedCopy: + case ObjectCreatedCompleteMultipartUpload: + return "OBJECT_CREATE"; + case ObjectRemovedDelete: + return "OBJECT_DELETE"; + case ObjectRemovedDeleteMarkerCreated: + return "DELETE_MARKER_CREATE"; + case ObjectExpiration: + case ObjectNoncurrentExpiration: + case ObjectDeleteMarkerExpiration: + return "OBJECT_EXPIRATION"; + case ObjectRemoved: + case UnknownEvent: + return "UNKNOWN_EVENT"; } return "UNKNOWN_EVENT"; } diff --git a/src/rgw/rgw_notify_event_type.h b/src/rgw/rgw_notify_event_type.h index f255bfd744eb2..89e3e2545500a 100644 --- a/src/rgw/rgw_notify_event_type.h +++ b/src/rgw/rgw_notify_event_type.h @@ -15,7 +15,11 @@ namespace rgw::notify { ObjectRemoved = 0xF0, ObjectRemovedDelete = 0x10, ObjectRemovedDeleteMarkerCreated = 0x20, - UnknownEvent = 0x100 + // lifecycle events (RGW extension) + ObjectExpiration = 0x40, + ObjectNoncurrentExpiration = 0x80, + ObjectDeleteMarkerExpiration = 0x100, + UnknownEvent = 0x200 }; using EventTypeList = std::vector; diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 5c03d205af641..fcda234ab1253 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -3904,8 +3904,10 @@ void RGWPutObj::execute(optional_yield y) } // make reservation for notification if needed - std::unique_ptr res = store->get_notification(s->object.get(), - s, rgw::notify::ObjectCreatedPut); + std::unique_ptr res + = store->get_notification( + s->object.get(), s->src_object.get(), s, + rgw::notify::ObjectCreatedPut); if(!multipart) { op_ret = res->publish_reserve(this, obj_tags.get()); if (op_ret < 0) { @@ -4304,7 +4306,8 @@ void RGWPostObj::execute(optional_yield y) } // make reservation for notification if needed - std::unique_ptr res = store->get_notification(s->object.get(), s, rgw::notify::ObjectCreatedPost); + std::unique_ptr res + = store->get_notification(s->object.get(), s->src_object.get(), s, rgw::notify::ObjectCreatedPost); op_ret = res->publish_reserve(this); if (op_ret < 0) { return; @@ -4985,10 +4988,13 @@ void RGWDeleteObj::execute(optional_yield y) // make reservation for notification if needed const auto versioned_object = s->bucket->versioning_enabled(); - const auto event_type = versioned_object && s->object->get_instance().empty() ? - rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete; - std::unique_ptr res = store->get_notification(s->object.get(), - s, event_type); + const auto event_type = versioned_object && + s->object->get_instance().empty() ? + rgw::notify::ObjectRemovedDeleteMarkerCreated : + rgw::notify::ObjectRemovedDelete; + std::unique_ptr res + = store->get_notification(s->object.get(), s->src_object.get(), s, + event_type); op_ret = res->publish_reserve(this); if (op_ret < 0) { return; @@ -5388,8 +5394,10 @@ void RGWCopyObj::execute(optional_yield y) return; // make reservation for notification if needed - std::unique_ptr res = store->get_notification(s->object.get(), - s, rgw::notify::ObjectCreatedCopy); + std::unique_ptr res + = store->get_notification( + s->object.get(), s->src_object.get(), + s, rgw::notify::ObjectCreatedCopy); op_ret = res->publish_reserve(this); if (op_ret < 0) { return; @@ -6349,8 +6357,8 @@ void RGWCompleteMultipart::execute(optional_yield y) // make reservation for notification if needed - std::unique_ptr res = store->get_notification(meta_obj.get(), - s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name()); + std::unique_ptr res + = store->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name()); op_ret = res->publish_reserve(this); if (op_ret < 0) { return; @@ -6915,10 +6923,11 @@ void RGWDeleteMultiObj::execute(optional_yield y) // make reservation for notification if needed const auto versioned_object = s->bucket->versioning_enabled(); - const auto event_type = versioned_object && obj->get_instance().empty() ? - rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete; - std::unique_ptr res = store->get_notification(obj.get(), - s, event_type); + const auto event_type = versioned_object && obj->get_instance().empty() ? + rgw::notify::ObjectRemovedDeleteMarkerCreated : + rgw::notify::ObjectRemovedDelete; + std::unique_ptr res + = store->get_notification(obj.get(), s->src_object.get(), s, event_type); op_ret = res->publish_reserve(this); if (op_ret < 0) { send_partial_response(*iter, false, "", op_ret); diff --git a/src/rgw/rgw_sal.h b/src/rgw/rgw_sal.h index 5bb3aff8e4c3f..6fd4c9c0f0bb6 100644 --- a/src/rgw/rgw_sal.h +++ b/src/rgw/rgw_sal.h @@ -284,10 +284,18 @@ class Store { virtual std::unique_ptr get_lifecycle(void) = 0; /** Get a @a Completions object. Used for Async I/O tracking */ virtual std::unique_ptr get_completions(void) = 0; - /** Get a @a Notification object. Used to communicate with non-RGW daemons, such as - * management/tracking software */ - virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, + + /** Get a @a Notification object. Used to communicate with non-RGW daemons, such as + * management/tracking software */ + /** RGWOp variant */ + virtual std::unique_ptr get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0; + /** No-req_state variant (e.g., rgwlc) */ + virtual std::unique_ptr get_notification( + const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, + rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, + std::string& _req_id, optional_yield y) = 0; + /** Get access to the lifecycle management thread */ virtual RGWLC* get_rgwlc(void) = 0; /** Get access to the coroutine registry. Used to create new coroutine managers */ @@ -1319,10 +1327,14 @@ public: class Notification { protected: Object* obj; + Object* src_obj; rgw::notify::EventType event_type; public: - Notification(Object* _obj, rgw::notify::EventType _type) : obj(_obj), event_type(_type) {} + Notification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type) + : obj(_obj), src_obj(_src_obj), event_type(_type) + {} + virtual ~Notification() = default; /** Indicate the start of the event associated with this notification */ diff --git a/src/rgw/rgw_sal_dbstore.cc b/src/rgw/rgw_sal_dbstore.cc index 537f96694ebd4..a81fc94685b18 100644 --- a/src/rgw/rgw_sal_dbstore.cc +++ b/src/rgw/rgw_sal_dbstore.cc @@ -1716,11 +1716,21 @@ namespace rgw::sal { return new LCDBSerializer(store, oid, lock_name, cookie); } - std::unique_ptr DBStore::get_notification(rgw::sal::Object* obj, - struct req_state* s, - rgw::notify::EventType event_type, const std::string* object_name) + std::unique_ptr DBStore::get_notification( + rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, + rgw::notify::EventType event_type, const std::string* object_name) { - return std::make_unique(obj, event_type); + return std::make_unique(obj, src_obj, event_type); + } + + std::unique_ptr DBStore::get_notification( + const DoutPrefixProvider* dpp, rgw::sal::Object* obj, + rgw::sal::Object* src_obj, RGWObjectCtx* rctx, + rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, + std::string& _user_id, std::string& _user_tenant, std::string& _req_id, + optional_yield y) + { + return std::make_unique(obj, src_obj, event_type); } RGWLC* DBStore::get_rgwlc(void) { diff --git a/src/rgw/rgw_sal_dbstore.h b/src/rgw/rgw_sal_dbstore.h index da77e68af9cfc..30da100d1084a 100644 --- a/src/rgw/rgw_sal_dbstore.h +++ b/src/rgw/rgw_sal_dbstore.h @@ -59,11 +59,9 @@ public: class DBNotification : public Notification { protected: - Object* obj; - rgw::notify::EventType event_type; - public: - DBNotification(Object* _obj, rgw::notify::EventType _type) : Notification(_obj, _type), obj(_obj), event_type(_type) {} + DBNotification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type) + : Notification(_obj, _src_obj, _type) {} ~DBNotification() = default; virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;} @@ -715,11 +713,20 @@ public: virtual int cluster_stat(RGWClusterStat& stats) override; virtual std::unique_ptr get_lifecycle(void) override; virtual std::unique_ptr get_completions(void) override; - virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, - rgw::notify::EventType event_type, const std::string* object_name=nullptr) override; + + virtual std::unique_ptr get_notification( + rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, + rgw::notify::EventType event_type, const std::string* object_name) override; + + virtual std::unique_ptr get_notification( + const DoutPrefixProvider* dpp, rgw::sal::Object* obj, + rgw::sal::Object* src_obj, RGWObjectCtx* rctx, + rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, + std::string& _user_id, std::string& _user_tenant, std::string& _req_id, + optional_yield y) override; + virtual RGWLC* get_rgwlc(void) override; virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; } - virtual int log_usage(const DoutPrefixProvider *dpp, map& usage_info) override; virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override; virtual int register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type, diff --git a/src/rgw/rgw_sal_rados.cc b/src/rgw/rgw_sal_rados.cc index 128fb984351dc..85cf83b8f05be 100644 --- a/src/rgw/rgw_sal_rados.cc +++ b/src/rgw/rgw_sal_rados.cc @@ -1141,12 +1141,15 @@ std::unique_ptr RadosStore::get_completions(void) return std::make_unique(); } -std::unique_ptr RadosStore::get_notification(rgw::sal::Object* obj, - struct req_state* s, - rgw::notify::EventType event_type, - const std::string* object_name) +std::unique_ptr RadosStore::get_notification( + rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name) { - return std::make_unique(s, this, obj, s, event_type, object_name); + return std::make_unique(s, this, obj, src_obj, s, event_type, object_name); +} + +std::unique_ptr RadosStore::get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) +{ + return std::make_unique(dpp, this, obj, src_obj, rctx, event_type, _bucket, _user_id, _user_tenant, _req_id, y); } int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj) diff --git a/src/rgw/rgw_sal_rados.h b/src/rgw/rgw_sal_rados.h index 2b4dfb5169ce2..34ed1aeecf4ee 100644 --- a/src/rgw/rgw_sal_rados.h +++ b/src/rgw/rgw_sal_rados.h @@ -396,7 +396,12 @@ class RadosStore : public Store { virtual int cluster_stat(RGWClusterStat& stats) override; virtual std::unique_ptr get_lifecycle(void) override; virtual std::unique_ptr get_completions(void) override; - virtual std::unique_ptr get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override; + + // op variant + virtual std::unique_ptr get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override; + + // non-op variant (e.g., rgwlc) + virtual std::unique_ptr get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) override; virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); } virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); } @@ -608,14 +613,26 @@ public: class RadosNotification : public Notification { RadosStore* store; + /* XXX it feels incorrect to me that rgw::notify::reservation_t is + * currently RADOS-specific; instead, I think notification types such as + * reservation_t should be generally visible, whereas the internal + * notification behavior should be made portable (e.g., notification + * to non-RADOS message sinks) */ rgw::notify::reservation_t res; public: - RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s, - rgw::notify::EventType _type, const std::string* object_name=nullptr) : - Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj, object_name) { } + RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, req_state* _s, rgw::notify::EventType _type, const std::string* object_name=nullptr) : + Notification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, _s, _obj, _src_obj, object_name) { } + + RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, RGWObjectCtx* rctx, rgw::notify::EventType _type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) : + Notification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, rctx, _obj, _src_obj, _bucket, _user_id, _user_tenant, _req_id, y) {} + ~RadosNotification() = default; + rgw::notify::reservation_t& get_reservation(void) { + return res; + } + virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override; virtual int publish_commit(const DoutPrefixProvider* dpp, uint64_t size, const ceph::real_time& mtime, const std::string& etag, const std::string& version) override; diff --git a/src/rgw/rgw_sync_module_pubsub_rest.cc b/src/rgw/rgw_sync_module_pubsub_rest.cc index 1067465f1228e..b663cb15e2cb4 100644 --- a/src/rgw/rgw_sync_module_pubsub_rest.cc +++ b/src/rgw/rgw_sync_module_pubsub_rest.cc @@ -359,7 +359,8 @@ private: std::string events_str = s->info.args.get("events", &exists); if (!exists) { // if no events are provided, we notify on all of them - events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE"; + events_str = + "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE,OBJECT_EXPIRATION"; } rgw::notify::from_string_list(events_str, events); if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) { -- 2.39.5