#include "rgw_zone.h"
#include "rgw_string.h"
#include "rgw_multi.h"
-#include "rgw_sal.h"
+#include "rgw_sal_rados.h"
#include "rgw_rados.h"
#include "rgw_lc_tier.h"
+#include "rgw_notify.h"
// this seems safe to use, at least for now--arguably, we should
// prefer header-only fmt, in general
}; /* lc_op_ctx */
-static int remove_expired_obj(const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed)
+
+static std::string lc_id = "rgw lifecycle";
+static std::string lc_req_id = "0";
+
+static int remove_expired_obj(
+ const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed,
+ rgw::notify::EventType event_type)
{
auto& store = oc.store;
auto& bucket_info = oc.bucket->get_info();
}
obj = bucket->get_object(obj_key);
- std::unique_ptr<rgw::sal::Object::DeleteOp> del_op = obj->get_delete_op(&oc.rctx);
-
- del_op->params.versioning_status = obj->get_bucket()->get_info().versioning_status();
+ std::unique_ptr<rgw::sal::Object::DeleteOp> del_op
+ = obj->get_delete_op(&oc.rctx);
+ del_op->params.versioning_status
+ = obj->get_bucket()->get_info().versioning_status();
del_op->params.obj_owner.set_id(rgw_user {meta.owner});
del_op->params.obj_owner.set_name(meta.owner_display_name);
del_op->params.bucket_owner.set_id(bucket_info.owner);
del_op->params.unmod_since = meta.mtime;
del_op->params.marker_version_id = version_id;
- return del_op->delete_obj(dpp, null_yield);
+ std::unique_ptr<rgw::sal::Notification> notify
+ = store->get_notification(dpp, obj.get(), nullptr, &oc.rctx, event_type,
+ bucket.get(), lc_id,
+ const_cast<std::string&>(oc.bucket->get_tenant()),
+ lc_req_id, null_yield);
+
+ /* can eliminate cast when reservation is lifted into Notification */
+ auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
+
+ ret = rgw::notify::publish_reserve(dpp, event_type, notify_res, nullptr);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: notify reservation failed, deferring delete of object k="
+ << o.key
+ << dendl;
+ return ret;
+ }
+
+ ret = del_op->delete_obj(dpp, null_yield);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) <<
+ "ERROR: publishing notification failed, with error: " << ret << dendl;
+ } else {
+ // send request to notification manager
+ (void) rgw::notify::publish_commit(
+ obj.get(), obj->get_obj_size(), ceph::real_clock::now(),
+ obj->get_attrs()[RGW_ATTR_ETAG].to_str(), version_id, event_type,
+ notify_res, dpp);
+ }
+
+ return ret;
+
} /* remove_expired_obj */
class LCOpAction {
auto& o = oc.o;
int r;
if (o.is_delete_marker()) {
- r = remove_expired_obj(oc.dpp, oc, true);
+ r = remove_expired_obj(oc.dpp, oc, true,
+ rgw::notify::ObjectDeleteMarkerExpiration);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj "
<< oc.bucket << ":" << o.key
<< " " << oc.wq->thr_name() << dendl;
} else {
/* ! o.is_delete_marker() */
- r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned());
+ r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(),
+ rgw::notify::ObjectExpiration);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj "
<< oc.bucket << ":" << o.key
int process(lc_op_ctx& oc) {
auto& o = oc.o;
- int r = remove_expired_obj(oc.dpp, oc, true);
+ int r = remove_expired_obj(oc.dpp, oc, true,
+ rgw::notify::ObjectNoncurrentExpiration);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) "
<< oc.bucket << ":" << o.key
int process(lc_op_ctx& oc) {
auto& o = oc.o;
- int r = remove_expired_obj(oc.dpp, oc, true);
+ int r = remove_expired_obj(oc.dpp, oc, true,
+ rgw::notify::ObjectDeleteMarkerExpiration);
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
<< oc.bucket << ":" << o.key
/* If bucket is versioned, create delete_marker for current version
*/
if (oc.bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
- ret = remove_expired_obj(oc.dpp, oc, false);
- ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
+ ret = remove_expired_obj(oc.dpp, oc, false, rgw::notify::ObjectExpiration);
+ ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") current & not delete_marker" << " versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
} else {
- ret = remove_expired_obj(oc.dpp, oc, true);
- ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
+ ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectExpiration);
+ ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key << ") not current " << "versioned_epoch: " << oc.o.versioned_epoch << "flags: " << oc.o.flags << dendl;
}
return ret;
}
return s_manager->remove_persistent_topic(topic_name, y);
}
-rgw::sal::Object* get_object_with_atttributes(const req_state* s, rgw::sal::Object* obj) {
+rgw::sal::Object* get_object_with_atttributes(
+ const reservation_t& res, rgw::sal::Object* obj) {
// in case of copy obj, the tags and metadata are taken from source
- const auto src_obj = s->src_object ? s->src_object.get() : obj;
+ const auto src_obj = res.src_object ? res.src_object : obj;
if (src_obj->get_attrs().empty()) {
if (!src_obj->get_bucket()) {
- src_obj->set_bucket(s->bucket.get());
+ src_obj->set_bucket(res.bucket);
}
- if (src_obj->get_obj_attrs(s->obj_ctx, s->yield, s) < 0) {
+ if (src_obj->get_obj_attrs(res.obj_ctx, res.yield, res.dpp) < 0) {
return nullptr;
}
}
return src_obj;
}
-void metadata_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyValueMap& metadata) {
- const auto src_obj = get_object_with_atttributes(s, obj);
+static inline void metadata_from_attributes(
+ reservation_t& res, rgw::sal::Object* obj) {
+ auto& metadata = res.x_meta_map;
+ const auto src_obj = get_object_with_atttributes(res, obj);
if (!src_obj) {
return;
}
}
}
-void tags_from_attributes(const req_state* s, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
- const auto src_obj = get_object_with_atttributes(s, obj);
+static inline void tags_from_attributes(
+ const reservation_t& res, rgw::sal::Object* obj, KeyMultiValueMap& tags) {
+ const auto src_obj = get_object_with_atttributes(res, obj);
if (!src_obj) {
return;
}
}
// populate event from request
-void populate_event_from_request(const reservation_t& res,
+static inline void populate_event(reservation_t& res,
rgw::sal::Object* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& version,
EventType event_type,
rgw_pubsub_s3_event& event) {
- const auto s = res.s;
event.eventTime = mtime;
- event.eventName = to_event_string(event_type);
- event.userIdentity = s->user->get_id().id; // user that triggered the change
- event.x_amz_request_id = s->req_id; // request ID of the original change
- event.x_amz_id_2 = s->host_id; // RGW on which the change was made
+ event.eventName = to_string(event_type);
+ event.userIdentity = res.user_id; // user that triggered the change
+ event.x_amz_request_id = res.req_id; // request ID of the original change
+ event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
// configurationId is filled from notification configuration
- event.bucket_name = s->bucket_name;
- event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
- event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
+ event.bucket_name = res.bucket->get_name();
+ event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id;
+ event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key()));
event.object_key = res.object_name ? *res.object_name : obj->get_name();
event.object_size = size;
event.object_etag = etag;
boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
std::back_inserter(event.object_sequencer));
set_event_id(event.id, etag, ts);
- event.bucket_id = s->bucket->get_bucket_id();
- // pass metadata
- if (res.cached_metadata.empty()) {
+ event.bucket_id = res.bucket->get_bucket_id();
+ // pass meta data
+ if (res.x_meta_map.empty()) {
// no metadata cached:
// either no metadata exist or no metadata filter was used
- event.x_meta_map = s->info.x_meta_map;
- metadata_from_attributes(s, obj, event.x_meta_map);
+ metadata_from_attributes(res, obj);
} else {
- event.x_meta_map = std::move(res.cached_metadata);
+ event.x_meta_map = res.x_meta_map;
}
// pass tags
- if (s->tagset.get_tags().empty()) {
+ if (!res.tagset ||
+ (*res.tagset).get_tags().empty()) {
// try to fetch the tags from the attributes
- tags_from_attributes(s, obj, event.tags);
+ tags_from_attributes(res, obj, event.tags);
} else {
- event.tags = s->tagset.get_tags();
+ event.tags = (*res.tagset).get_tags();
}
// opaque data will be filled from topic configuration
}
-bool notification_match(reservation_t& res, const rgw_pubsub_topic_filter& filter, EventType event, const RGWObjTags* req_tags) {
+static inline bool notification_match(reservation_t& res,
+ const rgw_pubsub_topic_filter& filter,
+ EventType event,
+ const RGWObjTags* req_tags) {
if (!match(filter.events, event)) {
return false;
}
return false;
}
- const auto s = res.s;
if (!filter.s3_filter.metadata_filter.kv.empty()) {
// metadata filter exists
- res.cached_metadata = s->info.x_meta_map;
- metadata_from_attributes(s, obj, res.cached_metadata);
- if (!match(filter.s3_filter.metadata_filter, res.cached_metadata)) {
+ if (res.s) {
+ res.x_meta_map = res.s->info.x_meta_map;
+ }
+ metadata_from_attributes(res, obj);
+ if (!match(filter.s3_filter.metadata_filter, res.x_meta_map)) {
return false;
}
}
if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
return false;
}
- } else if (!s->tagset.get_tags().empty()) {
+ } else if (res.tagset && !(*res.tagset).get_tags().empty()) {
// tags were cached in req_state
- if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
+ if (!match(filter.s3_filter.tag_filter, (*res.tagset).get_tags())) {
return false;
}
} else {
// try to fetch tags from the attributes
KeyMultiValueMap tags;
- tags_from_attributes(s, obj, tags);
+ tags_from_attributes(res, obj, tags);
if (!match(filter.s3_filter.tag_filter, tags)) {
return false;
}
return true;
}
-int publish_reserve(const DoutPrefixProvider *dpp, EventType event_type,
- reservation_t& res,
- const RGWObjTags* req_tags)
+ int publish_reserve(const DoutPrefixProvider* dpp,
+ EventType event_type,
+ reservation_t& res,
+ const RGWObjTags* req_tags)
{
- RGWPubSub ps(res.store, res.s->user->get_id().tenant);
- RGWPubSub::Bucket ps_bucket(&ps, res.s->bucket->get_key());
+ RGWPubSub ps(res.store, res.user_tenant);
+ RGWPubSub::Bucket ps_bucket(&ps, res.bucket->get_key());
rgw_pubsub_bucket_topics bucket_topics;
auto rc = ps_bucket.get_topics(&bucket_topics);
if (rc < 0) {
// notification does not apply to req_state
continue;
}
- ldpp_dout(dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
+ ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
"' on topic: '" << topic_cfg.dest.arn_topic <<
- "' and bucket: '" << res.s->bucket->get_name() <<
+ "' and bucket: '" << res.bucket->get_name() <<
"' (unique topic: '" << topic_cfg.name <<
"') apply to event of type: '" << to_string(event_type) << "'" << dendl;
int rval;
const auto& queue_name = topic_cfg.dest.arn_topic;
cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
- auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
+ auto ret = rgw_rados_operate(
+ res.dpp, res.store->getRados()->get_notif_pool_ctx(),
+ queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to reserve notification on queue: " << queue_name
- << ". error: " << ret << dendl;
+ ldpp_dout(res.dpp, 1) <<
+ "ERROR: failed to reserve notification on queue: "
+ << queue_name << ". error: " << ret << dendl;
// if no space is left in queue we ask client to slow down
return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
}
ret = cls_2pc_queue_reserve_result(obl, res_id);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
+ ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
return ret;
}
}
}
int publish_commit(rgw::sal::Object* obj,
- uint64_t size,
- const ceph::real_time& mtime,
- const std::string& etag,
- const std::string& version,
- EventType event_type,
- reservation_t& res,
- const DoutPrefixProvider *dpp)
+ uint64_t size,
+ const ceph::real_time& mtime,
+ const std::string& etag,
+ const std::string& version,
+ EventType event_type,
+ reservation_t& res,
+ const DoutPrefixProvider* dpp)
{
for (auto& topic : res.topics) {
- if (topic.cfg.dest.persistent && topic.res_id == cls_2pc_reservation::NO_ID) {
+ if (topic.cfg.dest.persistent &&
+ topic.res_id == cls_2pc_reservation::NO_ID) {
// nothing to commit or already committed/aborted
continue;
}
event_entry_t event_entry;
- populate_event_from_request(res, obj, size, mtime, etag, version, event_type, event_entry.event);
+ populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
event_entry.event.configurationId = topic.configurationId;
event_entry.event.opaque_data = topic.cfg.opaque_data;
if (topic.cfg.dest.persistent) {
event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
- event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
+ event_entry.push_endpoint_args =
+ std::move(topic.cfg.dest.push_endpoint_args);
event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);
bufferlist bl;
encode(event_entry, bl);
const auto& queue_name = topic.cfg.dest.arn_topic;
if (bl.length() > res.size) {
// try to make a larger reservation, fail only if this is not possible
- ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length() << " exceeded reserved size: " << res.size <<
- " . trying to make a larger reservation on queue:" << queue_name << dendl;
+ ldpp_dout(dpp, 5) << "WARNING: committed size: " << bl.length()
+ << " exceeded reserved size: " << res.size
+ <<
+ " . trying to make a larger reservation on queue:" << queue_name
+ << dendl;
// first cancel the existing reservation
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
- auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
- topic.cfg.dest.arn_topic, &op,
- res.s->yield);
+ auto ret = rgw_rados_operate(
+ dpp, res.store->getRados()->get_notif_pool_ctx(),
+ topic.cfg.dest.arn_topic, &op,
+ res.yield);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
+ ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: "
+ << topic.res_id <<
" when trying to make a larger reservation on queue: " << queue_name
- << ". error: " << ret << dendl;
+ << ". error: " << ret << dendl;
return ret;
}
// now try to make a bigger one
- bufferlist obl;
+ buffer::list obl;
int rval;
cls_2pc_queue_reserve(op, bl.length(), 1, &obl, &rval);
- ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, &op, res.s->yield, librados::OPERATION_RETURNVEC);
+ ret = rgw_rados_operate(
+ dpp, res.store->getRados()->get_notif_pool_ctx(),
+ queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: " << queue_name
- << ". error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to reserve extra space on queue: "
+ << queue_name
+ << ". error: " << ret << dendl;
return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
}
ret = cls_2pc_queue_reserve_result(obl, topic.res_id);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for extra space. error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to parse reservation id for "
+ "extra space. error: " << ret << dendl;
return ret;
}
}
- std::vector<bufferlist> bl_data_vec{std::move(bl)};
+ std::vector<buffer::list> bl_data_vec{std::move(bl)};
librados::ObjectWriteOperation op;
cls_2pc_queue_commit(op, bl_data_vec, topic.res_id);
- const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, &op,
- res.s->yield);
+ const auto ret = rgw_rados_operate(
+ dpp, res.store->getRados()->get_notif_pool_ctx(),
+ queue_name, &op, res.yield);
topic.res_id = cls_2pc_reservation::NO_ID;
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: " << queue_name
- << ". error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: failed to commit reservation to queue: "
+ << queue_name << ". error: " << ret
+ << dendl;
return ret;
}
} else {
try {
// TODO add endpoint LRU cache
- const auto push_endpoint = RGWPubSubEndpoint::create(topic.cfg.dest.push_endpoint,
- topic.cfg.dest.arn_topic,
- RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
- res.s->cct);
- ldpp_dout(dpp, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
- const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
+ const auto push_endpoint = RGWPubSubEndpoint::create(
+ topic.cfg.dest.push_endpoint,
+ topic.cfg.dest.arn_topic,
+ RGWHTTPArgs(topic.cfg.dest.push_endpoint_args, dpp),
+ dpp->get_cct());
+ ldpp_dout(res.dpp, 20) << "INFO: push endpoint created: "
+ << topic.cfg.dest.push_endpoint << dendl;
+ const auto ret = push_endpoint->send_to_completion_async(
+ dpp->get_cct(), event_entry.event, res.yield);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
+ ldpp_dout(dpp, 1) << "ERROR: push to endpoint "
+ << topic.cfg.dest.push_endpoint
+ << " failed. error: " << ret << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return ret;
}
return 0;
}
-int publish_abort(const DoutPrefixProvider *dpp, reservation_t& res) {
+extern int publish_abort(reservation_t& res) {
for (auto& topic : res.topics) {
- if (!topic.cfg.dest.persistent || topic.res_id == cls_2pc_reservation::NO_ID) {
+ if (!topic.cfg.dest.persistent ||
+ topic.res_id == cls_2pc_reservation::NO_ID) {
// nothing to abort or already committed/aborted
continue;
}
const auto& queue_name = topic.cfg.dest.arn_topic;
librados::ObjectWriteOperation op;
cls_2pc_queue_abort(op, topic.res_id);
- const auto ret = rgw_rados_operate(dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, &op,
- res.s->yield);
+ const auto ret = rgw_rados_operate(
+ res.dpp, res.store->getRados()->get_notif_pool_ctx(),
+ queue_name, &op, res.yield);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: failed to abort reservation: " << topic.res_id <<
+ ldpp_dout(res.dpp, 1) << "ERROR: failed to abort reservation: "
+ << topic.res_id <<
" from queue: " << queue_name << ". error: " << ret << dendl;
return ret;
}
return 0;
}
-reservation_t::~reservation_t() {
- publish_abort(dpp, *this);
-}
+reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
+ rgw::sal::RadosStore* _store,
+ req_state* _s,
+ rgw::sal::Object* _object,
+ rgw::sal::Object* _src_object,
+ const std::string* _object_name) :
+ dpp(_s), store(_store), s(_s), size(0) /* XXX */, obj_ctx(_s->obj_ctx),
+ object(_object), src_object(_src_object), bucket(_s->bucket.get()),
+ object_name(_object_name),
+ tagset(_s->tagset),
+ x_meta_map(_s->info.x_meta_map),
+ user_id(_s->user->get_id().id),
+ user_tenant(_s->user->get_id().tenant),
+ req_id(_s->req_id),
+ yield(_s->yield)
+{}
+
+reservation_t::reservation_t(const DoutPrefixProvider* _dpp,
+ rgw::sal::RadosStore* _store,
+ RGWObjectCtx* _obj_ctx,
+ rgw::sal::Object* _object,
+ rgw::sal::Object* _src_object,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) :
+ dpp(_dpp), store(_store), s(nullptr), size(0) /* XXX */,
+ obj_ctx(_obj_ctx),
+ object(_object), src_object(_src_object), bucket(_bucket),
+ object_name(nullptr),
+ user_id(_user_id),
+ user_tenant(_user_tenant),
+ req_id(_req_id),
+ yield(y)
+{}
+reservation_t::~reservation_t() {
+ publish_abort(*this);
}
+} // namespace rgw::notify
// then used to commit or abort the reservation
struct reservation_t {
struct topic_t {
- topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg, cls_2pc_reservation::id_t _res_id) :
- configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
+ topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
+ const cls_2pc_reservation::id_t _res_id) :
+ configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
- const std::string configurationId;
- const rgw_pubsub_topic cfg;
+ std::string configurationId;
+ rgw_pubsub_topic cfg;
// res_id is reset after topic is committed/aborted
cls_2pc_reservation::id_t res_id;
};
- const DoutPrefixProvider *dpp;
+ const DoutPrefixProvider* dpp;
std::vector<topic_t> topics;
rgw::sal::RadosStore* const store;
const req_state* const s;
size_t size;
+ RGWObjectCtx* obj_ctx;
rgw::sal::Object* const object;
+ rgw::sal::Object* const src_object; // may differ from object
+ rgw::sal::Bucket* const bucket;
const std::string* const object_name;
- KeyValueMap cached_metadata;
-
- reservation_t(const DoutPrefixProvider *_dpp, rgw::sal::RadosStore* _store, const req_state* _s,
- rgw::sal::Object* _object, const std::string* _object_name) :
- dpp(_dpp), store(_store), s(_s), object(_object), object_name(_object_name) {}
+ boost::optional<RGWObjTags&> tagset;
+ meta_map_t x_meta_map; // metadata cached by value
+ std::string user_id;
+ std::string user_tenant;
+ std::string req_id;
+ optional_yield yield;
+
+ /* ctor for rgw_op callers */
+ reservation_t(const DoutPrefixProvider* _dpp,
+ rgw::sal::RadosStore* _store,
+ req_state* _s,
+ rgw::sal::Object* _object,
+ rgw::sal::Object* _src_object,
+ const std::string* _object_name);
+
+ /* ctor for non-request caller (e.g., lifecycle) */
+ reservation_t(const DoutPrefixProvider* _dpp,
+ rgw::sal::RadosStore* _store,
+ RGWObjectCtx* _obj_ctx,
+ rgw::sal::Object* _object,
+ rgw::sal::Object* _src_object,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y);
// dtor doing resource leak guarding
// aborting the reservation if not already committed or aborted
};
// create a reservation on the 2-phase-commit queue
-int publish_reserve(const DoutPrefixProvider *dpp,
- EventType event_type,
- reservation_t& reservation,
- const RGWObjTags* req_tags);
+ int publish_reserve(const DoutPrefixProvider *dpp,
+ EventType event_type,
+ reservation_t& reservation,
+ const RGWObjTags* req_tags);
// commit the reservation to the queue
int publish_commit(rgw::sal::Object* obj,
std::string to_string(EventType t) {
switch (t) {
- case ObjectCreated:
- return "s3:ObjectCreated:*";
- case ObjectCreatedPut:
- return "s3:ObjectCreated:Put";
- case ObjectCreatedPost:
- return "s3:ObjectCreated:Post";
- case ObjectCreatedCopy:
- return "s3:ObjectCreated:Copy";
- case ObjectCreatedCompleteMultipartUpload:
- return "s3:ObjectCreated:CompleteMultipartUpload";
- case ObjectRemoved:
- return "s3:ObjectRemoved:*";
- case ObjectRemovedDelete:
- return "s3:ObjectRemoved:Delete";
- case ObjectRemovedDeleteMarkerCreated:
- return "s3:ObjectRemoved:DeleteMarkerCreated";
- case UnknownEvent:
- return "s3:UnknownEvet";
+ case ObjectCreated:
+ return "s3:ObjectCreated:*";
+ case ObjectCreatedPut:
+ return "s3:ObjectCreated:Put";
+ case ObjectCreatedPost:
+ return "s3:ObjectCreated:Post";
+ case ObjectCreatedCopy:
+ return "s3:ObjectCreated:Copy";
+ case ObjectCreatedCompleteMultipartUpload:
+ return "s3:ObjectCreated:CompleteMultipartUpload";
+ case ObjectRemoved:
+ return "s3:ObjectRemoved:*";
+ case ObjectRemovedDelete:
+ return "s3:ObjectRemoved:Delete";
+ case ObjectRemovedDeleteMarkerCreated:
+ return "s3:ObjectRemoved:DeleteMarkerCreated";
+ case ObjectExpiration:
+ return "s3:ObjectLifecycle:Expiration";
+ case ObjectNoncurrentExpiration:
+ return "s3:ObjectLifecycle:NoncurrentExpiration";
+ case ObjectDeleteMarkerExpiration:
+ return "s3:ObjectLifecycle:DeleteMarkerExpiration";
+ case UnknownEvent:
+ return "s3:UnknownEvent";
}
return "s3:UnknownEvent";
}
std::string to_ceph_string(EventType t) {
switch (t) {
- case ObjectCreated:
- case ObjectCreatedPut:
- case ObjectCreatedPost:
- case ObjectCreatedCopy:
- case ObjectCreatedCompleteMultipartUpload:
- return "OBJECT_CREATE";
- case ObjectRemovedDelete:
- return "OBJECT_DELETE";
- case ObjectRemovedDeleteMarkerCreated:
- return "DELETE_MARKER_CREATE";
- case ObjectRemoved:
- case UnknownEvent:
- return "UNKNOWN_EVENT";
+ case ObjectCreated:
+ case ObjectCreatedPut:
+ case ObjectCreatedPost:
+ case ObjectCreatedCopy:
+ case ObjectCreatedCompleteMultipartUpload:
+ return "OBJECT_CREATE";
+ case ObjectRemovedDelete:
+ return "OBJECT_DELETE";
+ case ObjectRemovedDeleteMarkerCreated:
+ return "DELETE_MARKER_CREATE";
+ case ObjectExpiration:
+ case ObjectNoncurrentExpiration:
+ case ObjectDeleteMarkerExpiration:
+ return "OBJECT_EXPIRATION";
+ case ObjectRemoved:
+ case UnknownEvent:
+ return "UNKNOWN_EVENT";
}
return "UNKNOWN_EVENT";
}
ObjectRemoved = 0xF0,
ObjectRemovedDelete = 0x10,
ObjectRemovedDeleteMarkerCreated = 0x20,
- UnknownEvent = 0x100
+ // lifecycle events (RGW extension)
+ ObjectExpiration = 0x40,
+ ObjectNoncurrentExpiration = 0x80,
+ ObjectDeleteMarkerExpiration = 0x100,
+ UnknownEvent = 0x200
};
using EventTypeList = std::vector<EventType>;
}
// make reservation for notification if needed
- std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
- s, rgw::notify::ObjectCreatedPut);
+ std::unique_ptr<rgw::sal::Notification> res
+ = store->get_notification(
+ s->object.get(), s->src_object.get(), s,
+ rgw::notify::ObjectCreatedPut);
if(!multipart) {
op_ret = res->publish_reserve(this, obj_tags.get());
if (op_ret < 0) {
}
// make reservation for notification if needed
- std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(), s, rgw::notify::ObjectCreatedPost);
+ std::unique_ptr<rgw::sal::Notification> res
+ = store->get_notification(s->object.get(), s->src_object.get(), s, rgw::notify::ObjectCreatedPost);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
// make reservation for notification if needed
const auto versioned_object = s->bucket->versioning_enabled();
- const auto event_type = versioned_object && s->object->get_instance().empty() ?
- rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete;
- std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
- s, event_type);
+ const auto event_type = versioned_object &&
+ s->object->get_instance().empty() ?
+ rgw::notify::ObjectRemovedDeleteMarkerCreated :
+ rgw::notify::ObjectRemovedDelete;
+ std::unique_ptr<rgw::sal::Notification> res
+ = store->get_notification(s->object.get(), s->src_object.get(), s,
+ event_type);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
return;
// make reservation for notification if needed
- std::unique_ptr<rgw::sal::Notification> res = store->get_notification(s->object.get(),
- s, rgw::notify::ObjectCreatedCopy);
+ std::unique_ptr<rgw::sal::Notification> res
+ = store->get_notification(
+ s->object.get(), s->src_object.get(),
+ s, rgw::notify::ObjectCreatedCopy);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
// make reservation for notification if needed
- std::unique_ptr<rgw::sal::Notification> res = store->get_notification(meta_obj.get(),
- s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
+ std::unique_ptr<rgw::sal::Notification> res
+ = store->get_notification(meta_obj.get(), nullptr, s, rgw::notify::ObjectCreatedCompleteMultipartUpload, &s->object->get_name());
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
return;
// make reservation for notification if needed
const auto versioned_object = s->bucket->versioning_enabled();
- const auto event_type = versioned_object && obj->get_instance().empty() ?
- rgw::notify::ObjectRemovedDeleteMarkerCreated : rgw::notify::ObjectRemovedDelete;
- std::unique_ptr<rgw::sal::Notification> res = store->get_notification(obj.get(),
- s, event_type);
+ const auto event_type = versioned_object && obj->get_instance().empty() ?
+ rgw::notify::ObjectRemovedDeleteMarkerCreated :
+ rgw::notify::ObjectRemovedDelete;
+ std::unique_ptr<rgw::sal::Notification> res
+ = store->get_notification(obj.get(), s->src_object.get(), s, event_type);
op_ret = res->publish_reserve(this);
if (op_ret < 0) {
send_partial_response(*iter, false, "", op_ret);
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) = 0;
/** Get a @a Completions object. Used for Async I/O tracking */
virtual std::unique_ptr<Completions> get_completions(void) = 0;
- /** Get a @a Notification object. Used to communicate with non-RGW daemons, such as
- * management/tracking software */
- virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s,
+
+ /** Get a @a Notification object. Used to communicate with non-RGW daemons, such as
+ * management/tracking software */
+ /** RGWOp variant */
+ virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name=nullptr) = 0;
+ /** No-req_state variant (e.g., rgwlc) */
+ virtual std::unique_ptr<Notification> get_notification(
+ const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
+ rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant,
+ std::string& _req_id, optional_yield y) = 0;
+
/** Get access to the lifecycle management thread */
virtual RGWLC* get_rgwlc(void) = 0;
/** Get access to the coroutine registry. Used to create new coroutine managers */
class Notification {
protected:
Object* obj;
+ Object* src_obj;
rgw::notify::EventType event_type;
public:
- Notification(Object* _obj, rgw::notify::EventType _type) : obj(_obj), event_type(_type) {}
+ Notification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
+ : obj(_obj), src_obj(_src_obj), event_type(_type)
+ {}
+
virtual ~Notification() = default;
/** Indicate the start of the event associated with this notification */
return new LCDBSerializer(store, oid, lock_name, cookie);
}
- std::unique_ptr<Notification> DBStore::get_notification(rgw::sal::Object* obj,
- struct req_state* s,
- rgw::notify::EventType event_type, const std::string* object_name)
+ std::unique_ptr<Notification> DBStore::get_notification(
+ rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
+ rgw::notify::EventType event_type, const std::string* object_name)
{
- return std::make_unique<DBNotification>(obj, event_type);
+ return std::make_unique<DBNotification>(obj, src_obj, event_type);
+ }
+
+ std::unique_ptr<Notification> DBStore::get_notification(
+ const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
+ rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
+ std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
+ optional_yield y)
+ {
+ return std::make_unique<DBNotification>(obj, src_obj, event_type);
}
RGWLC* DBStore::get_rgwlc(void) {
class DBNotification : public Notification {
protected:
- Object* obj;
- rgw::notify::EventType event_type;
-
public:
- DBNotification(Object* _obj, rgw::notify::EventType _type) : Notification(_obj, _type), obj(_obj), event_type(_type) {}
+ DBNotification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
+ : Notification(_obj, _src_obj, _type) {}
~DBNotification() = default;
virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;}
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual std::unique_ptr<Completions> get_completions(void) override;
- virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s,
- rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
+
+ virtual std::unique_ptr<Notification> get_notification(
+ rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
+ rgw::notify::EventType event_type, const std::string* object_name) override;
+
+ virtual std::unique_ptr<Notification> get_notification(
+ const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj, RGWObjectCtx* rctx,
+ rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
+ std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
+ optional_yield y) override;
+
virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
-
virtual int log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
virtual int register_to_service_map(const DoutPrefixProvider *dpp, const string& daemon_type,
return std::make_unique<RadosCompletions>();
}
-std::unique_ptr<Notification> RadosStore::get_notification(rgw::sal::Object* obj,
- struct req_state* s,
- rgw::notify::EventType event_type,
- const std::string* object_name)
+std::unique_ptr<Notification> RadosStore::get_notification(
+ rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name)
{
- return std::make_unique<RadosNotification>(s, this, obj, s, event_type, object_name);
+ return std::make_unique<RadosNotification>(s, this, obj, src_obj, s, event_type, object_name);
+}
+
+std::unique_ptr<Notification> RadosStore::get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y)
+{
+ return std::make_unique<RadosNotification>(dpp, this, obj, src_obj, rctx, event_type, _bucket, _user_id, _user_tenant, _req_id, y);
}
int RadosStore::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)
virtual int cluster_stat(RGWClusterStat& stats) override;
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual std::unique_ptr<Completions> get_completions(void) override;
- virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
+
+ // op variant
+ virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s, rgw::notify::EventType event_type, const std::string* object_name=nullptr) override;
+
+ // non-op variant (e.g., rgwlc)
+ virtual std::unique_ptr<Notification> get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, RGWObjectCtx* rctx, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) override;
virtual RGWLC* get_rgwlc(void) override { return rados->get_lc(); }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return rados->get_cr_registry(); }
class RadosNotification : public Notification {
RadosStore* store;
+ /* XXX it feels incorrect to me that rgw::notify::reservation_t is
+ * currently RADOS-specific; instead, I think notification types such as
+ * reservation_t should be generally visible, whereas the internal
+ * notification behavior should be made portable (e.g., notification
+ * to non-RADOS message sinks) */
rgw::notify::reservation_t res;
public:
- RadosNotification(const DoutPrefixProvider *_dpp, RadosStore* _store, Object* _obj, req_state* _s,
- rgw::notify::EventType _type, const std::string* object_name=nullptr) :
- Notification(_obj, _type), store(_store), res(_dpp, _store, _s, _obj, object_name) { }
+ RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, req_state* _s, rgw::notify::EventType _type, const std::string* object_name=nullptr) :
+ Notification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, _s, _obj, _src_obj, object_name) { }
+
+ RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, RGWObjectCtx* rctx, rgw::notify::EventType _type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) :
+ Notification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, rctx, _obj, _src_obj, _bucket, _user_id, _user_tenant, _req_id, y) {}
+
~RadosNotification() = default;
+ rgw::notify::reservation_t& get_reservation(void) {
+ return res;
+ }
+
virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override;
virtual int publish_commit(const DoutPrefixProvider* dpp, uint64_t size,
const ceph::real_time& mtime, const std::string& etag, const std::string& version) override;
std::string events_str = s->info.args.get("events", &exists);
if (!exists) {
// if no events are provided, we notify on all of them
- events_str = "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE";
+ events_str =
+ "OBJECT_CREATE,OBJECT_DELETE,DELETE_MARKER_CREATE,OBJECT_EXPIRATION";
}
rgw::notify::from_string_list(events_str, events);
if (std::find(events.begin(), events.end(), rgw::notify::UnknownEvent) != events.end()) {