return rgw_obj_key::oid_to_key_in_ns(oid, &key, empty_ns);
}
-int rgw_remove_object(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, rgw::sal::Bucket* bucket, rgw_obj_key& key)
+int rgw_remove_object(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, rgw::sal::Bucket* bucket, rgw_obj_key& key,
+ const bool force)
{
if (key.instance.empty()) {
key.instance = "null";
std::unique_ptr<rgw::sal::Object> object = bucket->get_object(key);
- return object->delete_object(dpp, null_yield, rgw::sal::FLAG_LOG_OP);
+ const uint32_t possible_force_flag = force ? rgw::sal::FLAG_FORCE_OP : 0;
+
+ return object->delete_object(dpp, null_yield, rgw::sal::FLAG_LOG_OP | possible_force_flag);
}
static void set_err_msg(std::string *sink, std::string msg)
static RGWBucketInstanceMetadataHandlerBase *alloc(rgw::sal::Driver* driver);
};
-extern int rgw_remove_object(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, rgw::sal::Bucket* bucket, rgw_obj_key& key);
+extern int rgw_remove_object(const DoutPrefixProvider *dpp, rgw::sal::Driver* driver, rgw::sal::Bucket* bucket, rgw_obj_key& key,
+ const bool force = false);
extern int rgw_object_get_attr(rgw::sal::Driver* driver, rgw::sal::Object* obj,
const char* attr_name, bufferlist& out_bl,
* Delete an object.
* bucket: name of the bucket storing the object
* obj: name of the object to delete
+ * force: if b.i. entry exists but head object does not, still remove entry
* Returns: 0 on success, -ERR# otherwise.
*/
-int RGWRados::Object::Delete::delete_obj(optional_yield y, const DoutPrefixProvider *dpp, bool log_op)
+int RGWRados::Object::Delete::delete_obj(optional_yield y,
+ const DoutPrefixProvider* dpp,
+ bool log_op,
+ const bool force)
{
RGWRados *store = target->get_store();
const rgw_obj& src_obj = target->get_obj();
}
result.delete_marker = dirent.is_delete_marker();
r = store->unlink_obj_instance(dpp, target->get_ctx(), target->get_bucket_info(), obj, params.olh_epoch,
- y, params.zones_trace, add_log);
+ y, params.zones_trace, add_log, force);
if (r < 0) {
return r;
}
}
if (!state->exists) {
- target->invalidate_state();
- return -ENOENT;
+ if (!force) {
+ target->invalidate_state();
+ return -ENOENT;
+ } else {
+ ldpp_dout(dpp, 5) << "WARNING: head for \"" << src_obj <<
+ "\" does not exist; will continue with deleting bucket "
+ "index entry(ies)" << dendl;
+ }
}
r = target->prepare_atomic_modification(dpp, op, false, NULL, NULL, NULL, true, false, y);
uint16_t bilog_flags,
const real_time& expiration_time,
rgw_zone_set *zones_trace,
- bool log_op)
+ bool log_op,
+ const bool force) // force removal even if head object is broken
{
RGWRados::Object del_target(this, bucket_info, obj_ctx, obj);
RGWRados::Object::Delete del_op(&del_target);
del_op.params.expiration_time = expiration_time;
del_op.params.zones_trace = zones_trace;
- return del_op.delete_obj(null_yield, dpp, log_op ? rgw::sal::FLAG_LOG_OP : 0);
+ return del_op.delete_obj(null_yield, dpp, log_op, force);
}
int RGWRados::delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj)
std::map<uint64_t, std::vector<rgw_bucket_olh_log_entry> >& log,
uint64_t *plast_ver,
rgw_zone_set* zones_trace,
- bool log_op)
+ bool log_op,
+ const bool force)
{
if (log.empty()) {
return 0;
liter != remove_instances.end(); ++liter) {
cls_rgw_obj_key& key = *liter;
rgw_obj obj_instance(bucket, key);
- int ret = delete_obj(dpp, obj_ctx, bucket_info, obj_instance, 0, RGW_BILOG_FLAG_VERSIONED_OP, ceph::real_time(), zones_trace, log_op);
+ int ret = delete_obj(dpp, obj_ctx, bucket_info, obj_instance, 0, RGW_BILOG_FLAG_VERSIONED_OP, ceph::real_time(), zones_trace, log_op, force);
if (ret < 0 && ret != -ENOENT) {
ldpp_dout(dpp, 0) << "ERROR: delete_obj() returned " << ret << " obj_instance=" << obj_instance << dendl;
return ret;
/*
* read olh log and apply it
*/
-int RGWRados::update_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace, bool log_op)
+int RGWRados::update_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace,
+ bool log_op, const bool force)
{
map<uint64_t, vector<rgw_bucket_olh_log_entry> > log;
bool is_truncated;
if (ret < 0) {
return ret;
}
- ret = apply_olh_log(dpp, obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker, zones_trace, log_op);
+ ret = apply_olh_log(dpp, obj_ctx, *state, bucket_info, obj, state->olh_tag, log, &ver_marker, zones_trace, log_op, force);
if (ret < 0) {
return ret;
}
}
int RGWRados::unlink_obj_instance(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
- uint64_t olh_epoch, optional_yield y, rgw_zone_set *zones_trace, bool log_op)
+ uint64_t olh_epoch, optional_yield y, rgw_zone_set *zones_trace, bool log_op, const bool force)
{
string op_tag;
// it's possible that the pending xattr from this op prevented the olh
// object from being cleaned by another thread that was deleting the last
// existing version. We invoke a best-effort update_olh here to handle this case.
- int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, zones_trace, log_op);
+ int r = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, zones_trace, log_op, force);
if (r < 0 && r != -ECANCELED) {
ldpp_dout(dpp, 20) << "update_olh() target_obj=" << olh_obj << " returned " << r << dendl;
}
return -EIO;
}
- ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, zones_trace, log_op);
+ ret = update_olh(dpp, obj_ctx, state, bucket_info, olh_obj, zones_trace, log_op, force);
if (ret == -ECANCELED) { /* already did what we needed, no need to retry, raced with another user */
return 0;
}
explicit Delete(RGWRados::Object *_target) : target(_target) {}
- int delete_obj(optional_yield y, const DoutPrefixProvider *dpp, bool log_op = true);
- };
+ int delete_obj(optional_yield y,
+ const DoutPrefixProvider* dpp,
+ bool log_op,
+ const bool force); // if head object missing, do a best effort
+ }; // struct RGWRados::Object::Delete
struct Stat {
RGWRados::Object *source;
uint16_t bilog_flags = 0,
const ceph::real_time& expiration_time = ceph::real_time(),
rgw_zone_set *zones_trace = nullptr,
- bool log_op = true);
+ bool log_op = true,
+ const bool force = false); // if head object missing, do a best effort
int delete_raw_obj(const DoutPrefixProvider *dpp, const rgw_raw_obj& obj);
int bucket_index_clear_olh(const DoutPrefixProvider *dpp, RGWBucketInfo& bucket_info, const std::string& olh_tag, const rgw_obj& obj_instance);
int apply_olh_log(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState& obj_state, RGWBucketInfo& bucket_info, const rgw_obj& obj,
bufferlist& obj_tag, std::map<uint64_t, std::vector<rgw_bucket_olh_log_entry> >& log,
- uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr, bool log_op = true);
- int update_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr, bool log_op = true);
+ uint64_t *plast_ver, rgw_zone_set *zones_trace = nullptr, bool log_op = true,
+ const bool force = false);
+ int update_olh(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWObjState *state, RGWBucketInfo& bucket_info, const rgw_obj& obj, rgw_zone_set *zones_trace = nullptr,
+ bool log_op = true, const bool force = false);
int clear_olh(const DoutPrefixProvider *dpp,
RGWObjectCtx& obj_ctx,
const rgw_obj& obj,
int repair_olh(const DoutPrefixProvider *dpp, RGWObjState* state, const RGWBucketInfo& bucket_info,
const rgw_obj& obj);
int unlink_obj_instance(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info, const rgw_obj& target_obj,
- uint64_t olh_epoch, optional_yield y, rgw_zone_set *zones_trace = nullptr, bool log_op = true);
+ uint64_t olh_epoch, optional_yield y, rgw_zone_set *zones_trace = nullptr,
+ bool log_op = true, const bool force = false);
void check_pending_olh_entries(const DoutPrefixProvider *dpp, std::map<std::string, bufferlist>& pending_entries, std::map<std::string, bufferlist> *rm_pending_entries);
int remove_olh_pending_entries(const DoutPrefixProvider *dpp, const RGWBucketInfo& bucket_info, RGWObjState& state, const rgw_obj& olh_obj, std::map<std::string, bufferlist>& pending_attrs);
parent_op(&op_target)
{ }
+
int RadosObject::RadosDeleteOp::delete_obj(const DoutPrefixProvider* dpp, optional_yield y, uint32_t flags)
{
parent_op.params.bucket_owner = params.bucket_owner.get_id();
parent_op.params.abortmp = params.abortmp;
parent_op.params.parts_accounted_size = params.parts_accounted_size;
- int ret = parent_op.delete_obj(y, dpp, flags & FLAG_LOG_OP);
- if (ret < 0)
+ int ret = parent_op.delete_obj(y, dpp, flags & FLAG_LOG_OP, flags & FLAG_FORCE_OP);
+ if (ret < 0) {
return ret;
+ }
result.delete_marker = parent_op.result.delete_marker;
result.version_id = parent_op.result.version_id;
return ret;
-}
+} // RadosObject::RadosDeleteOp::delete_obj
int RadosObject::delete_object(const DoutPrefixProvider* dpp,
optional_yield y,
del_op.params.versioning_status = (flags & FLAG_PREVENT_VERSIONING)
? 0 : bucket->get_info().versioning_status();
- return del_op.delete_obj(y, dpp, flags & FLAG_LOG_OP);
-}
+ // convert flags to bool params
+ return del_op.delete_obj(y, dpp, flags & FLAG_LOG_OP, flags & FLAG_FORCE_OP);
+} // RadosObject::delete_object
int RadosObject::delete_obj_aio(const DoutPrefixProvider* dpp, RGWObjState* astate,
Completions* aio, bool keep_index_consistent,
cerr << "ERROR: could not init bucket: " << cpp_strerror(-ret) << std::endl;
return -ret;
}
+
rgw_obj_key key(object, object_version);
- ret = rgw_remove_object(dpp(), driver, bucket.get(), key);
+ ret = rgw_remove_object(dpp(), driver, bucket.get(), key, yes_i_really_mean_it);
if (ret < 0) {
cerr << "ERROR: object remove returned: " << cpp_strerror(-ret) << std::endl;
return -ret;
static constexpr uint32_t FLAG_LOG_OP = 0x0001;
static constexpr uint32_t FLAG_PREVENT_VERSIONING = 0x0002;
+// if cannot do all elements of op, do as much as possible (e.g.,
+// delete object where head object is missing)
+static constexpr uint32_t FLAG_FORCE_OP = 0x0004;
+
// a simple streaming data processing abstraction
/**
* @brief A simple streaming data processing abstraction