static std::string lc_id = "rgw lifecycle";
static std::string lc_req_id = "0";
+static void send_notification(const DoutPrefixProvider* dpp,
+ rgw::sal::Driver* driver,
+ rgw::sal::Object* obj,
+ rgw::sal::Bucket* bucket,
+ const std::string& etag,
+ uint64_t size,
+ const std::string& version_id,
+ const rgw::notify::EventTypeList& event_types) {
+ // notification supported only for RADOS driver for now
+ auto notify = driver->get_notification(
+ dpp, obj, nullptr, event_types, bucket, lc_id,
+ const_cast<std::string&>(bucket->get_tenant()), lc_req_id, null_yield);
+
+ int ret = notify->publish_reserve(dpp, nullptr);
+ if (ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: notify publish_reserve failed, with error: "
+ << ret << " for lc object: " << obj->get_name()
+ << " for event_types: " << event_types << dendl;
+ return;
+ }
+ ret = notify->publish_commit(dpp, size, ceph::real_clock::now(), etag,
+ version_id);
+ if (ret < 0) {
+ ldpp_dout(dpp, 5) << "WARNING: notify publish_commit failed, with error: "
+ << ret << " for lc object: " << obj->get_name()
+ << " for event_types: " << event_types << dendl;
+ }
+}
+
/* do all zones in the zone group process LC? */
static bool zonegroup_lc_check(const DoutPrefixProvider *dpp, rgw::sal::Zone* zone)
{
auto& meta = o.meta;
int ret;
auto version_id = obj_key.instance; // deep copy, so not cleared below
- std::unique_ptr<rgw::sal::Notification> notify;
/* per discussion w/Daniel, Casey,and Eric, we *do need*
* a new sal object handle, based on the following decision
if (iter != obj_state->attrset.end()) {
etag = rgw_bl_str(iter->second);
}
+ auto size = obj->get_obj_size();
std::unique_ptr<rgw::sal::Object::DeleteOp> del_op
= obj->get_delete_op();
del_op->params.bucket_owner = bucket_info.owner;
del_op->params.unmod_since = meta.mtime;
- // notification supported only for RADOS driver for now
- notify = driver->get_notification(
- dpp, obj.get(), nullptr, event_types, oc.bucket, lc_id,
- const_cast<std::string&>(oc.bucket->get_tenant()), lc_req_id, null_yield);
-
- ret = notify->publish_reserve(dpp, nullptr);
- if ( ret < 0) {
- ldpp_dout(dpp, 1)
- << "ERROR: notify reservation failed, deferring delete of object k="
- << o.key
- << dendl;
- return ret;
- }
-
uint32_t flags = (!remove_indeed || !zonegroup_lc_check(dpp, oc.driver->get_zone()))
? rgw::sal::FLAG_LOG_OP : 0;
ret = del_op->delete_obj(dpp, null_yield, flags);
ldpp_dout(dpp, 1) <<
fmt::format("ERROR: {} failed, with error: {}", __func__, ret) << dendl;
} else {
- // send request to notification manager
- int publish_ret = notify->publish_commit(dpp, obj_state->size,
- ceph::real_clock::now(),
- etag,
- version_id);
- if (publish_ret < 0) {
- ldpp_dout(dpp, 5) << "WARNING: notify publish_commit failed, with error: " << publish_ret << dendl;
- }
+ send_notification(dpp, driver, obj.get(), oc.bucket, etag, size, version_id,
+ event_types);
}
return ret;
params.ns = RGW_OBJ_NS_MULTIPART;
params.access_list_filter = MultipartMetaFilter;
- const auto event_type = rgw::notify::ObjectExpirationAbortMPU;
-
auto pf = [&](RGWLC::LCWorker *wk, WorkQ *wq, WorkItem &wi) {
int ret{0};
auto wt = boost::get<std::tuple<lc_op, rgw_bucket_dir_entry>>(wi);
if (iter != obj_state->attrset.end()) {
etag = rgw_bl_str(iter->second);
}
-
- std::unique_ptr<rgw::sal::Notification> notify
- = driver->get_notification(
- this, sal_obj.get(), nullptr, {event_type}, target, lc_id,
- const_cast<std::string&>(target->get_tenant()), lc_req_id,
- null_yield);
- auto version_id = obj.key.instance;
-
- ret = notify->publish_reserve(this, nullptr);
- if (ret < 0) {
- ldpp_dout(wk->get_lc(), 0)
- << "ERROR: reserving persistent notification for "
- "abort_multipart_upload, ret="
- << ret << ", thread:" << wq->thr_name()
- << ", deferring mpu cleanup for meta:" << obj.key << dendl;
- return ret;
- }
+ auto size = sal_obj->get_obj_size();
ret = mpu->abort(this, cct, null_yield);
if (ret == 0) {
- int publish_ret = notify->publish_commit(
- this, obj_state->size,
- ceph::real_clock::now(),
- etag,
- version_id);
- if (publish_ret < 0) {
- ldpp_dout(wk->get_lc(), 5)
- << "WARNING: notify publish_commit failed, with error: " << ret
- << dendl;
- }
+ const auto event_type = rgw::notify::ObjectExpirationAbortMPU;
+ send_notification(this, driver, sal_obj.get(), target, etag, size,
+ obj.key.instance, {event_type});
if (perfcounter) {
perfcounter->inc(l_rgw_lc_abort_mpu, 1);
}
if (iter != obj_state->attrset.end()) {
etag = rgw_bl_str(iter->second);
}
+<<<<<<< HEAD
rgw::notify::EventTypeList event_types;
if (bucket->versioned() && oc.o.is_current() && !oc.o.is_delete_marker()) {
const_cast<std::string&>(oc.bucket->get_tenant()), lc_req_id,
null_yield);
auto version_id = oc.o.key.instance;
-
- ret = notify->publish_reserve(oc.dpp, nullptr);
- if (ret < 0) {
- ldpp_dout(oc.dpp, 1)
- << "ERROR: notify reservation failed, deferring transition of object k="
- << oc.o.key
- << dendl;
- return ret;
- }
+ auto size = obj->get_obj_size();
ret = oc.obj->transition_to_cloud(oc.bucket, oc.tier.get(), oc.o,
oc.env.worker->get_cloud_targets(),
if (ret < 0) {
return ret;
} else {
- // send request to notification manager
- int publish_ret = notify->publish_commit(oc.dpp, obj_state->size,
- ceph::real_clock::now(),
- etag,
- version_id);
- if (publish_ret < 0) {
- ldpp_dout(oc.dpp, 5) <<
- "WARNING: notify publish_commit failed, with error: " << publish_ret << dendl;
+ rgw::notify::EventTypeList event_types;
+ if (bucket->versioned() && oc.o.is_current() &&
+ !oc.o.is_delete_marker()) {
+ event_types.insert(event_types.end(),
+ {rgw::notify::ObjectTransitionCurrent,
+ rgw::notify::LifecycleTransition});
+ } else {
+ event_types.push_back(rgw::notify::ObjectTransitionNonCurrent);
}
+ send_notification(oc.dpp, oc.driver, obj.get(), oc.bucket, etag, size,
+ oc.o.key.instance, event_types);
}
if (delete_object) {