From b303ae1f117e19bbc7b8f6bed4eb21fa50733865 Mon Sep 17 00:00:00 2001 From: Liav Turkia Date: Sun, 27 Feb 2022 18:24:09 +0200 Subject: [PATCH] rgw: notifications on object replication Signed-off-by: liavt --- doc/radosgw/s3-notification-compatibility.rst | 12 ++++ src/rgw/rgw_cr_rados.cc | 61 ++++++++++++++++--- src/rgw/rgw_notify.cc | 2 +- src/rgw/rgw_notify_event_type.cc | 21 +++++++ src/rgw/rgw_notify_event_type.h | 6 +- 5 files changed, 91 insertions(+), 11 deletions(-) diff --git a/doc/radosgw/s3-notification-compatibility.rst b/doc/radosgw/s3-notification-compatibility.rst index d0259d1bb20..ad2292094b2 100644 --- a/doc/radosgw/s3-notification-compatibility.rst +++ b/doc/radosgw/s3-notification-compatibility.rst @@ -110,6 +110,14 @@ Event Types +------------------------------------------------+-----------------+-------------------------------------------+ | ``s3:ObjectLifecycle:Transition:NonCurrent`` | Supported, Ceph extension | +------------------------------------------------+-----------------+-------------------------------------------+ +| ``s3:ObjectSynced:*`` | Supported, Ceph extension | ++------------------------------------------------+-----------------+-------------------------------------------+ +| ``s3:ObjectSynced:Create`` | Supported, Ceph Extension | ++------------------------------------------------+-----------------+-------------------------------------------+ +| ``s3:ObjectSynced:Delete`` | Defined, Ceph extension (not generated) | ++------------------------------------------------+-----------------+-------------------------------------------+ +| ``s3:ObjectSynced:DeletionMarkerCreated`` | Defined, Ceph extension (not generated) | ++------------------------------------------------+-----------------+-------------------------------------------+ | ``s3:ObjectRestore:Post`` | Not applicable to Ceph | +------------------------------------------------+-----------------+-------------------------------------------+ | ``s3:ObjectRestore:Complete`` | Not applicable to Ceph | @@ -124,6 +132,10 @@ Event Types .. note:: In case of multipart upload, an ``ObjectCreated:CompleteMultipartUpload`` notification will be sent at the end of the process. + +.. note:: + + The ``s3:ObjectSynced:Create`` event is sent when an object successfully syncs to a zone. It must be explicitely set for each zone. Topic Configuration ------------------- diff --git a/src/rgw/rgw_cr_rados.cc b/src/rgw/rgw_cr_rados.cc index 55a8791c038..8e0714b377f 100644 --- a/src/rgw/rgw_cr_rados.cc +++ b/src/rgw/rgw_cr_rados.cc @@ -652,6 +652,8 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp) rgw::sal::RadosObject src_obj(store, key, &bucket); rgw::sal::RadosBucket dest_bucket(store, dest_bucket_info); rgw::sal::RadosObject dest_obj(store, dest_key.value_or(key), &dest_bucket); + + std::string etag; std::optional bytes_transferred; int r = store->getRados()->fetch_remote_obj(obj_ctx, @@ -662,8 +664,8 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp) &src_obj, &dest_bucket, /* dest */ nullptr, /* source */ - dest_placement_rule, - NULL, /* real_time* src_mtime, */ + dest_placement_rule, + nullptr, /* real_time* src_mtime, */ NULL, /* real_time* mtime, */ NULL, /* const real_time* mod_ptr, */ NULL, /* const real_time* unmod_ptr, */ @@ -677,7 +679,7 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp) versioned_epoch, real_time(), /* delete_at */ NULL, /* string *ptag, */ - NULL, /* string *petag, */ + &etag, /* string *petag, */ NULL, /* void (*progress_cb)(off_t, void *), */ NULL, /* void *progress_data*); */ dpp, @@ -690,12 +692,53 @@ int RGWAsyncFetchRemoteObj::_send_request(const DoutPrefixProvider *dpp) if (counters) { counters->inc(sync_counters::l_fetch_err, 1); } - } else if (counters) { - if (bytes_transferred) { - counters->inc(sync_counters::l_fetch, *bytes_transferred); - } else { - counters->inc(sync_counters::l_fetch_not_modified); - } + } else { + // r >= 0 + if (bytes_transferred) { + // send notification that object was succesfully synced + std::string user_id = "rgw sync"; + std::string req_id = "0"; + + RGWObjTags obj_tags; + auto iter = attrs.find(RGW_ATTR_TAGS); + if (iter != attrs.end()) { + try { + auto it = iter->second.cbegin(); + obj_tags.decode(it); + } catch (buffer::error &err) { + ldpp_dout(dpp, 1) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl; + } + } + + // NOTE: we create a mutable copy of bucket.get_tenant as the get_notification function expects a std::string&, not const + std::string tenant(dest_bucket.get_tenant()); + + std::unique_ptr notify + = store->get_notification(dpp, &dest_obj, nullptr, rgw::notify::ObjectSyncedCreate, + &dest_bucket, user_id, + tenant, + req_id, null_yield); + + auto notify_res = static_cast(notify.get())->get_reservation(); + int ret = rgw::notify::publish_reserve(dpp, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: " << ret << dendl; + // no need to return, the sync already happened + } else { + ret = rgw::notify::publish_commit(&dest_obj, dest_obj.get_obj_size(), ceph::real_clock::now(), etag, dest_obj.get_instance(), rgw::notify::ObjectSyncedCreate, notify_res, dpp); + if (ret < 0) { + ldpp_dout(dpp, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl; + } + } + } + + if (counters) { + if (bytes_transferred) { + counters->inc(sync_counters::l_fetch, *bytes_transferred); + } else { + counters->inc(sync_counters::l_fetch_not_modified); + } + } } return r; } diff --git a/src/rgw/rgw_notify.cc b/src/rgw/rgw_notify.cc index aa02cf85aac..3318424ad10 100644 --- a/src/rgw/rgw_notify.cc +++ b/src/rgw/rgw_notify.cc @@ -686,7 +686,7 @@ static inline void populate_event(reservation_t& res, event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made // configurationId is filled from notification configuration event.bucket_name = res.bucket->get_name(); - event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id; + event.bucket_ownerIdentity = res.bucket->get_owner() ? res.bucket->get_owner()->get_id().id : ""; event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key())); event.object_key = res.object_name ? *res.object_name : obj->get_name(); event.object_size = size; diff --git a/src/rgw/rgw_notify_event_type.cc b/src/rgw/rgw_notify_event_type.cc index 059cfc3469a..8ee52c674db 100644 --- a/src/rgw/rgw_notify_event_type.cc +++ b/src/rgw/rgw_notify_event_type.cc @@ -42,6 +42,14 @@ namespace rgw::notify { return "s3:ObjectLifecycle:Transition:Current"; case ObjectTransitionNoncurrent: return "s3:ObjectLifecycle:Transition:Noncurrent"; + case ObjectSynced: + return "s3:ObjectSynced:*"; + case ObjectSyncedCreate: + return "s3:ObjectSynced:Create"; + case ObjectSyncedDelete: + return "s3:ObjectSynced:Delete"; + case ObjectSyncedDeletionMarkerCreated: + return "s3:ObjectSynced:DeletionMarkerCreated"; case UnknownEvent: return "s3:UnknownEvent"; } @@ -72,6 +80,11 @@ namespace rgw::notify { case ObjectTransitionCurrent: case ObjectTransitionNoncurrent: return "OBJECT_TRANSITION"; + case ObjectSynced: + case ObjectSyncedCreate: + case ObjectSyncedDelete: + case ObjectSyncedDeletionMarkerCreated: + return "OBJECT_SYNCED"; case ObjectRemoved: case UnknownEvent: return "UNKNOWN_EVENT"; @@ -118,6 +131,14 @@ namespace rgw::notify { return ObjectTransitionCurrent; if (s == "s3:ObjectLifecycle:Transition:Noncurrent") return ObjectTransitionNoncurrent; + if (s == "s3:ObjectSynced:*" || s == "OBJECT_SYNCED") + return ObjectSynced; + if (s == "s3:ObjectSynced:Create") + return ObjectSyncedCreate; + if (s == "s3:ObjectSynced:Delete") + return ObjectSyncedDelete; + if (s == "s3:ObjectSynced:DeletionMarkerCreated") + return ObjectSyncedDeletionMarkerCreated; return UnknownEvent; } diff --git a/src/rgw/rgw_notify_event_type.h b/src/rgw/rgw_notify_event_type.h index bbfff88f5e1..d09432b1626 100644 --- a/src/rgw/rgw_notify_event_type.h +++ b/src/rgw/rgw_notify_event_type.h @@ -25,7 +25,11 @@ namespace rgw::notify { ObjectTransition = 0xF000, ObjectTransitionCurrent = 0x1000, ObjectTransitionNoncurrent = 0x2000, - UnknownEvent = 0x10000 + ObjectSynced = 0xF0000, + ObjectSyncedCreate = 0x10000, + ObjectSyncedDelete = 0x20000, + ObjectSyncedDeletionMarkerCreated = 0x40000, + UnknownEvent = 0x100000 }; using EventTypeList = std::vector; -- 2.39.5