]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: notifications on object replication 43371/head
authorLiav Turkia <liav.turkia@gmail.com>
Sun, 27 Feb 2022 16:24:09 +0000 (18:24 +0200)
committerLiav Turkia <liav.turkia@gmail.com>
Tue, 26 Apr 2022 18:59:14 +0000 (21:59 +0300)
Signed-off-by: liavt <liav.turkia@gmail.com>
doc/radosgw/s3-notification-compatibility.rst
src/rgw/rgw_cr_rados.cc
src/rgw/rgw_notify.cc
src/rgw/rgw_notify_event_type.cc
src/rgw/rgw_notify_event_type.h

index d0259d1bb20f73265683726e75c3faefb067b629..ad2292094b2ff7a77eb511a8274c9c18acb62413 100644 (file)
@@ -110,6 +110,14 @@ Event Types
 +------------------------------------------------+-----------------+-------------------------------------------+
 | ``s3:ObjectLifecycle:Transition:NonCurrent``   | Supported, Ceph extension                                   |
 +------------------------------------------------+-----------------+-------------------------------------------+
+| ``s3:ObjectSynced:*``                          | Supported, Ceph extension                                   |
++------------------------------------------------+-----------------+-------------------------------------------+
+| ``s3:ObjectSynced:Create``                     | Supported, Ceph Extension                                   |
++------------------------------------------------+-----------------+-------------------------------------------+
+| ``s3:ObjectSynced:Delete``                     | Defined, Ceph extension (not generated)                     |
++------------------------------------------------+-----------------+-------------------------------------------+
+| ``s3:ObjectSynced:DeletionMarkerCreated``      | Defined, Ceph extension (not generated)                     |
++------------------------------------------------+-----------------+-------------------------------------------+
 | ``s3:ObjectRestore:Post``                      | Not applicable to Ceph                                      |
 +------------------------------------------------+-----------------+-------------------------------------------+
 | ``s3:ObjectRestore:Complete``                  | Not applicable to Ceph                                      |
@@ -124,6 +132,10 @@ Event Types
 .. note::
 
    In case of multipart upload, an ``ObjectCreated:CompleteMultipartUpload`` notification will be sent at the end of the process.
+   
+.. note::
+
+   The ``s3:ObjectSynced:Create`` event is sent when an object successfully syncs to a zone. It must be explicitely set for each zone. 
 
 Topic Configuration
 -------------------
index 55a8791c038a70bccc848ed455f361bdd19e70b1..8e0714b377ffb6d30a0b06701a525ca9ad3e9d04 100644 (file)
@@ -652,6 +652,8 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
   rgw::sal::RadosObject src_obj(store, key, &bucket);
   rgw::sal::RadosBucket dest_bucket(store, dest_bucket_info);
   rgw::sal::RadosObject dest_obj(store, dest_key.value_or(key), &dest_bucket);
+    
+  std::string etag;
 
   std::optional<uint64_t> bytes_transferred;
   int r = store->getRados()->fetch_remote_obj(obj_ctx,
@@ -662,8 +664,8 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
                        &src_obj,
                        &dest_bucket, /* dest */
                        nullptr, /* source */
-                      dest_placement_rule,
-                       NULL, /* real_time* src_mtime, */
+                       dest_placement_rule,
+                       nullptr, /* real_time* src_mtime, */
                        NULL, /* real_time* mtime, */
                        NULL, /* const real_time* mod_ptr, */
                        NULL, /* const real_time* unmod_ptr, */
@@ -677,7 +679,7 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
                        versioned_epoch,
                        real_time(), /* delete_at */
                        NULL, /* string *ptag, */
-                       NULL, /* string *petag, */
+                       &etag, /* string *petag, */
                        NULL, /* void (*progress_cb)(off_t, void *), */
                        NULL, /* void *progress_data*); */
                        dpp,
@@ -690,12 +692,53 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp)
     if (counters) {
       counters->inc(sync_counters::l_fetch_err, 1);
     }
-  } else if (counters) {
-    if (bytes_transferred) {
-      counters->inc(sync_counters::l_fetch, *bytes_transferred);
-    } else {
-      counters->inc(sync_counters::l_fetch_not_modified);
-    }
+  } else {
+      // r >= 0
+      if (bytes_transferred) {
+        // send notification that object was succesfully synced
+        std::string user_id = "rgw sync";
+        std::string req_id = "0";
+                       
+        RGWObjTags obj_tags;
+        auto iter = attrs.find(RGW_ATTR_TAGS);
+        if (iter != attrs.end()) {
+          try {
+            auto it = iter->second.cbegin();
+            obj_tags.decode(it);
+          } catch (buffer::error &err) {
+            ldpp_dout(dpp, 1) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+          }
+        }
+
+        // NOTE: we create a mutable copy of bucket.get_tenant as the get_notification function expects a std::string&, not const
+        std::string tenant(dest_bucket.get_tenant());
+
+        std::unique_ptr<rgw::sal::Notification> notify 
+                 = store->get_notification(dpp, &dest_obj, nullptr, rgw::notify::ObjectSyncedCreate,
+                  &dest_bucket, user_id,
+                  tenant,
+                  req_id, null_yield);
+
+        auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
+        int ret = rgw::notify::publish_reserve(dpp, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
+        if (ret < 0) {
+          ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: " << ret << dendl;
+          // no need to return, the sync already happened
+        } else {
+          ret = rgw::notify::publish_commit(&dest_obj, dest_obj.get_obj_size(), ceph::real_clock::now(), etag, dest_obj.get_instance(), rgw::notify::ObjectSyncedCreate, notify_res, dpp);
+          if (ret < 0) {
+            ldpp_dout(dpp, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
+          }
+        }
+      }
+      
+      if (counters) {
+        if (bytes_transferred) {
+          counters->inc(sync_counters::l_fetch, *bytes_transferred);
+        } else {
+          counters->inc(sync_counters::l_fetch_not_modified);
+        }
+      }
   }
   return r;
 }
index aa02cf85aac164bda6621eda009ca22afc78ae0d..3318424ad10ecea1bf680fff1f387c5ba57ff95c 100644 (file)
@@ -686,7 +686,7 @@ static inline void populate_event(reservation_t& res,
   event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
   // configurationId is filled from notification configuration
   event.bucket_name = res.bucket->get_name();
-  event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id;
+  event.bucket_ownerIdentity = res.bucket->get_owner() ? res.bucket->get_owner()->get_id().id : "";
   event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key()));
   event.object_key = res.object_name ? *res.object_name : obj->get_name();
   event.object_size = size;
index 059cfc3469a8862e28108211c36177d2072bc04a..8ee52c674dba47fc5cbbed733b33b4b64911b89d 100644 (file)
@@ -42,6 +42,14 @@ namespace rgw::notify {
       return "s3:ObjectLifecycle:Transition:Current";
     case ObjectTransitionNoncurrent:
       return "s3:ObjectLifecycle:Transition:Noncurrent";
+    case ObjectSynced:
+      return "s3:ObjectSynced:*";
+    case ObjectSyncedCreate:
+      return "s3:ObjectSynced:Create";
+    case ObjectSyncedDelete:
+      return "s3:ObjectSynced:Delete";
+    case ObjectSyncedDeletionMarkerCreated:
+      return "s3:ObjectSynced:DeletionMarkerCreated";
     case UnknownEvent:
         return "s3:UnknownEvent";
     }
@@ -72,6 +80,11 @@ namespace rgw::notify {
     case ObjectTransitionCurrent:
     case ObjectTransitionNoncurrent:
       return "OBJECT_TRANSITION";
+    case ObjectSynced:
+    case ObjectSyncedCreate:
+    case ObjectSyncedDelete:
+    case ObjectSyncedDeletionMarkerCreated:
+      return "OBJECT_SYNCED";
     case ObjectRemoved:
     case UnknownEvent:
       return "UNKNOWN_EVENT";
@@ -118,6 +131,14 @@ namespace rgw::notify {
         return ObjectTransitionCurrent;
     if (s == "s3:ObjectLifecycle:Transition:Noncurrent")
         return ObjectTransitionNoncurrent;
+    if (s == "s3:ObjectSynced:*" || s == "OBJECT_SYNCED")
+        return ObjectSynced;
+    if (s == "s3:ObjectSynced:Create")
+        return ObjectSyncedCreate;
+    if (s == "s3:ObjectSynced:Delete")
+        return ObjectSyncedDelete;
+    if (s == "s3:ObjectSynced:DeletionMarkerCreated")
+        return ObjectSyncedDeletionMarkerCreated;
     return UnknownEvent;
   }
 
index bbfff88f5e169a2fc29918fc8e29ad7e40d97c92..d09432b1626dc302e358a1c6fe26ec39b9b0d516 100644 (file)
@@ -25,7 +25,11 @@ namespace rgw::notify {
     ObjectTransition                     = 0xF000,
     ObjectTransitionCurrent              = 0x1000,
     ObjectTransitionNoncurrent           = 0x2000,
-    UnknownEvent                         = 0x10000
+    ObjectSynced                         = 0xF0000,
+    ObjectSyncedCreate                   = 0x10000,
+    ObjectSyncedDelete                   = 0x20000,
+    ObjectSyncedDeletionMarkerCreated    = 0x40000,
+    UnknownEvent                         = 0x100000
   };
 
   using EventTypeList = std::vector<EventType>;