+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectLifecycle:Transition:NonCurrent`` | Supported, Ceph extension |
+------------------------------------------------+-----------------+-------------------------------------------+
+| ``s3:ObjectSynced:*`` | Supported, Ceph extension |
++------------------------------------------------+-----------------+-------------------------------------------+
+| ``s3:ObjectSynced:Create`` | Supported, Ceph Extension |
++------------------------------------------------+-----------------+-------------------------------------------+
+| ``s3:ObjectSynced:Delete`` | Defined, Ceph extension (not generated) |
++------------------------------------------------+-----------------+-------------------------------------------+
+| ``s3:ObjectSynced:DeletionMarkerCreated`` | Defined, Ceph extension (not generated) |
++------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRestore:Post`` | Not applicable to Ceph |
+------------------------------------------------+-----------------+-------------------------------------------+
| ``s3:ObjectRestore:Complete`` | Not applicable to Ceph |
.. note::
In case of multipart upload, an ``ObjectCreated:CompleteMultipartUpload`` notification will be sent at the end of the process.
+
+.. note::
+
+ The ``s3:ObjectSynced:Create`` event is sent when an object successfully syncs to a zone. It must be explicitely set for each zone.
Topic Configuration
-------------------
rgw::sal::RadosObject src_obj(store, key, &bucket);
rgw::sal::RadosBucket dest_bucket(store, dest_bucket_info);
rgw::sal::RadosObject dest_obj(store, dest_key.value_or(key), &dest_bucket);
+
+ std::string etag;
std::optional<uint64_t> bytes_transferred;
int r = store->getRados()->fetch_remote_obj(obj_ctx,
&src_obj,
&dest_bucket, /* dest */
nullptr, /* source */
- dest_placement_rule,
- NULL, /* real_time* src_mtime, */
+ dest_placement_rule,
+ nullptr, /* real_time* src_mtime, */
NULL, /* real_time* mtime, */
NULL, /* const real_time* mod_ptr, */
NULL, /* const real_time* unmod_ptr, */
versioned_epoch,
real_time(), /* delete_at */
NULL, /* string *ptag, */
- NULL, /* string *petag, */
+ &etag, /* string *petag, */
NULL, /* void (*progress_cb)(off_t, void *), */
NULL, /* void *progress_data*); */
dpp,
if (counters) {
counters->inc(sync_counters::l_fetch_err, 1);
}
- } else if (counters) {
- if (bytes_transferred) {
- counters->inc(sync_counters::l_fetch, *bytes_transferred);
- } else {
- counters->inc(sync_counters::l_fetch_not_modified);
- }
+ } else {
+ // r >= 0
+ if (bytes_transferred) {
+ // send notification that object was succesfully synced
+ std::string user_id = "rgw sync";
+ std::string req_id = "0";
+
+ RGWObjTags obj_tags;
+ auto iter = attrs.find(RGW_ATTR_TAGS);
+ if (iter != attrs.end()) {
+ try {
+ auto it = iter->second.cbegin();
+ obj_tags.decode(it);
+ } catch (buffer::error &err) {
+ ldpp_dout(dpp, 1) << "ERROR: " << __func__ << ": caught buffer::error couldn't decode TagSet " << dendl;
+ }
+ }
+
+ // NOTE: we create a mutable copy of bucket.get_tenant as the get_notification function expects a std::string&, not const
+ std::string tenant(dest_bucket.get_tenant());
+
+ std::unique_ptr<rgw::sal::Notification> notify
+ = store->get_notification(dpp, &dest_obj, nullptr, rgw::notify::ObjectSyncedCreate,
+ &dest_bucket, user_id,
+ tenant,
+ req_id, null_yield);
+
+ auto notify_res = static_cast<rgw::sal::RadosNotification*>(notify.get())->get_reservation();
+ int ret = rgw::notify::publish_reserve(dpp, rgw::notify::ObjectSyncedCreate, notify_res, &obj_tags);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: reserving notification failed, with error: " << ret << dendl;
+ // no need to return, the sync already happened
+ } else {
+ ret = rgw::notify::publish_commit(&dest_obj, dest_obj.get_obj_size(), ceph::real_clock::now(), etag, dest_obj.get_instance(), rgw::notify::ObjectSyncedCreate, notify_res, dpp);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: publishing notification failed, with error: " << ret << dendl;
+ }
+ }
+ }
+
+ if (counters) {
+ if (bytes_transferred) {
+ counters->inc(sync_counters::l_fetch, *bytes_transferred);
+ } else {
+ counters->inc(sync_counters::l_fetch_not_modified);
+ }
+ }
}
return r;
}
event.x_amz_id_2 = res.store->getRados()->host_id; // RGW on which the change was made
// configurationId is filled from notification configuration
event.bucket_name = res.bucket->get_name();
- event.bucket_ownerIdentity = res.bucket->get_owner()->get_id().id;
+ event.bucket_ownerIdentity = res.bucket->get_owner() ? res.bucket->get_owner()->get_id().id : "";
event.bucket_arn = to_string(rgw::ARN(res.bucket->get_key()));
event.object_key = res.object_name ? *res.object_name : obj->get_name();
event.object_size = size;
return "s3:ObjectLifecycle:Transition:Current";
case ObjectTransitionNoncurrent:
return "s3:ObjectLifecycle:Transition:Noncurrent";
+ case ObjectSynced:
+ return "s3:ObjectSynced:*";
+ case ObjectSyncedCreate:
+ return "s3:ObjectSynced:Create";
+ case ObjectSyncedDelete:
+ return "s3:ObjectSynced:Delete";
+ case ObjectSyncedDeletionMarkerCreated:
+ return "s3:ObjectSynced:DeletionMarkerCreated";
case UnknownEvent:
return "s3:UnknownEvent";
}
case ObjectTransitionCurrent:
case ObjectTransitionNoncurrent:
return "OBJECT_TRANSITION";
+ case ObjectSynced:
+ case ObjectSyncedCreate:
+ case ObjectSyncedDelete:
+ case ObjectSyncedDeletionMarkerCreated:
+ return "OBJECT_SYNCED";
case ObjectRemoved:
case UnknownEvent:
return "UNKNOWN_EVENT";
return ObjectTransitionCurrent;
if (s == "s3:ObjectLifecycle:Transition:Noncurrent")
return ObjectTransitionNoncurrent;
+ if (s == "s3:ObjectSynced:*" || s == "OBJECT_SYNCED")
+ return ObjectSynced;
+ if (s == "s3:ObjectSynced:Create")
+ return ObjectSyncedCreate;
+ if (s == "s3:ObjectSynced:Delete")
+ return ObjectSyncedDelete;
+ if (s == "s3:ObjectSynced:DeletionMarkerCreated")
+ return ObjectSyncedDeletionMarkerCreated;
return UnknownEvent;
}
ObjectTransition = 0xF000,
ObjectTransitionCurrent = 0x1000,
ObjectTransitionNoncurrent = 0x2000,
- UnknownEvent = 0x10000
+ ObjectSynced = 0xF0000,
+ ObjectSyncedCreate = 0x10000,
+ ObjectSyncedDelete = 0x20000,
+ ObjectSyncedDeletionMarkerCreated = 0x40000,
+ UnknownEvent = 0x100000
};
using EventTypeList = std::vector<EventType>;