From 19832a0daeb7b23006d1c38059ad14f0c239d32a Mon Sep 17 00:00:00 2001 From: Yuval Lifshitz Date: Sun, 1 Dec 2019 20:08:12 +0200 Subject: [PATCH] rgw/pubsub: support eventId in push mode Signed-off-by: Yuval Lifshitz --- PendingReleaseNotes | 7 +++++++ doc/radosgw/notifications.rst | 4 ++-- doc/radosgw/pubsub-module.rst | 3 +-- src/rgw/rgw_notify.cc | 9 +-------- src/rgw/rgw_pubsub.cc | 8 ++++++++ src/rgw/rgw_pubsub.h | 23 ++++++++++++----------- src/rgw/rgw_sync_module_pubsub.cc | 22 ++++------------------ 7 files changed, 35 insertions(+), 41 deletions(-) diff --git a/PendingReleaseNotes b/PendingReleaseNotes index 859dd60e753..3cd4acd7f6f 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -318,3 +318,10 @@ or manually without ceph-volume and LVM. Going forward, the OSD will limit itself to only 'hdd' and 'ssd' (or whatever device class the user manually specifies). + +* RGW: a mismatch between the bucket notification documentation and the actual + message format was fixed. This means that any endpoints receiving bucket + notifications will now receive the same notifications inside a JSON array + named 'Records'. Note that this does not affect pulling bucket notification + from a subscription in a 'pubsub' zone, as these are already wrapped inside + that array. diff --git a/doc/radosgw/notifications.rst b/doc/radosgw/notifications.rst index fee5eb0259f..dd6762e10a5 100644 --- a/doc/radosgw/notifications.rst +++ b/doc/radosgw/notifications.rst @@ -273,7 +273,7 @@ pushed or pulled using the pubsub sync module. "eTag":"", "versionId":"", "sequencer": "", - "metadata":"" + "metadata":[] } }, "eventId":"", @@ -298,7 +298,7 @@ pushed or pulled using the pubsub sync module. 
- s3.object.version: object version in case of versioned bucket - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format) - s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) -- s3.eventId: not supported (an extension to the S3 notification API) +- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API) .. _PubSub Module : ../pubsub-module .. _S3 Notification Compatibility: ../s3-notification-compatibility diff --git a/doc/radosgw/pubsub-module.rst b/doc/radosgw/pubsub-module.rst index 473a9c032d2..32ff8bf7448 100644 --- a/doc/radosgw/pubsub-module.rst +++ b/doc/radosgw/pubsub-module.rst @@ -474,7 +474,7 @@ the events will have an S3-compatible record format (JSON): "eTag":"", "versionId":"", "sequencer":"", - "metadata":"" + "metadata":[] } }, "eventId":"", @@ -488,7 +488,6 @@ the events will have an S3-compatible record format (JSON): - requestParameters: not supported - responseElements: not supported - s3.configurationId: notification ID that created the subscription for the event -- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API) - s3.bucket.name: name of the bucket - s3.bucket.ownerIdentity.principalId: owner of the bucket - s3.bucket.arn: ARN of the bucket diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index 54a212e717b..11ca0256462 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -19,15 +19,11 @@ void populate_record_from_request(const req_state *s, const std::string& etag, EventType event_type, rgw_pubsub_s3_record& record) { - record.eventVersion = "2.1"; - record.eventSource = "aws:s3"; record.eventTime = mtime; record.eventName = to_string(event_type); record.userIdentity = s->user->user_id.id; // user that triggered the change - record.sourceIPAddress = ""; // IP address of client that triggered the 
change: TODO record.x_amz_request_id = s->req_id; // request ID of the original change record.x_amz_id_2 = s->host_id; // RGW on which the change was made - record.s3SchemaVersion = "1.0"; // configurationId is filled from subscription configuration record.bucket_name = s->bucket_name; record.bucket_ownerIdentity = s->bucket_owner.get_id().id; @@ -40,10 +36,7 @@ void populate_record_from_request(const req_state *s, const utime_t ts(real_clock::now()); boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), std::back_inserter(record.object_sequencer)); - // event ID is rgw extension (not in the S3 spec), used for acking the event - // same format is used in both S3 compliant and Ceph specific events - // not used in case of push-only mode - record.id = ""; + set_event_id(record.id, etag, ts); record.bucket_id = s->bucket.bucket_id; // pass meta data record.x_meta_map = s->info.x_meta_map; diff --git a/src/rgw/rgw_pubsub.cc b/src/rgw/rgw_pubsub.cc index 6c2f6cd74cc..4fac3ae0727 100644 --- a/src/rgw/rgw_pubsub.cc +++ b/src/rgw/rgw_pubsub.cc @@ -15,6 +15,14 @@ #define dout_subsys ceph_subsys_rgw +void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { + char buf[64]; + const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); + if (len > 0) { + id.assign(buf, len); + } +} + bool rgw_s3_key_filter::decode_xml(XMLObj* obj) { XMLObjIter iter = obj->find("FilterRule"); XMLObj *o; diff --git a/src/rgw/rgw_pubsub.h b/src/rgw/rgw_pubsub.h index 1270124c6dc..a6f97ea6ff1 100644 --- a/src/rgw/rgw_pubsub.h +++ b/src/rgw/rgw_pubsub.h @@ -108,7 +108,7 @@ class rgw_pubsub_topic_filter; - + notification1 arn:aws:sns::: @@ -195,17 +195,16 @@ struct rgw_pubsub_s3_notifications { struct rgw_pubsub_s3_record { constexpr static const char* const json_type_plural = "Records"; - // 2.2 - std::string eventVersion; + std::string eventVersion = "2.2"; // aws:s3 - std::string eventSource; + 
std::string eventSource = "ceph:s3"; // zonegroup std::string awsRegion; // time of the request ceph::real_time eventTime; // type of the event std::string eventName; - // user that sent the requet (not implemented) + // user that sent the request std::string userIdentity; // IP address of source of the request (not implemented) std::string sourceIPAddress; @@ -213,20 +212,19 @@ struct rgw_pubsub_s3_record { std::string x_amz_request_id; // radosgw that received the request std::string x_amz_id_2; - // 1.0 - std::string s3SchemaVersion; + std::string s3SchemaVersion = "1.0"; // ID received in the notification request std::string configurationId; // bucket name std::string bucket_name; - // bucket owner (not implemented) + // bucket owner std::string bucket_ownerIdentity; // bucket ARN std::string bucket_arn; // object key std::string object_key; - // object size (not implemented) - uint64_t object_size; + // object size + uint64_t object_size = 0; // object etag std::string object_etag; // object version id bucket is versioned @@ -235,7 +233,7 @@ struct rgw_pubsub_s3_record { std::string object_sequencer; // this is an rgw extension (not S3 standard) // used to store a globally unique identifier of the event - // that could be used for acking + // that could be used for acking or any other identification of the event std::string id; // this is an rgw extension holding the internal bucket id std::string bucket_id; @@ -334,6 +332,9 @@ struct rgw_pubsub_event { }; WRITE_CLASS_ENCODER(rgw_pubsub_event) +// setting a unique ID for an event/record based on object hash and timestamp +void set_event_id(std::string& id, const std::string& hash, const utime_t& ts); + struct rgw_pubsub_sub_dest { std::string bucket_name; std::string oid_prefix; diff --git a/src/rgw/rgw_sync_module_pubsub.cc b/src/rgw/rgw_sync_module_pubsub.cc index 598eee6e41c..f674149c3a4 100644 --- a/src/rgw/rgw_sync_module_pubsub.cc +++ b/src/rgw/rgw_sync_module_pubsub.cc @@ -387,14 +387,6 @@ struct 
objstore_event { } }; -static void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) { - char buf[64]; - const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str()); - if (len > 0) { - id.assign(buf, len); - } -} - static void make_event_ref(CephContext *cct, const rgw_bucket& bucket, const rgw_obj_key& key, const ceph::real_time& mtime, @@ -426,22 +418,18 @@ static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket, *record = std::make_shared(); EventRef& r = *record; - r->eventVersion = "2.1"; - r->eventSource = "aws:s3"; r->eventTime = mtime; r->eventName = rgw::notify::to_string(event_type); - r->userIdentity = ""; // user that triggered the change: not supported in sync module - r->sourceIPAddress = ""; // IP address of client that triggered the change: not supported in sync module - r->x_amz_request_id = ""; // request ID of the original change: not supported in sync module - r->x_amz_id_2 = ""; // RGW on which the change was made: not supported in sync module - r->s3SchemaVersion = "1.0"; + // userIdentity: not supported in sync module + // x_amz_request_id: not supported in sync module + // x_amz_id_2: not supported in sync module // configurationId is filled from subscription configuration r->bucket_name = bucket.name; r->bucket_ownerIdentity = owner.to_str(); r->bucket_arn = to_string(rgw::ARN(bucket)); r->bucket_id = bucket.bucket_id; // rgw extension r->object_key = key.name; - r->object_size = 0; // not supported in sync module + // object_size not supported in sync module objstore_event oevent(bucket, key, mtime, attrs); r->object_etag = oevent.get_hash(); r->object_versionId = key.instance; @@ -451,8 +439,6 @@ static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket, boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), std::back_inserter(r->object_sequencer)); - // event ID is rgw extension (not in the S3 spec), used for 
acking the event - // same format is used in both S3 compliant and Ceph specific events set_event_id(r->id, r->object_etag, ts); } -- 2.39.5