if (r < 0)
return r;
- is_complete = true;
+ is_complete = canceled;
return 0;
}
return r;
}
+ canceled = obj_op.meta.canceled;
+
return 0;
}
}
}
+ meta.canceled = false;
+
/* update quota cache */
store->quota_handler->update_stats(meta.owner, bucket, (orig_exists ? 0 : 1), size, orig_size);
ldout(store->ctx(), 0) << "ERROR: index_op.cancel()() returned ret=" << ret << dendl;
}
+ meta.canceled = true;
+
/* we lost in a race. There are a few options:
* - existing object was rewritten (ECANCELED)
* - non existing object was created (EEXIST)
int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs) {
return processor->complete(etag, mtime, set_mtime, attrs);
}
+
+ bool is_canceled() {
+ return processor->is_canceled();
+ }
};
/*
RGWRESTStreamReadRequest *in_stream_req;
string tag;
map<string, bufferlist> src_attrs;
+ int i;
append_rand_alpha(cct, tag, tag, 32);
RGWPutObjProcessor_Atomic processor(obj_ctx,
attrs = src_attrs;
}
- ret = cb.complete(etag, mtime, set_mtime, attrs);
- if (ret < 0) {
+#define MAX_COMPLETE_RETRY 100
+ for (i = 0; i < MAX_COMPLETE_RETRY; i++) {
+ ret = cb.complete(etag, mtime, set_mtime, attrs);
+ if (ret < 0) {
+ goto set_err_state;
+ }
+ if (copy_if_newer && cb.is_canceled()) {
+ ldout(cct, 20) << "raced with another write of obj: " << dest_obj << dendl;
+ obj_ctx.invalidate(dest_obj); /* object was overwritten */
+ ret = get_obj_state(&obj_ctx, dest_obj, &dest_state, NULL);
+ if (ret < 0) {
+ ldout(cct, 0) << "ERROR: " << __func__ << ": get_err_state() returned ret=" << ret << dendl;
+ goto set_err_state;
+ }
+ if (!dest_state->exists ||
+ dest_state->mtime < set_mtime) {
+ ldout(cct, 20) << "retrying writing object mtime=" << set_mtime << " dest_state->mtime=" << dest_state->mtime << " dest_state->exists=" << dest_state->exists << dendl;
+ continue;
+ } else {
+ ldout(cct, 20) << "not retrying writing object mtime=" << set_mtime << " dest_state->mtime=" << dest_state->mtime << " dest_state->exists=" << dest_state->exists << dendl;
+ }
+ }
+ break;
+ }
+
+ if (i == MAX_COMPLETE_RETRY) {
+ ldout(cct, 0) << "ERROR: retried object completion too many times, something is wrong!" << dendl;
+ ret = -EIO;
goto set_err_state;
}
return 0;
set_err_state:
- int r = opstate.set_state(RGWOpState::OPSTATE_ERROR);
+ RGWOpState::OpState state = RGWOpState::OPSTATE_ERROR;
+ if (copy_if_newer && ret == -ERR_NOT_MODIFIED) {
+ state = RGWOpState::OPSTATE_COMPLETE;
+ ret = 0;
+ }
+ int r = opstate.set_state(state);
if (r < 0) {
ldout(cct, 0) << "ERROR: failed to set opstate r=" << ret << dendl;
}
const char *if_match;
const char *if_nomatch;
uint64_t olh_epoch;
+ bool canceled;
MetaParams() : mtime(NULL), rmattrs(NULL), data(NULL), manifest(NULL), ptag(NULL),
remove_objs(NULL), set_mtime(0), category(RGW_OBJ_CATEGORY_MAIN), flags(0),
- if_match(NULL), if_nomatch(NULL), olh_epoch(0) {}
+ if_match(NULL), if_nomatch(NULL), olh_epoch(0), canceled(false) {}
} meta;
Write(RGWRados::Object *_target) : target(_target) {}
RGWObjectCtx& obj_ctx;
bool is_complete;
RGWBucketInfo bucket_info;
+ bool canceled;
virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime,
map<string, bufferlist>& attrs,
const char *if_match = NULL, const char *if_nomatch = NULL) = 0;
public:
- RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi) {}
+ RGWPutObjProcessor(RGWObjectCtx& _obj_ctx, RGWBucketInfo& _bi) : store(NULL), obj_ctx(_obj_ctx), is_complete(false), bucket_info(_bi), canceled(false) {}
virtual ~RGWPutObjProcessor() {}
virtual int prepare(RGWRados *_store, string *oid_rand) {
store = _store;
const char *if_match = NULL, const char *if_nomatch = NULL);
CephContext *ctx();
+
+ bool is_canceled() { return canceled; }
};
struct put_obj_aio_info {