namespace rgw::notify {
-struct record_with_endpoint_t {
- rgw_pubsub_s3_record record;
+struct event_entry_t {
+ rgw_pubsub_s3_event event;
std::string push_endpoint;
std::string push_endpoint_args;
std::string arn_topic;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
- encode(record, bl);
+ encode(event, bl);
encode(push_endpoint, bl);
encode(push_endpoint_args, bl);
encode(arn_topic, bl);
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
- decode(record, bl);
+ decode(event, bl);
decode(push_endpoint, bl);
decode(push_endpoint_args, bl);
decode(arn_topic, bl);
DECODE_FINISH(bl);
}
};
-WRITE_CLASS_ENCODER(record_with_endpoint_t)
+WRITE_CLASS_ENCODER(event_entry_t)
using queues_t = std::set<std::string>;
// processing of a specific entry
// return whether processing was successfull (true) or not (false)
bool process_entry(const cls_queue_entry& entry, spawn::yield_context yield) {
- record_with_endpoint_t record_with_endpoint;
+ event_entry_t event_entry;
auto iter = entry.data.cbegin();
try {
- decode(record_with_endpoint, iter);
+ decode(event_entry, iter);
} catch (buffer::error& err) {
ldout(cct, 5) << "WARNING: failed to decode entry. error: " << err.what() << dendl;
return false;
}
try {
// TODO move endpoint creation to queue level
- const auto push_endpoint = RGWPubSubEndpoint::create(record_with_endpoint.push_endpoint, record_with_endpoint.arn_topic,
- RGWHTTPArgs(record_with_endpoint.push_endpoint_args),
+ const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
+ RGWHTTPArgs(event_entry.push_endpoint_args),
cct);
- ldout(cct, 20) << "INFO: push endpoint created: " << record_with_endpoint.push_endpoint <<
+ ldout(cct, 20) << "INFO: push endpoint created: " << event_entry.push_endpoint <<
" for entry: " << entry.marker << dendl;
- const auto ret = push_endpoint->send_to_completion_async(cct, record_with_endpoint.record, optional_yield(io_context, yield));
+ const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
if (ret < 0) {
- ldout(cct, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << record_with_endpoint.push_endpoint
+ ldout(cct, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
<< " failed. error: " << ret << " (will retry)" << dendl;
return false;
} else {
- ldout(cct, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << record_with_endpoint.push_endpoint
+ ldout(cct, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
<< " ok" << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
return true;
}
} catch (const RGWPubSubEndpoint::configuration_error& e) {
ldout(cct, 5) << "WARNING: failed to create push endpoint: "
- << record_with_endpoint.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
+ << event_entry.push_endpoint << " for entry: " << entry.marker << ". error: " << e.what() << " (will retry) " << dendl;
return false;
}
}
}
}
-// populate record from request
-void populate_record_from_request(const req_state *s,
+// populate event from request
+void populate_event_from_request(const req_state *s,
rgw::sal::RGWObject* obj,
uint64_t size,
const ceph::real_time& mtime,
const std::string& etag,
EventType event_type,
- rgw_pubsub_s3_record& record) {
- record.eventTime = mtime;
- record.eventName = to_string(event_type);
- record.userIdentity = s->user->get_id().id; // user that triggered the change
- record.x_amz_request_id = s->req_id; // request ID of the original change
- record.x_amz_id_2 = s->host_id; // RGW on which the change was made
+ rgw_pubsub_s3_event& event) {
+ event.eventTime = mtime;
+ event.eventName = to_string(event_type);
+ event.userIdentity = s->user->get_id().id; // user that triggered the change
+ event.x_amz_request_id = s->req_id; // request ID of the original change
+ event.x_amz_id_2 = s->host_id; // RGW on which the change was made
// configurationId is filled from notification configuration
- record.bucket_name = s->bucket_name;
- record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
- record.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
- record.object_key = obj->get_name();
- record.object_size = size;
- record.object_etag = etag;
- record.object_versionId = obj->get_instance();
+ event.bucket_name = s->bucket_name;
+ event.bucket_ownerIdentity = s->bucket_owner.get_id().id;
+ event.bucket_arn = to_string(rgw::ARN(s->bucket->get_key()));
+ event.object_key = obj->get_name();
+ event.object_size = size;
+ event.object_etag = etag;
+ event.object_versionId = obj->get_instance();
// use timestamp as per key sequence id (hex encoded)
const utime_t ts(real_clock::now());
boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
- std::back_inserter(record.object_sequencer));
- set_event_id(record.id, etag, ts);
- record.bucket_id = s->bucket->get_bucket_id();
+ std::back_inserter(event.object_sequencer));
+ set_event_id(event.id, etag, ts);
+ event.bucket_id = s->bucket->get_bucket_id();
// pass meta data
if (s->info.x_meta_map.empty()) {
// try to fetch the metadata from the attributes
- metadata_from_attributes(s, obj, record.x_meta_map);
+ metadata_from_attributes(s, obj, event.x_meta_map);
} else {
- record.x_meta_map = s->info.x_meta_map;
+ event.x_meta_map = s->info.x_meta_map;
}
// pass tags
if (s->tagset.get_tags().empty()) {
// try to fetch the tags from the attributes
- tags_from_attributes(s, obj, record.tags);
+ tags_from_attributes(s, obj, event.tags);
} else {
- record.tags = s->tagset.get_tags();
+ event.tags = s->tagset.get_tags();
}
// opaque data will be filled from topic configuration
}
-bool match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj,
+bool notification_match(const rgw_pubsub_topic_filter& filter, const req_state* s, rgw::sal::RGWObject* obj,
EventType event, const RGWObjTags* req_tags) {
- if (!::match(filter.events, event)) {
+ if (!match(filter.events, event)) {
return false;
}
- if (!::match(filter.s3_filter.key_filter, obj->get_name())) {
+ if (!match(filter.s3_filter.key_filter, obj->get_name())) {
return false;
}
// metadata filter exists
if (!s->info.x_meta_map.empty()) {
// metadata was cached in req_state
- if (!::match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
+ if (!match(filter.s3_filter.metadata_filter, s->info.x_meta_map)) {
return false;
}
} else {
// try to fetch the metadata from the attributes
KeyValueMap metadata;
metadata_from_attributes(s, obj, metadata);
- if (!::match(filter.s3_filter.metadata_filter, metadata)) {
+ if (!match(filter.s3_filter.metadata_filter, metadata)) {
return false;
}
}
// tag filter exists
if (req_tags) {
// tags in the request
- if (!::match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
+ if (!match(filter.s3_filter.tag_filter, req_tags->get_tags())) {
return false;
}
} else if (!s->tagset.get_tags().empty()) {
// tags were cached in req_state
- if (!::match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
+ if (!match(filter.s3_filter.tag_filter, s->tagset.get_tags())) {
return false;
}
} else {
// try to fetch tags from the attributes
KeyValueMap tags;
tags_from_attributes(s, obj, tags);
- if (!::match(filter.s3_filter.tag_filter, tags)) {
+ if (!match(filter.s3_filter.tag_filter, tags)) {
return false;
}
}
for (const auto& bucket_topic : bucket_topics.topics) {
const rgw_pubsub_topic_filter& topic_filter = bucket_topic.second;
const rgw_pubsub_topic& topic_cfg = topic_filter.topic;
- if (!match(topic_filter, res.s, res.object, event_type, req_tags)) {
- // topic does not apply to req_state
+ if (!notification_match(topic_filter, res.s, res.object, event_type, req_tags)) {
+ // notification does not apply to req_state
continue;
}
ldout(res.s->cct, 20) << "INFO: notification: '" << topic_filter.s3_id <<
// nothing to commit or already committed/aborted
continue;
}
- record_with_endpoint_t record_with_endpoint;
- populate_record_from_request(res.s, obj, size, mtime, etag, event_type, record_with_endpoint.record);
- record_with_endpoint.record.configurationId = topic.configurationId;
- record_with_endpoint.record.opaque_data = topic.cfg.opaque_data;
+ event_entry_t event_entry;
+ populate_event_from_request(res.s, obj, size, mtime, etag, event_type, event_entry.event);
+ event_entry.event.configurationId = topic.configurationId;
+ event_entry.event.opaque_data = topic.cfg.opaque_data;
if (topic.cfg.dest.persistent) {
- record_with_endpoint.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
- record_with_endpoint.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
- record_with_endpoint.arn_topic = std::move(topic.cfg.dest.arn_topic);
+ event_entry.push_endpoint = std::move(topic.cfg.dest.push_endpoint);
+ event_entry.push_endpoint_args = std::move(topic.cfg.dest.push_endpoint_args);
+ event_entry.arn_topic = std::move(topic.cfg.dest.arn_topic);
bufferlist bl;
- encode(record_with_endpoint, bl);
+ encode(event_entry, bl);
const auto& queue_name = topic.cfg.dest.arn_topic;
if (bl.length() > res.size) {
// try to make a larger reservation, fail only if this is not possible
RGWHTTPArgs(topic.cfg.dest.push_endpoint_args),
res.s->cct);
ldout(res.s->cct, 20) << "INFO: push endpoint created: " << topic.cfg.dest.push_endpoint << dendl;
- const auto ret = push_endpoint->send_to_completion_async(res.s->cct, record_with_endpoint.record, res.s->yield);
+ const auto ret = push_endpoint->send_to_completion_async(res.s->cct, event_entry.event, res.s->yield);
if (ret < 0) {
ldout(res.s->cct, 1) << "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint << " failed. error: " << ret << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
do_encode_xml("NotificationConfiguration", list, "TopicConfiguration", f);
}
-void rgw_pubsub_s3_record::dump(Formatter *f) const {
+void rgw_pubsub_s3_event::dump(Formatter *f) const {
encode_json("eventVersion", eventVersion, f);
encode_json("eventSource", eventSource, f);
encode_json("awsRegion", awsRegion, f);
// explicit instantiation for the only two possible types
// no need to move implementation to header
template class RGWPubSub::SubWithEvents<rgw_pubsub_event>;
-template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_record>;
+template class RGWPubSub::SubWithEvents<rgw_pubsub_s3_event>;
]
}*/
-struct rgw_pubsub_s3_record {
+struct rgw_pubsub_s3_event {
constexpr static const char* const json_type_plural = "Records";
std::string eventVersion = "2.2";
// aws:s3
void dump(Formatter *f) const;
};
-WRITE_CLASS_ENCODER(rgw_pubsub_s3_record)
+WRITE_CLASS_ENCODER(rgw_pubsub_s3_event)
struct rgw_pubsub_event {
constexpr static const char* const json_type_plural = "events";
};
WRITE_CLASS_ENCODER(rgw_pubsub_event)
-// settign a unique ID for an event/record based on object hash and timestamp
+// settign a unique ID for an event based on object hash and timestamp
void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
struct rgw_pubsub_sub_dest {
if (conf.s3_id.empty()) {
return std::make_shared<SubWithEvents<rgw_pubsub_event>>(this, sub);
}
- return std::make_shared<SubWithEvents<rgw_pubsub_s3_record>>(this, sub);
+ return std::make_shared<SubWithEvents<rgw_pubsub_s3_event>>(this, sub);
}
void get_meta_obj(rgw_raw_obj *obj) const;
return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
}
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
- return new PostCR(json_format_pubsub_event(record), env, endpoint, ack_level, verify_ssl);
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
+ return new PostCR(json_format_pubsub_event(event), env, endpoint, ack_level, verify_ssl);
}
- int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
bufferlist read_bl;
RGWPostHTTPData request(cct, "POST", endpoint, &read_bl, verify_ssl);
- const auto post_data = json_format_pubsub_event(record);
+ const auto post_data = json_format_pubsub_event(event);
request.set_post_data(post_data);
request.set_send_length(post_data.length());
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_pending);
}
}
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
ceph_assert(conn);
if (ack_level == ack_level_t::None) {
- return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
} else {
- return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
}
}
}
};
- int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
ceph_assert(conn);
if (ack_level == ack_level_t::None) {
- return amqp::publish(conn, topic, json_format_pubsub_event(record));
+ return amqp::publish(conn, topic, json_format_pubsub_event(event));
} else {
// TODO: currently broker and routable are the same - this will require different flags but the same mechanism
// note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
auto w = std::unique_ptr<Waiter>(new Waiter);
const auto rc = amqp::publish_with_confirm(conn,
topic,
- json_format_pubsub_event(record),
+ json_format_pubsub_event(event),
std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
if (rc < 0) {
// failed to publish, does not wait for reply
}
}
- RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) override {
+ RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) override {
ceph_assert(conn);
if (ack_level == ack_level_t::None) {
- return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+ return new NoAckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
} else {
- return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(record));
+ return new AckPublishCR(cct, topic, conn, json_format_pubsub_event(event));
}
}
}
};
- int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) override {
+ int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) override {
ceph_assert(conn);
if (ack_level == ack_level_t::None) {
- return kafka::publish(conn, topic, json_format_pubsub_event(record));
+ return kafka::publish(conn, topic, json_format_pubsub_event(event));
} else {
// note: dynamic allocation of Waiter is needed when this is invoked from a beast coroutine
auto w = std::unique_ptr<Waiter>(new Waiter);
const auto rc = kafka::publish_with_confirm(conn,
topic,
- json_format_pubsub_event(record),
+ json_format_pubsub_event(event),
std::bind(&Waiter::finish, w.get(), std::placeholders::_1));
if (rc < 0) {
// failed to publish, does not wait for reply
class RGWCoroutine;
class RGWHTTPArgs;
struct rgw_pubsub_event;
-struct rgw_pubsub_s3_record;
+struct rgw_pubsub_s3_event;
// endpoint base class all endpoint - types should derive from it
class RGWPubSubEndpoint {
// this method is used in order to send notification (S3 compliant) and wait for completion
// in async manner via a coroutine when invoked in the data sync environment
- virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_record& record, RGWDataSyncEnv* env) = 0;
+ virtual RGWCoroutine* send_to_completion_async(const rgw_pubsub_s3_event& event, RGWDataSyncEnv* env) = 0;
// this method is used in order to send notification (S3 compliant) and wait for completion
// in async manner via a coroutine when invoked in the frontend environment
- virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_record& record, optional_yield y) = 0;
+ virtual int send_to_completion_async(CephContext* cct, const rgw_pubsub_s3_event& event, optional_yield y) = 0;
// present as string
virtual std::string to_str() const { return ""; }
encode_json("info", oevent, &e->info);
}
-static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
+static void make_s3_event_ref(CephContext *cct, const rgw_bucket& bucket,
const rgw_user& owner,
const rgw_obj_key& key,
const ceph::real_time& mtime,
- const std::vector<std::pair<std::string, std::string> > *attrs,
+ const std::vector<std::pair<std::string, std::string>>* attrs,
rgw::notify::EventType event_type,
- EventRef<rgw_pubsub_s3_record> *record) {
- *record = std::make_shared<rgw_pubsub_s3_record>();
+ EventRef<rgw_pubsub_s3_event>* event) {
+ *event = std::make_shared<rgw_pubsub_s3_event>();
- EventRef<rgw_pubsub_s3_record>& r = *record;
- r->eventTime = mtime;
- r->eventName = rgw::notify::to_string(event_type);
+ EventRef<rgw_pubsub_s3_event>& e = *event;
+ e->eventTime = mtime;
+ e->eventName = rgw::notify::to_string(event_type);
// userIdentity: not supported in sync module
// x_amz_request_id: not supported in sync module
// x_amz_id_2: not supported in sync module
// configurationId is filled from subscription configuration
- r->bucket_name = bucket.name;
- r->bucket_ownerIdentity = owner.to_str();
- r->bucket_arn = to_string(rgw::ARN(bucket));
- r->bucket_id = bucket.bucket_id; // rgw extension
- r->object_key = key.name;
+ e->bucket_name = bucket.name;
+ e->bucket_ownerIdentity = owner.to_str();
+ e->bucket_arn = to_string(rgw::ARN(bucket));
+ e->bucket_id = bucket.bucket_id; // rgw extension
+ e->object_key = key.name;
// object_size not supported in sync module
objstore_event oevent(bucket, key, mtime, attrs);
- r->object_etag = oevent.get_hash();
- r->object_versionId = key.instance;
+ e->object_etag = oevent.get_hash();
+ e->object_versionId = key.instance;
// use timestamp as per key sequence id (hex encoded)
const utime_t ts(real_clock::now());
boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t),
- std::back_inserter(r->object_sequencer));
+ std::back_inserter(e->object_sequencer));
- set_event_id(r->id, r->object_etag, ts);
+ set_event_id(e->id, e->object_etag, ts);
}
class PSManager;
const PSEnvRef env;
const rgw_user owner;
const EventRef<rgw_pubsub_event> event;
- const EventRef<rgw_pubsub_s3_record> record;
+ const EventRef<rgw_pubsub_s3_event> s3_event;
const TopicsRef topics;
bool has_subscriptions;
bool event_handled;
- bool sub_conf_found;
PSSubscriptionRef sub;
std::vector<PSTopicConfigRef>::const_iterator titer;
std::set<std::string>::const_iterator siter;
const PSEnvRef _env,
const rgw_user& _owner,
const EventRef<rgw_pubsub_event>& _event,
- const EventRef<rgw_pubsub_s3_record>& _record,
+ const EventRef<rgw_pubsub_s3_event>& _s3_event,
const TopicsRef& _topics) : RGWCoroutine(_sc->cct),
sc(_sc),
env(_env),
owner(_owner),
event(_event),
- record(_record),
+ s3_event(s3_event),
topics(_topics),
has_subscriptions(false),
event_handled(false) {}
}
} else {
// subscription was made by S3 compatible API
- ldout(sc->cct, 20) << "storing record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
- record->configurationId = sub->sub_conf->s3_id;
- record->opaque_data = (*titer)->opaque_data;
- yield call(PSSubscription::store_event_cr(sc, sub, record));
+ ldout(sc->cct, 20) << "storing s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+ s3_event->configurationId = sub->sub_conf->s3_id;
+ s3_event->opaque_data = (*titer)->opaque_data;
+ yield call(PSSubscription::store_event_cr(sc, sub, s3_event));
if (retcode < 0) {
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_fail);
- ldout(sc->cct, 1) << "ERROR: failed to store record for subscription=" << *siter << " ret=" << retcode << dendl;
+ ldout(sc->cct, 1) << "ERROR: failed to store s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
if (perfcounter) perfcounter->inc(l_rgw_pubsub_store_ok);
event_handled = true;
}
if (sub->sub_conf->push_endpoint) {
- ldout(sc->cct, 20) << "push record for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
- yield call(PSSubscription::push_event_cr(sc, sub, record));
+ ldout(sc->cct, 20) << "push s3 event for subscription=" << *siter << " owner=" << owner << " ret=" << retcode << dendl;
+ yield call(PSSubscription::push_event_cr(sc, sub, s3_event));
if (retcode < 0) {
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
- ldout(sc->cct, 1) << "ERROR: failed to push record for subscription=" << *siter << " ret=" << retcode << dendl;
+ ldout(sc->cct, 1) << "ERROR: failed to push s3 event for subscription=" << *siter << " ret=" << retcode << dendl;
} else {
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
event_handled = true;
PSEnvRef env;
std::optional<uint64_t> versioned_epoch;
EventRef<rgw_pubsub_event> event;
- EventRef<rgw_pubsub_s3_record> record;
+ EventRef<rgw_pubsub_s3_event> s3_event;
TopicsRef topics;
public:
RGWPSHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
}
attrs.push_back(std::make_pair(k, attr.second));
}
- // at this point we don't know whether we need the ceph event or S3 record
+ // at this point we don't know whether we need the ceph event or S3 event
// this is why both are created here, once we have information about the
// subscription, we will store/push only the relevant ones
make_event_ref(sc->cct,
sync_pipe.info.source_bs.bucket, key,
mtime, &attrs,
rgw::notify::ObjectCreated, &event);
- make_s3_record_ref(sc->cct,
+ make_s3_event_ref(sc->cct,
sync_pipe.info.source_bs.bucket, sync_pipe.dest_bucket_info.owner, key,
mtime, &attrs,
- rgw::notify::ObjectCreated, &record);
+ rgw::notify::ObjectCreated, &s3_event);
}
- yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, record, topics));
+ yield call(new RGWPSHandleObjEventCR(sc, env, sync_pipe.source_bucket_info.owner, event, s3_event, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}
ceph::real_time mtime;
rgw::notify::EventType event_type;
EventRef<rgw_pubsub_event> event;
- EventRef<rgw_pubsub_s3_record> record;
+ EventRef<rgw_pubsub_s3_event> s3_event;
TopicsRef topics;
public:
RGWPSGenericObjEventCBCR(RGWDataSyncCtx *_sc,
ldout(sc->cct, 20) << "no topics found for " << bucket << "/" << key << dendl;
return set_cr_done();
}
- // at this point we don't know whether we need the ceph event or S3 record
+ // at this point we don't know whether we need the ceph event or S3 event
// this is why both are created here, once we have information about the
// subscription, we will store/push only the relevant ones
make_event_ref(sc->cct,
bucket, key,
mtime, nullptr,
event_type, &event);
- make_s3_record_ref(sc->cct,
+ make_s3_event_ref(sc->cct,
bucket, owner, key,
mtime, nullptr,
- event_type, &record);
- yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, record, topics));
+ event_type, &s3_event);
+ yield call(new RGWPSHandleObjEventCR(sc, env, owner, event, s3_event, topics));
if (retcode < 0) {
return set_cr_error(retcode);
}