objv_tracker, y);
}
+
+#define RGW_ATTR_COMMITTED_LOGGING_OBJ RGW_ATTR_PREFIX "committed-logging-obj"
+
+int get_committed_logging_object(RadosStore* store,
+ const std::string& obj_name_oid,
+ const rgw_pool& data_pool,
+ optional_yield y,
+ const DoutPrefixProvider *dpp,
+ std::string& last_committed) {
+ librados::IoCtx io_ctx;
+ if (const int ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx); ret < 0) {
+ return -EIO;
+ }
+ if (!io_ctx.is_valid()) {
+ return -EIO;
+ }
+ bufferlist bl;
+ librados::ObjectReadOperation op;
+ int rval;
+ op.getxattr(RGW_ATTR_COMMITTED_LOGGING_OBJ, &bl, &rval);
+ if (const int ret = rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), nullptr, y); ret < 0) {
+ return ret;
+ }
+ last_committed = bl.to_str();
+ return 0;
+}
+
+int set_committed_logging_object(RadosStore* store,
+ const std::string& obj_name_oid,
+ const rgw_pool& data_pool,
+ optional_yield y,
+ const DoutPrefixProvider *dpp,
+ const std::string& last_committed) {
+ librados::IoCtx io_ctx;
+ if (const int ret = rgw_init_ioctx(dpp, store->getRados()->get_rados_handle(), data_pool, io_ctx); ret < 0) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get IO context when setting last committed logging object name from data pool:" << data_pool.to_str() << dendl;
+ return -EIO;
+ }
+ if (!io_ctx.is_valid()) {
+ ldpp_dout(dpp, 1) << "ERROR: invalid IO context when setting last committed logging object name" << dendl;
+ return -EIO;
+ }
+
+ bufferlist bl;
+ bl.append(last_committed);
+ librados::ObjectWriteOperation op;
+ op.setxattr(RGW_ATTR_COMMITTED_LOGGING_OBJ, std::move(bl));
+ return rgw_rados_operate(dpp, io_ctx, obj_name_oid, std::move(op), y);
+}
+
int RadosBucket::get_logging_object_name(std::string& obj_name,
const std::string& prefix,
optional_yield y,
objv_tracker,
ceph::real_time::clock::now(),
y,
- nullptr);
+ no_change_attrs());
if (ret == -EEXIST) {
ldpp_dout(dpp, 20) << "INFO: race detected in initializing '" << obj_name_oid << "' with logging object name:'" << obj_name << "'. ret = " << ret << dendl;
} else if (ret == -ECANCELED) {
y);
}
-int RadosBucket::commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) {
+int RadosBucket::commit_logging_object(const std::string& obj_name,
+ optional_yield y,
+ const DoutPrefixProvider *dpp,
+ const std::string& prefix,
+ std::string* last_committed) {
rgw_pool data_pool;
const rgw_obj head_obj{get_key(), obj_name};
const auto placement_rule = get_placement_rule();
"' when committing logging object" << dendl;
return -EIO;
}
+ const auto obj_name_oid = bucketlogging::object_name_oid(this, prefix);
+ if (last_committed) {
+ if (const int ret = get_committed_logging_object(store,
+ obj_name_oid,
+ data_pool,
+ y,
+ dpp,
+ *last_committed); ret < 0) {
+ if (ret != -ENODATA) {
+ ldpp_dout(dpp, 1) << "ERROR: failed to get last committed logging object name from bucket '" << get_key() <<
+ "' when committing logging object. ret = " << ret << dendl;
+ return ret;
+ }
+ ldpp_dout(dpp, 5) << "WARNING: no last committed logging object name found for bucket '" << get_key() <<
+ "' when committing logging object" << dendl;
+ last_committed->clear();
+ }
+ }
const auto temp_obj_name = to_temp_object_name(this, obj_name);
std::map<string, bufferlist> obj_attrs;
y,
dpp,
&obj_attrs,
- nullptr); ret < 0 && ret != -ENOENT) {
- ldpp_dout(dpp, 1) << "ERROR: failed to read logging data when committing object '" << temp_obj_name
- << ". error: " << ret << dendl;
+ nullptr); ret < 0) {
+ if (ret == -ENOENT) {
+ ldpp_dout(dpp, 5) << "WARNING: temporary logging object '" << temp_obj_name << "' does not exists" << dendl;
+ } else {
+ ldpp_dout(dpp, 1) << "ERROR: failed to read logging data when committing object '" << temp_obj_name
+ << ". error: " << ret << dendl;
+ }
return ret;
- } else if (ret == -ENOENT) {
- ldpp_dout(dpp, 1) << "WARNING: temporary logging object '" << temp_obj_name << "' does not exists" << dendl;
- return 0;
}
uint64_t size = bl_data.length();
"' to bucket '" << get_key() <<"'. error: " << ret << dendl;
return ret;
}
+
ldpp_dout(dpp, 20) << "INFO: committed logging object '" << temp_obj_name <<
"' with size of " << size << " bytes, to bucket '" << get_key() << "' as '" <<
obj_name << "'" << dendl;
+
+ if (const int ret = set_committed_logging_object(store,
+ obj_name_oid,
+ data_pool,
+ y,
+ dpp,
+ obj_name); ret < 0) {
+ ldpp_dout(dpp, 5) << "WARNING: object was committed, but we failed to set last committed logging object name. ret = " << ret << dendl;
+ } else {
+ ldpp_dout(dpp, 20) << "INFO: last committed logging object name was set to '" << obj_name << "'" << dendl;
+ }
+ if (last_committed) {
+ *last_committed = obj_name;
+ }
return 0;
}
optional_yield y,
const DoutPrefixProvider *dpp,
RGWObjVersionTracker* objv_tracker) override;
- int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override;
+ int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override;
int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override;
int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override;
cerr << "ERROR: failed to get pending logging object name from target bucket '" << configuration.target_bucket << "'" << std::endl;
return -ret;
}
- const auto old_obj = obj_name;
+ std::string old_obj;
const auto region = driver->get_zone()->get_zonegroup().get_api_name();
- ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker);
+ ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, dpp(), region, bucket, null_yield, true, &objv_tracker, &old_obj);
if (ret < 0) {
- cerr << "ERROR: failed to flush pending logging object '" << old_obj
- << "' to target bucket '" << configuration.target_bucket << "'" << std::endl;
+ if (ret == -ENOENT) {
+ cerr << "WARNING: no pending logging object '" << obj_name << "'. nothing to flush";
+ ret = 0;
+ } else {
+ cerr << "ERROR: failed flush pending logging object '" << obj_name << "'";
+ }
+ cerr << " to target bucket '" << configuration.target_bucket << "'. "
+ << " last committed object is '" << old_obj << "'" << std::endl;
return -ret;
}
cout << "flushed pending logging object '" << old_obj
const DoutPrefixProvider *dpp,
rgw::sal::Driver* driver,
const std::string& tenant_name,
- optional_yield y) {
+ optional_yield y,
+ std::string* last_committed) {
std::string target_bucket_name;
std::string target_tenant_name;
int ret = rgw_parse_url_bucket(conf.target_bucket, tenant_name, target_tenant_name, target_bucket_name);
<< ret << dendl;
return ret;
}
- return commit_logging_object(conf, target_bucket, dpp, y);
+ return commit_logging_object(conf, target_bucket, dpp, y, last_committed);
}
int commit_logging_object(const configuration& conf,
const std::unique_ptr<rgw::sal::Bucket>& target_bucket,
const DoutPrefixProvider *dpp,
- optional_yield y) {
+ optional_yield y,
+ std::string* last_committed) {
std::string obj_name;
if (const int ret = target_bucket->get_logging_object_name(obj_name, conf.target_prefix, y, dpp, nullptr); ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to get name of logging object of bucket '" <<
target_bucket->get_key() << "'. ret = " << ret << dendl;
return ret;
}
- if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp); ret <0 ) {
+ if (const int ret = target_bucket->commit_logging_object(obj_name, y, dpp, conf.target_prefix, last_committed); ret < 0) {
ldpp_dout(dpp, 1) << "ERROR: failed to commit logging object '" << obj_name << "' of bucket '" <<
target_bucket->get_key() << "'. ret = " << ret << dendl;
return ret;
const std::unique_ptr<rgw::sal::Bucket>& source_bucket,
optional_yield y,
bool must_commit,
- RGWObjVersionTracker* objv_tracker) {
+ RGWObjVersionTracker* objv_tracker,
+ std::string* last_committed) {
std::string target_bucket_name;
std::string target_tenant_name;
std::ignore = rgw_parse_url_bucket(conf.target_bucket, target_bucket->get_tenant(), target_tenant_name, target_bucket_name);
target_bucket->get_key() << "'. ret = " << ret << dendl;
return ret;
}
- if (const int ret = target_bucket->commit_logging_object(old_obj, y, dpp); ret < 0) {
+ if (const int ret = target_bucket->commit_logging_object(old_obj, y, dpp, conf.target_prefix, last_committed); ret < 0) {
if (must_commit) {
return ret;
}
if (ceph::coarse_real_time::clock::now() > time_to_commit) {
ldpp_dout(dpp, 20) << "INFO: logging object '" << obj_name << "' exceeded its time, will be committed to bucket '" <<
target_bucket_id << "'" << dendl;
- if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker); ret < 0) {
+ if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, false, &objv_tracker, nullptr); ret < 0) {
return ret;
}
} else {
if (ret == -EFBIG) {
ldpp_dout(dpp, 5) << "WARNING: logging object '" << obj_name << "' is full, will be committed to bucket '" <<
target_bucket->get_key() << "'" << dendl;
- if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, nullptr); ret < 0 ) {
+ if (ret = rollover_logging_object(conf, target_bucket, obj_name, dpp, region, s->bucket, y, true, nullptr, nullptr); ret < 0 ) {
return ret;
}
if (ret = target_bucket->write_logging_object(obj_name,
return 0;
}
const auto& info = bucket->get_info();
- if (const int ret = commit_logging_object(*conf, dpp, driver, info.bucket.tenant, y); ret < 0) {
+ if (const int ret = commit_logging_object(*conf, dpp, driver, info.bucket.tenant, y, nullptr); ret < 0) {
ldpp_dout(dpp, 5) << "WARNING: could not commit pending logging object of bucket '" <<
bucket->get_key() << "' during cleanup. ret = " << ret << dendl;
} else {
// commit the pending log objec to the log bucket
// and create a new pending log object
// if "must_commit" is "false" the function will return success even if the pending log object was not committed
+// if "last_committed" is not null, it will be set to the name of the last committed object
int rollover_logging_object(const configuration& conf,
const std::unique_ptr<rgw::sal::Bucket>& bucket,
std::string& obj_name,
const std::unique_ptr<rgw::sal::Bucket>& source_bucket,
optional_yield y,
bool must_commit,
- RGWObjVersionTracker* objv_tracker);
+ RGWObjVersionTracker* objv_tracker,
+ std::string* last_committed);
// commit the pending log object to the log bucket
// use this for cleanup, when new pending object is not needed
// and target bucket is known
+// if "last_committed" is not null, it will be set to the name of the last committed object
int commit_logging_object(const configuration& conf,
const std::unique_ptr<rgw::sal::Bucket>& target_bucket,
const DoutPrefixProvider *dpp,
- optional_yield y);
+ optional_yield y,
+ std::string* last_committed);
// commit the pending log object to the log bucket
// use this for cleanup, when new pending object is not needed
// and target bucket shoud be loaded based on the configuration
+// if "last_committed" is not null, it will be set to the name of the last committed object
int commit_logging_object(const configuration& conf,
const DoutPrefixProvider *dpp,
rgw::sal::Driver* driver,
const std::string& tenant_name,
- optional_yield y);
+ optional_yield y,
+ std::string* last_committed);
// return the oid of the object holding the name of the temporary logging object
// bucket - log bucket
}
} else if (*old_conf != configuration) {
// conf changed - do cleanup
- if (const auto ret = commit_logging_object(*old_conf, target_bucket, this, y); ret < 0) {
+ if (const auto ret = commit_logging_object(*old_conf, target_bucket, this, y, nullptr); ret < 0) {
ldpp_dout(this, 1) << "WARNING: could not commit pending logging object when updating logging configuration of bucket '" <<
src_bucket->get_key() << "', ret = " << ret << dendl;
} else {
ldpp_dout(this, 1) << "ERROR: failed to get pending logging object name from target bucket '" << target_bucket_id << "'" << dendl;
return;
}
- old_obj = obj_name;
const auto region = driver->get_zone()->get_zonegroup().get_api_name();
- op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, null_yield, true, &objv_tracker);
+ op_ret = rgw::bucketlogging::rollover_logging_object(configuration, target_bucket, obj_name, this, region, source_bucket, null_yield, true, &objv_tracker, &old_obj);
if (op_ret < 0) {
- ldpp_dout(this, 1) << "ERROR: failed to flush pending logging object '" << old_obj
- << "' to target bucket '" << target_bucket_id << "'" << dendl;
+ if (op_ret == -ENOENT) {
+ ldpp_dout(this, 5) << "WARNING: no pending logging object '" << obj_name << "'. nothing to flush"
+ << " to target bucket '" << target_bucket_id << "'. "
+ << " last committed object is '" << old_obj << "'" << dendl;
+ op_ret = 0;
+ } else {
+ ldpp_dout(this, 1) << "ERROR: failed flush pending logging object '" << obj_name << "'"
+ << " to target bucket '" << target_bucket_id << "'. "
+ << " last committed object is '" << old_obj << "'" << dendl;
+ }
return;
}
ldpp_dout(this, 20) << "INFO: flushed pending logging object '" << old_obj
optional_yield y,
const DoutPrefixProvider *dpp,
RGWObjVersionTracker* objv_tracker) = 0;
- /** Move the pending bucket logging object into the bucket */
- virtual int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) = 0;
+ /** Move the pending bucket logging object into the bucket
+ if "last_committed" is not null, it will be set to the name of the last committed object
+ * */
+ virtual int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) = 0;
//** Remove the pending bucket logging object */
virtual int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) = 0;
/** Write a record to the pending bucket logging object */
RGWObjVersionTracker* objv_tracker) override {
return next->remove_logging_object_name(prefix, y, dpp, objv_tracker);
}
- int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp)override {
- return next->commit_logging_object(obj_name, y, dpp);
+ int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override {
+ return next->commit_logging_object(obj_name, y, dpp, prefix, last_committed);
}
int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override {
return next->remove_logging_object(obj_name, y, dpp);
optional_yield y,
const DoutPrefixProvider *dpp,
RGWObjVersionTracker* objv_tracker) override { return 0; }
- int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; }
+ int commit_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp, const std::string& prefix, std::string* last_committed) override { return 0; }
int remove_logging_object(const std::string& obj_name, optional_yield y, const DoutPrefixProvider *dpp) override { return 0; }
int write_logging_object(const std::string& obj_name, const std::string& record, optional_yield y, const DoutPrefixProvider *dpp, bool async_completion) override {
return 0;