]> git.apps.os.sepia.ceph.com Git - ceph-ci.git/commitdiff
rgw/pubsub: support eventId in push mode
authorYuval Lifshitz <yuvalif@yahoo.com>
Sun, 1 Dec 2019 18:08:12 +0000 (20:08 +0200)
committerYuval Lifshitz <yuvalif@yahoo.com>
Tue, 3 Dec 2019 18:42:20 +0000 (20:42 +0200)
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
PendingReleaseNotes
doc/radosgw/notifications.rst
doc/radosgw/pubsub-module.rst
src/rgw/rgw_notify.cc
src/rgw/rgw_pubsub.cc
src/rgw/rgw_pubsub.h
src/rgw/rgw_sync_module_pubsub.cc

index 859dd60e75337ac93847640efc41a8aabe21b85c..3cd4acd7f6f7ee37809d610ec510f10f8c51431e 100644 (file)
   or manually without ceph-volume and LVM.  Going forward, the OSD
   will limit itself to only 'hdd' and 'ssd' (or whatever device class the user
   manually specifies).
+
+* RGW: a mismatch between the bucket notification documentation and the actual
+  message format was fixed. This means that any endpoints receiving bucket 
+  notification, will now receive the same notifications inside an JSON array
+  named 'Records'. Note that this does not affect pulling bucket notification
+  from a subscription in a 'pubsub' zone, as these are already wrapped inside
+  that array.
index fee5eb0259f2dd209b04c6c0dc49f045e62fc7eb..dd6762e10a56946c074ffed25f601318d0915ffa 100644 (file)
@@ -273,7 +273,7 @@ pushed or pulled using the pubsub sync module.
                    "eTag":"",
                    "versionId":"",
                    "sequencer": "",
-                   "metadata":""
+                   "metadata":[]
                }
            },
            "eventId":"",
@@ -298,7 +298,7 @@ pushed or pulled using the pubsub sync module.
 - s3.object.version: object version in case of versioned bucket
 - s3.object.sequencer: monotonically increasing identifier of the change per object (hexadecimal format)
 - s3.object.metadata: any metadata set on the object sent as: ``x-amz-meta-`` (an extension to the S3 notification API) 
-- s3.eventId: not supported (an extension to the S3 notification API)
+- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
 
 .. _PubSub Module : ../pubsub-module
 .. _S3 Notification Compatibility: ../s3-notification-compatibility
index 473a9c032d254b92451bc199fbbcd3b2558c12ab..32ff8bf7448a1dd775db9dd828baa0a108ba0e97 100644 (file)
@@ -474,7 +474,7 @@ the events will have an S3-compatible record format (JSON):
                    "eTag":"",
                    "versionId":"",
                    "sequencer":"",
-                   "metadata":""
+                   "metadata":[]
                }
            },
            "eventId":"",
@@ -488,7 +488,6 @@ the events will have an S3-compatible record format (JSON):
 - requestParameters: not supported
 - responseElements: not supported
 - s3.configurationId: notification ID that created the subscription for the event
-- s3.eventId: unique ID of the event, that could be used for acking (an extension to the S3 notification API)
 - s3.bucket.name: name of the bucket
 - s3.bucket.ownerIdentity.principalId: owner of the bucket
 - s3.bucket.arn: ARN of the bucket
index 54a212e717bc14e335824d6d50c36ca9f77f599a..11ca0256462da3779983f179bad15606cf51f8b9 100644 (file)
@@ -19,15 +19,11 @@ void populate_record_from_request(const req_state *s,
         const std::string& etag, 
         EventType event_type,
         rgw_pubsub_s3_record& record) { 
-  record.eventVersion = "2.1";
-  record.eventSource = "aws:s3";
   record.eventTime = mtime;
   record.eventName = to_string(event_type);
   record.userIdentity = s->user->user_id.id;    // user that triggered the change
-  record.sourceIPAddress = "";                  // IP address of client that triggered the change: TODO
   record.x_amz_request_id = s->req_id;          // request ID of the original change
   record.x_amz_id_2 = s->host_id;               // RGW on which the change was made
-  record.s3SchemaVersion = "1.0";
   // configurationId is filled from subscription configuration
   record.bucket_name = s->bucket_name;
   record.bucket_ownerIdentity = s->bucket_owner.get_id().id;
@@ -40,10 +36,7 @@ void populate_record_from_request(const req_state *s,
   const utime_t ts(real_clock::now());
   boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
           std::back_inserter(record.object_sequencer));
-  // event ID is rgw extension (not in the S3 spec), used for acking the event
-  // same format is used in both S3 compliant and Ceph specific events
-  // not used in case of push-only mode
-  record.id = "";
+  set_event_id(record.id, etag, ts);
   record.bucket_id = s->bucket.bucket_id;
   // pass meta data
   record.x_meta_map = s->info.x_meta_map;
index 6c2f6cd74cc8aff2d63404083a7bb7c343b1caec..4fac3ae07272c84878891df1efdbdeb864ef8b71 100644 (file)
 
 #define dout_subsys ceph_subsys_rgw
 
+void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
+  char buf[64];
+  const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
+  if (len > 0) {
+    id.assign(buf, len);
+  }
+}
+
 bool rgw_s3_key_filter::decode_xml(XMLObj* obj) {
   XMLObjIter iter = obj->find("FilterRule");
   XMLObj *o;
index 1270124c6dcc91ae19a1d5ddc59a134a975af3a1..a6f97ea6ff1102a9efe65c851e87434942afa8f1 100644 (file)
@@ -108,7 +108,7 @@ class rgw_pubsub_topic_filter;
           <Name></Name>
           <Value></Value>
         </FilterRule>
-      </s3Metadata>
+      </S3Metadata>
     </Filter>
     <Id>notification1</Id>
     <Topic>arn:aws:sns:<region>:<account>:<topic></Topic>
@@ -195,17 +195,16 @@ struct rgw_pubsub_s3_notifications {
 
 struct rgw_pubsub_s3_record {
   constexpr static const char* const json_type_plural = "Records";
-  // 2.2
-  std::string eventVersion;
+  std::string eventVersion = "2.2";
   // aws:s3
-  std::string eventSource;
+  std::string eventSource = "ceph:s3";
   // zonegroup
   std::string awsRegion;
   // time of the request
   ceph::real_time eventTime;
   // type of the event
   std::string eventName;
-  // user that sent the requet (not implemented)
+  // user that sent the request
   std::string userIdentity;
   // IP address of source of the request (not implemented)
   std::string sourceIPAddress;
@@ -213,20 +212,19 @@ struct rgw_pubsub_s3_record {
   std::string x_amz_request_id;
   // radosgw that received the request
   std::string x_amz_id_2;
-  // 1.0
-  std::string s3SchemaVersion;
+  std::string s3SchemaVersion = "1.0";
   // ID received in the notification request
   std::string configurationId;
   // bucket name
   std::string bucket_name;
-  // bucket owner (not implemented)
+  // bucket owner
   std::string bucket_ownerIdentity;
   // bucket ARN
   std::string bucket_arn;
   // object key
   std::string object_key;
-  // object size (not implemented)
-  uint64_t object_size;
+  // object size
+  uint64_t object_size = 0;
   // object etag
   std::string object_etag;
   // object version id bucket is versioned
@@ -235,7 +233,7 @@ struct rgw_pubsub_s3_record {
   std::string object_sequencer;
   // this is an rgw extension (not S3 standard)
   // used to store a globally unique identifier of the event
-  // that could be used for acking
+  // that could be used for acking or any other identification of the event
   std::string id;
   // this is an rgw extension holding the internal bucket id
   std::string bucket_id;
@@ -334,6 +332,9 @@ struct rgw_pubsub_event {
 };
 WRITE_CLASS_ENCODER(rgw_pubsub_event)
 
+// settign a unique ID for an event/record based on object hash and timestamp
+void set_event_id(std::string& id, const std::string& hash, const utime_t& ts);
+
 struct rgw_pubsub_sub_dest {
   std::string bucket_name;
   std::string oid_prefix;
index 598eee6e41c6e25d1d7437e8e01f44c8fe1826f5..f674149c3a49372f00ca71d174572396d7fef60a 100644 (file)
@@ -387,14 +387,6 @@ struct objstore_event {
   }
 };
 
-static void set_event_id(std::string& id, const std::string& hash, const utime_t& ts) {
-  char buf[64];
-  const auto len = snprintf(buf, sizeof(buf), "%010ld.%06ld.%s", (long)ts.sec(), (long)ts.usec(), hash.c_str());
-  if (len > 0) {
-    id.assign(buf, len);
-  }
-}
-
 static void make_event_ref(CephContext *cct, const rgw_bucket& bucket,
                        const rgw_obj_key& key,
                        const ceph::real_time& mtime,
@@ -426,22 +418,18 @@ static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
   *record = std::make_shared<rgw_pubsub_s3_record>();
 
   EventRef<rgw_pubsub_s3_record>& r = *record;
-  r->eventVersion = "2.1";
-  r->eventSource = "aws:s3";
   r->eventTime = mtime;
   r->eventName = rgw::notify::to_string(event_type);
-  r->userIdentity = "";         // user that triggered the change: not supported in sync module
-  r->sourceIPAddress = "";      // IP address of client that triggered the change: not supported in sync module
-  r->x_amz_request_id = "";     // request ID of the original change: not supported in sync module
-  r->x_amz_id_2 = "";           // RGW on which the change was made: not supported in sync module
-  r->s3SchemaVersion = "1.0";
+  // userIdentity: not supported in sync module
+  // x_amz_request_id: not supported in sync module
+  // x_amz_id_2: not supported in sync module
   // configurationId is filled from subscription configuration
   r->bucket_name = bucket.name;
   r->bucket_ownerIdentity = owner.to_str();
   r->bucket_arn = to_string(rgw::ARN(bucket));
   r->bucket_id = bucket.bucket_id; // rgw extension
   r->object_key = key.name;
-  r->object_size = 0;           // not supported in sync module
+  // object_size not supported in sync module
   objstore_event oevent(bucket, key, mtime, attrs);
   r->object_etag = oevent.get_hash();
   r->object_versionId = key.instance;
@@ -451,8 +439,6 @@ static void make_s3_record_ref(CephContext *cct, const rgw_bucket& bucket,
   boost::algorithm::hex((const char*)&ts, (const char*)&ts + sizeof(utime_t), 
           std::back_inserter(r->object_sequencer));
  
-  // event ID is rgw extension (not in the S3 spec), used for acking the event
-  // same format is used in both S3 compliant and Ceph specific events
   set_event_id(r->id, r->object_etag, ts);
 }