+--------------------------------------------------------+-----------------------------------------+
| ``s3:ObjectLifecycle:Transition:NonCurrent`` | Ceph extension |
+--------------------------------------------------------+-----------------------------------------+
+| ``s3:LifecycleExpiration:*`` | Supported. Equivalent to: s3:LifecycleExpiration:Delete, s3:LifecycleExpiration:DeleteMarkerCreated|
++--------------------------------------------------------+-----------------------------------------+
+| ``s3:LifecycleExpiration:Delete`` | Supported. Equivalent to: s3:ObjectLifecycle:Expiration:Current.|
++--------------------------------------------------------+-----------------------------------------+
+| ``s3:LifecycleExpiration:DeleteMarkerCreated`` | Supported. Equivalent to: s3:ObjectLifecycle:Expiration:DeleteMarker.|
++--------------------------------------------------------+-----------------------------------------+
+| ``s3:LifecycleTransition`` | Supported. Equivalent to: s3:ObjectLifecycle:Transition:Current|
++--------------------------------------------------------+-----------------------------------------+
| ``s3:ObjectSynced:*`` | Ceph extension |
+--------------------------------------------------------+-----------------------------------------+
| ``s3:ObjectSynced:Create`` | Ceph Extension |
std::unique_ptr<Notification> DaosStore::get_notification(
rgw::sal::Object* obj, rgw::sal::Object* src_obj, struct req_state* s,
rgw::notify::EventType event_type, const std::string* object_name) {
- return std::make_unique<DaosNotification>(obj, src_obj, event_type);
+ rgw::notify::EventTypeList event_types = {event_type};
+ return std::make_unique<DaosNotification>(obj, src_obj, event_types);
}
std::unique_ptr<Notification> DaosStore::get_notification(
- const DoutPrefixProvider* dpp, Object* obj, Object* src_obj,
- rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
- std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
+ const DoutPrefixProvider* dpp,
+ Object* obj,
+ Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
optional_yield y) {
ldpp_dout(dpp, 20) << "get_notification" << dendl;
- return std::make_unique<DaosNotification>(obj, src_obj, event_type);
+ return std::make_unique<DaosNotification>(obj, src_obj, event_types);
}
int DaosStore::log_usage(const DoutPrefixProvider* dpp,
class DaosNotification : public StoreNotification {
public:
- DaosNotification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
- : StoreNotification(_obj, _src_obj, _type) {}
+ DaosNotification(Object* _obj,
+ Object* _src_obj,
+ const rgw::notify::EventTypeList& _types)
+ : StoreNotification(_obj, _src_obj, _types) {}
~DaosNotification() = default;
virtual int publish_reserve(const DoutPrefixProvider* dpp,
rgw::notify::EventType event_type, optional_yield y,
const std::string* object_name = nullptr) override;
virtual std::unique_ptr<Notification> get_notification(
- const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
- rgw::sal::Object* src_obj, rgw::notify::EventType event_type,
- rgw::sal::Bucket* _bucket, std::string& _user_id,
- std::string& _user_tenant, std::string& _req_id,
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
optional_yield y) override;
virtual RGWLC* get_rgwlc(void) override { return NULL; }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override {
std::unique_ptr<Notification> MotrStore::get_notification(Object* obj, Object* src_obj, req_state* s,
rgw::notify::EventType event_type, optional_yield y, const string* object_name)
{
- return std::make_unique<MotrNotification>(obj, src_obj, event_type);
+ const rgw::notify::EventTypeList event_types = {event_type};
+ return std::make_unique<MotrNotification>(obj, src_obj, event_types);
}
-std::unique_ptr<Notification> MotrStore::get_notification(const DoutPrefixProvider* dpp, Object* obj,
- Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
- std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y)
-{
- return std::make_unique<MotrNotification>(obj, src_obj, event_type);
+std::unique_ptr<Notification> MotrStore::get_notification(
+ const DoutPrefixProvider* dpp,
+ Object* obj,
+ Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) {
+ return std::make_unique<MotrNotification>(obj, src_obj, event_types);
}
int MotrStore::log_usage(const DoutPrefixProvider *dpp, map<rgw_user_bucket, RGWUsageBatch>& usage_info)
class MotrNotification : public StoreNotification {
public:
- MotrNotification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type) :
- StoreNotification(_obj, _src_obj, _type) {}
- ~MotrNotification() = default;
+ MotrNotification(Object* _obj,
+ Object* _src_obj,
+ const rgw::notify::EventTypeList& _types)
+ : StoreNotification(_obj, _src_obj, _types) {}
+ ~MotrNotification() = default;
- virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;}
+ virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;}
virtual int publish_commit(const DoutPrefixProvider* dpp, uint64_t size,
const ceph::real_time& mtime, const std::string& etag, const std::string& version) override { return 0; }
};
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj,
req_state* s, rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) override;
- virtual std::unique_ptr<Notification> get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
- rgw::sal::Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
- std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) override;
+ virtual std::unique_ptr<Notification> get_notification(
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) override;
virtual RGWLC* get_rgwlc(void) override { return NULL; }
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
return next->get_notification(obj, src_obj, s, event_type, y, object_name);
}
-std::unique_ptr<Notification> POSIXDriver::get_notification(const DoutPrefixProvider* dpp,
- rgw::sal::Object* obj, rgw::sal::Object* src_obj,
- rgw::notify::EventType event_type,
- rgw::sal::Bucket* _bucket,
- std::string& _user_id, std::string& _user_tenant,
- std::string& _req_id, optional_yield y)
-{
- return next->get_notification(dpp, obj, src_obj, event_type, _bucket, _user_id, _user_tenant, _req_id, y);
+std::unique_ptr<Notification> POSIXDriver::get_notification(
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) {
+ return next->get_notification(dpp, obj, src_obj, event_types, _bucket,
+ _user_id, _user_tenant, _req_id, y);
}
int POSIXDriver::close()
const std::string* object_name=nullptr) override;
virtual std::unique_ptr<Notification> get_notification(
- const DoutPrefixProvider* dpp,
- rgw::sal::Object* obj,
- rgw::sal::Object* src_obj,
- rgw::notify::EventType event_type,
- rgw::sal::Bucket* _bucket,
- std::string& _user_id,
- std::string& _user_tenant,
- std::string& _req_id,
- optional_yield y) override;
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_type,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) override;
/* Internal APIs */
int get_root_fd() { return root_fd; }
// send notification that object was successfully synced
std::string user_id = "rgw sync";
std::string req_id = "0";
-
+
RGWObjTags obj_tags;
auto iter = attrs.find(RGW_ATTR_TAGS);
if (iter != attrs.end()) {
try {
auto it = iter->second.cbegin();
obj_tags.decode(it);
- } catch (buffer::error &err) {
- ldpp_dout(dpp, 1) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+ } catch (buffer::error& err) {
+ ldpp_dout(dpp, 1)
+ << "ERROR: " << __func__
+ << ": caught buffer::error couldn't decode TagSet " << dendl;
}
}
- // NOTE: we create a mutable copy of bucket.get_tenant as the get_notification function expects a std::string&, not const
+ // NOTE: we create a mutable copy of bucket.get_tenant as the
+ // get_notification function expects a std::string&, not const
std::string tenant(dest_bucket.get_tenant());
- std::unique_ptr<rgw::sal::Notification> notify
- = store->get_notification(dpp, &dest_obj, nullptr, rgw::notify::ObjectSyncedCreate,
- &dest_bucket, user_id,
- tenant,
- req_id, null_yield);
-
- auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
- int ret = rgw::notify::publish_reserve(dpp, *store->svc()->site, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
+ std::unique_ptr<rgw::sal::Notification> notify =
+ store->get_notification(
+ dpp, &dest_obj, nullptr, {rgw::notify::ObjectSyncedCreate},
+ &dest_bucket, user_id, tenant, req_id, null_yield);
+
+ auto notify_res =
+ static_cast<rgw::sal::RadosNotification*>(notify.get())
+ ->get_reservation();
+ int ret = rgw::notify::publish_reserve(
+ dpp, *store->svc()->site, {rgw::notify::ObjectSyncedCreate},
+ notify_res, &obj_tags);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: " << ret << dendl;
+ ldpp_dout(dpp, 1)
+ << "ERROR: reserving notification failed, with error: " << ret
+ << dendl;
// no need to return, the sync already happened
} else {
- ret = rgw::notify::publish_commit(&dest_obj, *bytes_transferred, ceph::real_clock::now(), etag, dest_obj.get_instance(), rgw::notify::ObjectSyncedCreate, notify_res, dpp);
+ ret = rgw::notify::publish_commit(
+ &dest_obj, *bytes_transferred, ceph::real_clock::now(), etag,
+ dest_obj.get_instance(), notify_res, dpp);
if (ret < 0) {
- ldpp_dout(dpp, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
+ ldpp_dout(dpp, 1)
+ << "ERROR: publishing notification failed, with error: " << ret
+ << dendl;
}
}
}
-
+
if (counters) {
if (bytes_transferred) {
counters->inc(sync_counters::l_fetch, *bytes_transferred);
return true;
}
- int publish_reserve(const DoutPrefixProvider* dpp,
- const SiteConfig& site,
- EventType event_type,
- reservation_t& res,
- const RGWObjTags* req_tags)
-{
+int publish_reserve(const DoutPrefixProvider* dpp,
+ const SiteConfig& site,
+ const EventTypeList& event_types,
+ reservation_t& res,
+ const RGWObjTags* req_tags) {
rgw_pubsub_bucket_topics bucket_topics;
if (all_zonegroups_support(site, zone_features::notification_v2) &&
res.store->stat_topics_v1(res.user_tenant, res.yield, res.dpp) == -ENOENT) {
for (const auto& bucket_topic : bucket_topics.topics) {
const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
- if (!notification_match(res, topic_filter, event_type, req_tags)) {
- // notification does not apply to req_state
- continue;
- }
- ldpp_dout(res.dpp, 20) << "INFO: notification: '" << topic_filter.s3_id <<
- "' on topic: '" << topic_cfg.dest.arn_topic <<
- "' and bucket: '" << res.bucket->get_name() <<
- "' (unique topic: '" << topic_cfg.name <<
- "') apply to event of type: '" << to_string(event_type) << "'" << dendl;
-
- cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
- if (topic_cfg.dest.persistent) {
- // TODO: take default reservation size from conf
- constexpr auto DEFAULT_RESERVATION = 4*1024U; // 4K
- res.size = DEFAULT_RESERVATION;
- librados::ObjectWriteOperation op;
- bufferlist obl;
- int rval;
- const auto& queue_name = topic_cfg.dest.arn_topic;
- cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
- auto ret = rgw_rados_operate(
- res.dpp, res.store->getRados()->get_notif_pool_ctx(),
- queue_name, &op, res.yield, librados::OPERATION_RETURNVEC);
- if (ret < 0) {
- ldpp_dout(res.dpp, 1) <<
- "ERROR: failed to reserve notification on queue: "
- << queue_name << ". error: " << ret << dendl;
- // if no space is left in queue we ask client to slow down
- return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
+ for (auto& event_type : event_types) {
+ if (!notification_match(res, topic_filter, event_type, req_tags)) {
+ // notification does not apply to req_state
+ continue;
}
- ret = cls_2pc_queue_reserve_result(obl, res_id);
- if (ret < 0) {
- ldpp_dout(res.dpp, 1) << "ERROR: failed to parse reservation id. error: " << ret << dendl;
- return ret;
+ ldpp_dout(res.dpp, 20)
+ << "INFO: notification: '" << topic_filter.s3_id << "' on topic: '"
+ << topic_cfg.dest.arn_topic << "' and bucket: '"
+ << res.bucket->get_name() << "' (unique topic: '" << topic_cfg.name
+ << "') apply to event of type: '" << to_string(event_type) << "'"
+ << dendl;
+
+ cls_2pc_reservation::id_t res_id = cls_2pc_reservation::NO_ID;
+ if (topic_cfg.dest.persistent) {
+ // TODO: take default reservation size from conf
+ constexpr auto DEFAULT_RESERVATION = 4 * 1024U; // 4K
+ res.size = DEFAULT_RESERVATION;
+ librados::ObjectWriteOperation op;
+ bufferlist obl;
+ int rval;
+ const auto& queue_name = topic_cfg.dest.arn_topic;
+ cls_2pc_queue_reserve(op, res.size, 1, &obl, &rval);
+ auto ret = rgw_rados_operate(
+ res.dpp, res.store->getRados()->get_notif_pool_ctx(), queue_name,
+ &op, res.yield, librados::OPERATION_RETURNVEC);
+ if (ret < 0) {
+ ldpp_dout(res.dpp, 1)
+ << "ERROR: failed to reserve notification on queue: "
+ << queue_name << ". error: " << ret << dendl;
+ // if no space is left in queue we ask client to slow down
+ return (ret == -ENOSPC) ? -ERR_RATE_LIMITED : ret;
+ }
+ ret = cls_2pc_queue_reserve_result(obl, res_id);
+ if (ret < 0) {
+ ldpp_dout(res.dpp, 1)
+ << "ERROR: failed to parse reservation id. error: " << ret
+ << dendl;
+ return ret;
+ }
}
- }
- // load the topic,if there is change in topic config while it's stored in
- // notification.
- rgw_pubsub_topic result;
- const RGWPubSub ps(res.store, res.user_tenant, site);
- auto ret = ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
- if (ret < 0) {
- ldpp_dout(res.dpp, 1)
- << "INFO: failed to load topic: " << topic_cfg.name
- << ". error: " << ret
- << " while reserving persistent notification event" << dendl;
- if (ret == -ENOENT) {
- // either the topic is deleted but the corresponding notification still
- // exist or in v2 mode the notification could have synced first but
- // topic is not synced yet.
- return 0;
+ // load the topic,if there is change in topic config while it's stored in
+ // notification.
+ rgw_pubsub_topic result;
+ const RGWPubSub ps(res.store, res.user_tenant, site);
+ auto ret =
+ ps.get_topic(res.dpp, topic_cfg.name, result, res.yield, nullptr);
+ if (ret < 0) {
+ ldpp_dout(res.dpp, 1)
+ << "INFO: failed to load topic: " << topic_cfg.name
+ << ". error: " << ret
+ << " while reserving persistent notification event" << dendl;
+ if (ret == -ENOENT) {
+ // either the topic is deleted but the corresponding notification
+ // still exist or in v2 mode the notification could have synced first
+ // but topic is not synced yet.
+ return 0;
+ }
+ ldpp_dout(res.dpp, 1)
+ << "WARN: Using the stored topic from bucket notification struct."
+ << dendl;
+ res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id,
+ event_type);
+ } else {
+ res.topics.emplace_back(topic_filter.s3_id, result, res_id, event_type);
}
- ldpp_dout(res.dpp, 1)
- << "WARN: Using the stored topic from bucket notification struct."
- << dendl;
- res.topics.emplace_back(topic_filter.s3_id, topic_cfg, res_id);
- } else {
- res.topics.emplace_back(topic_filter.s3_id, result, res_id);
}
}
return 0;
const ceph::real_time& mtime,
const std::string& etag,
const std::string& version,
- EventType event_type,
reservation_t& res,
const DoutPrefixProvider* dpp)
{
continue;
}
event_entry_t event_entry;
- populate_event(res, obj, size, mtime, etag, version, event_type, event_entry.event);
+ populate_event(res, obj, size, mtime, etag, version, topic.event_type,
+ event_entry.event);
event_entry.event.configurationId = topic.configurationId;
event_entry.event.opaque_data = topic.cfg.opaque_data;
if (topic.cfg.dest.persistent) {
struct reservation_t {
struct topic_t {
topic_t(const std::string& _configurationId, const rgw_pubsub_topic& _cfg,
- cls_2pc_reservation::id_t _res_id) :
- configurationId(_configurationId), cfg(_cfg), res_id(_res_id) {}
+ cls_2pc_reservation::id_t _res_id,
+ rgw::notify::EventType _event_type)
+ : configurationId(_configurationId),
+ cfg(_cfg),
+ res_id(_res_id),
+ event_type(_event_type) {}
const std::string configurationId;
const rgw_pubsub_topic cfg;
// res_id is reset after topic is committed/aborted
cls_2pc_reservation::id_t res_id;
+ rgw::notify::EventType event_type;
};
const DoutPrefixProvider* const dpp;
// create a reservation on the 2-phase-commit queue
int publish_reserve(const DoutPrefixProvider *dpp,
- const SiteConfig& site,
- EventType event_type,
- reservation_t& reservation,
- const RGWObjTags* req_tags);
+ const SiteConfig& site,
+ const EventTypeList& event_types,
+ reservation_t& reservation,
+ const RGWObjTags* req_tags);
// commit the reservation to the queue
int publish_commit(rgw::sal::Object* obj,
const ceph::real_time& mtime,
const std::string& etag,
const std::string& version,
- EventType event_type,
reservation_t& reservation,
const DoutPrefixProvider *dpp);
return std::make_unique<RadosNotification>(s, this, obj, src_obj, s, event_type, y, object_name);
}
-std::unique_ptr<Notification> RadosStore::get_notification(const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj, rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y)
-{
- return std::make_unique<RadosNotification>(dpp, this, obj, src_obj, event_type, _bucket, _user_id, _user_tenant, _req_id, y);
+std::unique_ptr<Notification> RadosStore::get_notification(
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) {
+ return std::make_unique<RadosNotification>(dpp, this, obj, src_obj,
+ event_types, _bucket, _user_id,
+ _user_tenant, _req_id, y);
}
std::string RadosStore::topics_oid(const std::string& tenant) const {
int RadosNotification::publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags)
{
- return rgw::notify::publish_reserve(dpp, *store->svc()->site, event_type, res, obj_tags);
+ return rgw::notify::publish_reserve(dpp, *store->svc()->site, event_types, res, obj_tags);
}
int RadosNotification::publish_commit(const DoutPrefixProvider* dpp, uint64_t size,
const ceph::real_time& mtime, const std::string& etag, const std::string& version)
{
- return rgw::notify::publish_commit(obj, size, mtime, etag, version, event_type, res, dpp);
+ return rgw::notify::publish_commit(obj, size, mtime, etag, version, res, dpp);
}
int RadosAtomicWriter::prepare(optional_yield y)
virtual std::unique_ptr<Lifecycle> get_lifecycle(void) override;
virtual std::unique_ptr<Notification> get_notification(rgw::sal::Object* obj, rgw::sal::Object* src_obj, req_state* s, rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) override;
virtual std::unique_ptr<Notification> get_notification(
- const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj,
- rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant,
- std::string& _req_id, optional_yield y) override;
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) override;
int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override;
int stat_topics_v1(const std::string& tenant, optional_yield y, const DoutPrefixProvider *dpp) override;
rgw::notify::reservation_t res;
public:
- RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, req_state* _s, rgw::notify::EventType _type, optional_yield y, const std::string* object_name) :
- StoreNotification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, _s, _obj, _src_obj, object_name, y) { }
-
- RadosNotification(const DoutPrefixProvider* _dpp, RadosStore* _store, Object* _obj, Object* _src_obj, rgw::notify::EventType _type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant, std::string& _req_id, optional_yield y) :
- StoreNotification(_obj, _src_obj, _type), store(_store), res(_dpp, _store, _obj, _src_obj, _bucket, _user_id, _user_tenant, _req_id, y) {}
-
- ~RadosNotification() = default;
+ RadosNotification(const DoutPrefixProvider* _dpp,
+ RadosStore* _store,
+ Object* _obj,
+ Object* _src_obj,
+ req_state* _s,
+ rgw::notify::EventType _type,
+ optional_yield y,
+ const std::string* object_name)
+ : StoreNotification(_obj, _src_obj, {_type}),
+ store(_store),
+ res(_dpp, _store, _s, _obj, _src_obj, object_name, y) {}
+
+ RadosNotification(const DoutPrefixProvider* _dpp,
+ RadosStore* _store,
+ Object* _obj,
+ Object* _src_obj,
+ const rgw::notify::EventTypeList& _types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y)
+ : StoreNotification(_obj, _src_obj, _types),
+ store(_store),
+ res(_dpp,
+ _store,
+ _obj,
+ _src_obj,
+ _bucket,
+ _user_id,
+ _user_tenant,
+ _req_id,
+ y) {}
+
+ ~RadosNotification() = default;
rgw::notify::reservation_t& get_reservation(void) {
return res;
});
}
-static int remove_expired_obj(
- const DoutPrefixProvider *dpp, lc_op_ctx& oc, bool remove_indeed,
- rgw::notify::EventType event_type)
-{
+static int remove_expired_obj(const DoutPrefixProvider* dpp,
+ lc_op_ctx& oc,
+ bool remove_indeed,
+ const rgw::notify::EventTypeList& event_types) {
auto& driver = oc.driver;
auto& bucket_info = oc.bucket->get_info();
auto& o = oc.o;
del_op->params.unmod_since = meta.mtime;
// notification supported only for RADOS driver for now
- notify = driver->get_notification(dpp, obj.get(), nullptr, event_type,
- oc.bucket, lc_id,
- const_cast<std::string&>(oc.bucket->get_tenant()),
- lc_req_id, null_yield);
+ notify = driver->get_notification(
+ dpp, obj.get(), nullptr, event_types, oc.bucket, lc_id,
+ const_cast<std::string&>(oc.bucket->get_tenant()), lc_req_id, null_yield);
ret = notify->publish_reserve(dpp, nullptr);
if ( ret < 0) {
std::unique_ptr<rgw::sal::Notification> notify
= driver->get_notification(
- this, sal_obj.get(), nullptr, event_type,
- target, lc_id,
- const_cast<std::string&>(target->get_tenant()),
- lc_req_id, null_yield);
+ this, sal_obj.get(), nullptr, {event_type}, target, lc_id,
+ const_cast<std::string&>(target->get_tenant()), lc_req_id,
+ null_yield);
auto version_id = obj.key.instance;
ret = notify->publish_reserve(this, nullptr);
auto& o = oc.o;
int r;
if (o.is_delete_marker()) {
- r = remove_expired_obj(oc.dpp, oc, true,
- rgw::notify::ObjectExpirationDeleteMarker);
+ r = remove_expired_obj(
+ oc.dpp, oc, true,
+ {rgw::notify::ObjectExpirationDeleteMarker,
+ rgw::notify::LifecycleExpirationDeleteMarkerCreated});
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: current is-dm remove_expired_obj "
<< oc.bucket << ":" << o.key
} else {
/* ! o.is_delete_marker() */
r = remove_expired_obj(oc.dpp, oc, !oc.bucket->versioned(),
- rgw::notify::ObjectExpirationCurrent);
+ {rgw::notify::ObjectExpirationCurrent,
+ rgw::notify::LifecycleExpirationDelete});
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj "
<< oc.bucket << ":" << o.key
int process(lc_op_ctx& oc) override {
auto& o = oc.o;
int r = remove_expired_obj(oc.dpp, oc, true,
- rgw::notify::ObjectExpirationNoncurrent);
+ {rgw::notify::ObjectExpirationNoncurrent});
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (non-current expiration) "
<< oc.bucket << ":" << o.key
int process(lc_op_ctx& oc) override {
auto& o = oc.o;
int r = remove_expired_obj(oc.dpp, oc, true,
- rgw::notify::ObjectExpirationDeleteMarker);
+ {rgw::notify::ObjectExpirationDeleteMarker,
+ rgw::notify::LifecycleExpirationDeleteMarkerCreated});
if (r < 0) {
ldpp_dout(oc.dpp, 0) << "ERROR: remove_expired_obj (delete marker expiration) "
<< oc.bucket << ":" << o.key
/* If bucket is versioned, create delete_marker for current version
*/
if (! oc.bucket->versioned()) {
- ret = remove_expired_obj(oc.dpp, oc, true, rgw::notify::ObjectTransition);
+ ret =
+ remove_expired_obj(oc.dpp, oc, true, {rgw::notify::ObjectTransition});
ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key
<< ") not versioned flags: " << oc.o.flags << dendl;
} else {
/* versioned */
if (oc.o.is_current() && !oc.o.is_delete_marker()) {
ret = remove_expired_obj(oc.dpp, oc, false,
- rgw::notify::ObjectTransitionCurrent);
+ {rgw::notify::ObjectTransitionCurrent,
+ rgw::notify::LifecycleTransition});
ldpp_dout(oc.dpp, 20) << "delete_tier_obj Object(key:" << oc.o.key
<< ") current & not delete_marker"
<< " versioned_epoch: " << oc.o.versioned_epoch
<< "flags: " << oc.o.flags << dendl;
} else {
ret = remove_expired_obj(oc.dpp, oc, true,
- rgw::notify::ObjectTransitionNoncurrent);
+ {rgw::notify::ObjectTransitionNoncurrent});
ldpp_dout(oc.dpp, 20)
<< "delete_tier_obj Object(key:" << oc.o.key << ") not current "
<< "versioned_epoch: " << oc.o.versioned_epoch
return ret;
}
- const auto event_type = (bucket->versioned() &&
- oc.o.is_current() && !oc.o.is_delete_marker()) ?
- rgw::notify::ObjectTransitionCurrent :
- rgw::notify::ObjectTransitionNoncurrent;
+ rgw::notify::EventTypeList event_types;
+ if (bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
+ event_types.insert(event_types.end(),
+ {rgw::notify::ObjectTransitionCurrent,
+ rgw::notify::LifecycleTransition});
+ } else {
+ event_types.push_back(rgw::notify::ObjectTransitionNoncurrent);
+ }
- std::unique_ptr<rgw::sal::Notification> notify
- = oc.driver->get_notification(
- oc.dpp, obj.get(), nullptr, event_type,
- bucket, lc_id,
- const_cast<std::string&>(oc.bucket->get_tenant()),
- lc_req_id, null_yield);
+ std::unique_ptr<rgw::sal::Notification> notify =
+ oc.driver->get_notification(
+ oc.dpp, obj.get(), nullptr, event_types, bucket, lc_id,
+ const_cast<std::string&>(oc.bucket->get_tenant()), lc_req_id,
+ null_yield);
auto version_id = oc.o.key.instance;
ret = notify->publish_reserve(oc.dpp, nullptr);
return "s3:ObjectSynced:Delete";
case ObjectSyncedDeletionMarkerCreated:
return "s3:ObjectSynced:DeletionMarkerCreated";
+ case LifecycleExpiration:
+ return "s3:LifecycleExpiration:*";
+ case LifecycleExpirationDelete:
+ return "s3:LifecycleExpiration:Delete";
+ case LifecycleExpirationDeleteMarkerCreated:
+ return "s3:LifecycleExpiration:DeleteMarkerCreated";
+ case LifecycleTransition:
+ return "s3:LifecycleTransition";
case UnknownEvent:
return "s3:UnknownEvent";
}
return ObjectSyncedDelete;
if (s == "s3:ObjectSynced:DeletionMarkerCreated")
return ObjectSyncedDeletionMarkerCreated;
+ if (s == "s3:LifecycleExpiration:*")
+ return LifecycleExpiration;
+ if (s == "s3:LifecycleExpiration:Delete")
+ return LifecycleExpirationDelete;
+ if (s == "s3:LifecycleExpiration:DeleteMarkerCreated")
+ return LifecycleExpirationDeleteMarkerCreated;
+ if (s == "s3:LifecycleTransition")
+ return LifecycleTransition;
return UnknownEvent;
}
ObjectSyncedCreate = 0x10000,
ObjectSyncedDelete = 0x20000,
ObjectSyncedDeletionMarkerCreated = 0x40000,
- UnknownEvent = 0x100000
- };
+ LifecycleExpiration = 0xF00000,
+ LifecycleExpirationDelete = 0x100000,
+ LifecycleExpirationDeleteMarkerCreated = 0x200000,
+ LifecycleTransition = 0xF000000,
+ UnknownEvent = 0x10000000
+};
using EventTypeList = std::vector<EventType>;
rgw::notify::EventType event_type, optional_yield y, const std::string* object_name=nullptr) = 0;
/** No-req_state variant (e.g., rgwlc) */
virtual std::unique_ptr<Notification> get_notification(
- const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj,
- rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket, std::string& _user_id, std::string& _user_tenant,
- std::string& _req_id, optional_yield y) = 0;
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) = 0;
/** Read the topic config entry into @a data and (optionally) @a objv_tracker */
virtual int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) = 0;
rgw::notify::EventType event_type, optional_yield y,
const std::string* object_name)
{
- return std::make_unique<DBNotification>(obj, src_obj, event_type);
+ rgw::notify::EventTypeList event_types = {event_type};
+ return std::make_unique<DBNotification>(obj, src_obj, event_types);
}
std::unique_ptr<Notification> DBStore::get_notification(
- const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
- rgw::sal::Object* src_obj,
- rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
- std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
- optional_yield y)
- {
- return std::make_unique<DBNotification>(obj, src_obj, event_type);
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) {
+ return std::make_unique<DBNotification>(obj, src_obj, event_types);
}
RGWLC* DBStore::get_rgwlc(void) {
class DBNotification : public StoreNotification {
protected:
public:
- DBNotification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
- : StoreNotification(_obj, _src_obj, _type) {}
- ~DBNotification() = default;
+ DBNotification(Object* _obj,
+ Object* _src_obj,
+ const rgw::notify::EventTypeList& _types)
+ : StoreNotification(_obj, _src_obj, _types) {}
+ ~DBNotification() = default;
- virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;}
+ virtual int publish_reserve(const DoutPrefixProvider *dpp, RGWObjTags* obj_tags = nullptr) override { return 0;}
virtual int publish_commit(const DoutPrefixProvider* dpp, uint64_t size,
const ceph::real_time& mtime, const std::string& etag, const std::string& version) override { return 0; }
};
rgw::notify::EventType event_type, optional_yield y, const std::string* object_name) override;
virtual std::unique_ptr<Notification> get_notification(
- const DoutPrefixProvider* dpp, rgw::sal::Object* obj,
- rgw::sal::Object* src_obj,
- rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
- std::string& _user_id, std::string& _user_tenant, std::string& _req_id,
- optional_yield y) override;
-
- virtual RGWLC* get_rgwlc(void) override;
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) override;
+
+ virtual RGWLC* get_rgwlc(void) override;
virtual RGWCoroutinesManagerRegistry* get_cr_registry() override { return NULL; }
virtual int log_usage(const DoutPrefixProvider *dpp, std::map<rgw_user_bucket, RGWUsageBatch>& usage_info, optional_yield y) override;
virtual int log_op(const DoutPrefixProvider *dpp, std::string& oid, bufferlist& bl) override;
return std::make_unique<FilterNotification>(std::move(n));
}
-std::unique_ptr<Notification> FilterDriver::get_notification(const DoutPrefixProvider* dpp,
- rgw::sal::Object* obj, rgw::sal::Object* src_obj,
- rgw::notify::EventType event_type,
- rgw::sal::Bucket* _bucket, std::string& _user_id,
- std::string& _user_tenant, std::string& _req_id,
- optional_yield y)
-{
- std::unique_ptr<Notification> n = next->get_notification(dpp, nextObject(obj),
- nextObject(src_obj),
- event_type,
- nextBucket(_bucket),
- _user_id,
- _user_tenant,
- _req_id, y);
+std::unique_ptr<Notification> FilterDriver::get_notification(
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) {
+ std::unique_ptr<Notification> n = next->get_notification(
+ dpp, nextObject(obj), nextObject(src_obj), event_types,
+ nextBucket(_bucket), _user_id, _user_tenant, _req_id, y);
return std::make_unique<FilterNotification>(std::move(n));
}
rgw::notify::EventType event_type, optional_yield y,
const std::string* object_name=nullptr) override;
virtual std::unique_ptr<Notification> get_notification(
- const DoutPrefixProvider* dpp, rgw::sal::Object* obj, rgw::sal::Object* src_obj,
-
- rgw::notify::EventType event_type, rgw::sal::Bucket* _bucket,
- std::string& _user_id, std::string& _user_tenant,
- std::string& _req_id, optional_yield y) override;
+ const DoutPrefixProvider* dpp,
+ rgw::sal::Object* obj,
+ rgw::sal::Object* src_obj,
+ const rgw::notify::EventTypeList& event_types,
+ rgw::sal::Bucket* _bucket,
+ std::string& _user_id,
+ std::string& _user_tenant,
+ std::string& _req_id,
+ optional_yield y) override;
int read_topics(const std::string& tenant, rgw_pubsub_topics& topics, RGWObjVersionTracker* objv_tracker,
optional_yield y, const DoutPrefixProvider *dpp) override {
protected:
Object* obj;
Object* src_obj;
- rgw::notify::EventType event_type;
+ rgw::notify::EventTypeList event_types;
- public:
- StoreNotification(Object* _obj, Object* _src_obj, rgw::notify::EventType _type)
- : obj(_obj), src_obj(_src_obj), event_type(_type)
- {}
+ public:
+ StoreNotification(Object* _obj,
+ Object* _src_obj,
+ rgw::notify::EventTypeList _types)
+ : obj(_obj), src_obj(_src_obj), event_types(std::move(_types)) {}
- virtual ~StoreNotification() = default;
+ virtual ~StoreNotification() = default;
};
class StoreWriter : public Writer {
notification_name = bucket_name + NOTIFICATION_SUFFIX
topic_conf_list = [{'Id': notification_name,
'TopicArn': topic_arn,
- 'Events': ['s3:ObjectLifecycle:Expiration:*']
+ 'Events': ['s3:ObjectLifecycle:Expiration:*',
+ 's3:LifecycleExpiration:*']
}]
s3_notification_conf = PSNotificationS3(conn, bucket_name, topic_conf_list)
response, status = s3_notification_conf.set_config()
time_diff = time.time() - start_time
print('average time for creation + http notification is: ' + str(time_diff*1000/number_of_objects) + ' milliseconds')
-
+
# create lifecycle policy
client = boto3.client('s3',
endpoint_url='http://'+conn.host+':'+str(conn.port),
aws_access_key_id=conn.aws_access_key_id,
aws_secret_access_key=conn.aws_secret_access_key)
yesterday = datetime.date.today() - datetime.timedelta(days=1)
- response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
+ response = client.put_bucket_lifecycle_configuration(Bucket=bucket_name,
LifecycleConfiguration={'Rules': [
{
'ID': 'rule1',
print('total number of objects: ' + str(len(keys)))
event_keys = []
events = http_server.get_and_reset_events()
+ assert_equal(number_of_objects * 2, len(events))
for event in events:
- assert_equal(event['Records'][0]['eventName'], 'ObjectLifecycle:Expiration:Current')
+ assert_in(event['Records'][0]['eventName'],
+ ['LifecycleExpiration:Delete',
+ 'ObjectLifecycle:Expiration:Current'])
event_keys.append(event['Records'][0]['s3']['object']['key'])
for key in keys:
key_found = False