string upload_id;
protected:
- bool immutable_head() { return true; }
- int prepare(RGWRados *store, void *obj_ctx);
+ int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
public:
+ bool immutable_head() { return true; }
RGWPutObjProcessor_Multipart(const string& bucket_owner, uint64_t _p, req_state *_s) :
RGWPutObjProcessor_Atomic(bucket_owner, _s->bucket, _s->object_str, _p, _s->req_id), s(_s) {}
};
-int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
- RGWPutObjProcessor::prepare(store, obj_ctx);
+ RGWPutObjProcessor::prepare(store, obj_ctx, NULL);
string oid = obj_str;
upload_id = s->info.args.get("uploadId");
- mp.init(oid, upload_id);
+ if (!oid_rand) {
+ mp.init(oid, upload_id);
+ } else {
+ mp.init(oid, upload_id, *oid_rand);
+ }
part_num = s->info.args.get("partNumber");
if (part_num.empty()) {
return -EINVAL;
}
- string upload_prefix = oid + "." + upload_id;
+ string upload_prefix = oid + ".";
+
+ if (!oid_rand) {
+ upload_prefix.append(upload_id);
+ } else {
+ upload_prefix.append(*oid_rand);
+ }
rgw_obj target_obj;
target_obj.init(bucket, oid);
}
-RGWPutObjProcessor *RGWPutObj::select_processor()
+RGWPutObjProcessor *RGWPutObj::select_processor(bool *is_multipart)
{
RGWPutObjProcessor *processor;
processor = new RGWPutObjProcessor_Multipart(bucket_owner, part_size, s);
}
+ if (is_multipart) {
+ *is_multipart = multipart;
+ }
+
return processor;
}
map<string, bufferlist> attrs;
int len;
map<string, string>::iterator iter;
+ bool multipart;
perfcounter->inc(l_rgw_put);
supplied_md5[sizeof(supplied_md5) - 1] = '\0';
}
- processor = select_processor();
+ processor = select_processor(&multipart);
- ret = processor->prepare(store, s->obj_ctx);
+ ret = processor->prepare(store, s->obj_ctx, NULL);
if (ret < 0)
goto done;
hash.Update(data_ptr, len);
- ret = processor->throttle_data(handle);
- if (ret < 0)
- goto done;
+ /* do we need this operation to be synchronous? if we're dealing with an object with immutable
+ * head, e.g., multipart object we need to make sure we're the first one writing to this object
+ */
+ bool need_to_wait = (ofs == 0) && multipart;
+
+ ret = processor->throttle_data(handle, need_to_wait);
+ if (ret < 0) {
+ if (!need_to_wait || ret != -EEXIST) {
+ ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
+ goto done;
+ }
+
+ ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
+
+ /* restart processing with different oid suffix */
+
+ dispose_processor(processor);
+ processor = select_processor(&multipart);
+
+ string oid_rand;
+ char buf[33];
+ gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+ oid_rand.append(buf);
+
+ ret = processor->prepare(store, s->obj_ctx, &oid_rand);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->prepare() returned " << ret << dendl;
+ goto done;
+ }
+
+ ret = processor->handle_data(data, ofs, &handle);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
+ goto done;
+ }
+
+ ret = processor->throttle_data(handle, false);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
+ goto done;
+ }
+ }
ofs += len;
} while (len > 0);
processor = select_processor();
- ret = processor->prepare(store, s->obj_ctx);
+ ret = processor->prepare(store, s->obj_ctx, NULL);
if (ret < 0)
goto done;
hash.Update(data_ptr, len);
- ret = processor->throttle_data(handle);
+ ret = processor->throttle_data(handle, false);
if (ret < 0)
goto done;
policy.set_ctx(s->cct);
}
- RGWPutObjProcessor *select_processor();
+ RGWPutObjProcessor *select_processor(bool *is_multipart);
void dispose_processor(RGWPutObjProcessor *processor);
int verify_permission();
string upload_id;
public:
RGWMPObj() {}
- RGWMPObj(string& _oid, string& _upload_id) {
- init(_oid, _upload_id);
+ RGWMPObj(const string& _oid, const string& _upload_id) {
+ init(_oid, _upload_id, _upload_id);
+ }
+ void init(const string& _oid, const string& _upload_id) {
+ init(_oid, _upload_id, _upload_id);
}
- void init(string& _oid, string& _upload_id) {
+ void init(const string& _oid, const string& _upload_id, const string& part_unique_str) {
if (_oid.empty()) {
clear();
return;
}
oid = _oid;
upload_id = _upload_id;
- prefix = oid;
- prefix.append(".");
- prefix.append(upload_id);
- meta = prefix;
- meta.append(MP_META_SUFFIX);
+ prefix = oid + ".";
+ meta = prefix + upload_id + MP_META_SUFFIX;
+ prefix.append(part_unique_str);
}
string& get_meta() { return meta; }
string get_part(int num) {
return false;
oid = meta.substr(0, mid_pos);
upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
- init(oid, upload_id);
+ init(oid, upload_id, upload_id);
return true;
}
void clear() {
manifest->set_head(_h);
last_ofs = 0;
- char buf[33];
- gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
-
if (manifest->get_prefix().empty()) {
+ char buf[33];
+ gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
+
string oid_prefix = ".";
oid_prefix.append(buf);
oid_prefix.append("_");
}
}
-int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
- RGWPutObjProcessor::prepare(store, obj_ctx);
+ RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
obj.init(bucket, obj_str);
}
-int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
+int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive)
{
if ((uint64_t)abs_ofs + bl.length() > obj_len)
obj_len = abs_ofs + bl.length();
int r = store->aio_put_obj_data(NULL, obj,
bl,
((ofs != 0) ? ofs : -1),
- false, phandle);
+ exclusive, phandle);
return r;
}
return ret;
}
-int RGWPutObjProcessor_Aio::throttle_data(void *handle)
+int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
{
if (handle) {
struct put_obj_aio_info info;
pending.push_back(info);
}
size_t orig_size = pending.size();
- while (pending_has_completed()) {
+ while (pending_has_completed()
+ || need_to_wait) {
int r = wait_pending_front();
if (r < 0)
return r;
+
+ need_to_wait = false;
}
/* resize window in case messages are draining too fast */
return 0;
}
-int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle)
+int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
{
if (ofs >= next_part_ofs) {
int r = prepare_next_part(ofs);
}
}
- return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
+ return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
}
off_t write_ofs = data_ofs;
data_ofs = write_ofs + bl.length();
- return write_data(bl, write_ofs, phandle);
+ bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
+ we could be racing with another upload, to the same
+ object and cleanup can be messy */
+ return write_data(bl, write_ofs, phandle, exclusive);
}
-int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
- RGWPutObjProcessor::prepare(store, obj_ctx);
+ RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
head_obj.init(bucket, obj_str);
}
if (pending_data_bl.length()) {
void *handle;
- int r = write_data(pending_data_bl, data_ofs, &handle);
+ int r = write_data(pending_data_bl, data_ofs, &handle, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
return r;
}
- r = throttle_data(handle);
+ r = throttle_data(handle, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
return r;
}
}
- ret = processor->throttle_data(handle);
+ ret = processor->throttle_data(handle, false);
if (ret < 0)
return ret;
RGWPutObjProcessor_Atomic processor(dest_bucket_info.owner, dest_obj.bucket, dest_obj.object,
cct->_conf->rgw_obj_stripe_size, tag);
- ret = processor.prepare(this, ctx);
+ ret = processor.prepare(this, ctx, NULL);
if (ret < 0)
return ret;
public:
RGWPutObjProcessor(const string& _bo) : store(NULL), obj_ctx(NULL), is_complete(false), bucket_owner(_bo) {}
virtual ~RGWPutObjProcessor();
- virtual int prepare(RGWRados *_store, void *_o) {
+ virtual int prepare(RGWRados *_store, void *_o, string *oid_rand) {
store = _store;
obj_ctx = _o;
return 0;
};
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
- virtual int throttle_data(void *handle) = 0;
+ virtual int throttle_data(void *handle, bool need_to_wait) = 0;
virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
};
off_t ofs;
protected:
- int prepare(RGWRados *store, void *obj_ctx);
+ int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
int handle_data(bufferlist& bl, off_t ofs, void **phandle);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
public:
- int throttle_data(void *handle) { return 0; }
+ int throttle_data(void *handle, bool need_to_wait) { return 0; }
RGWPutObjProcessor_Plain(const string& bucket_owner, rgw_bucket& b, const string& o) : RGWPutObjProcessor(bucket_owner),
bucket(b), obj_str(o), ofs(0) {}
};
uint64_t obj_len;
int drain_pending();
- int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle);
+ int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
public:
- int throttle_data(void *handle);
+ int throttle_data(void *handle, bool need_to_wait);
RGWPutObjProcessor_Aio(const string& bucket_owner) : RGWPutObjProcessor(bucket_owner), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
virtual ~RGWPutObjProcessor_Aio() {
RGWObjManifest manifest;
RGWObjManifest::generator manifest_gen;
- virtual bool immutable_head() { return false; }
-
- int write_data(bufferlist& bl, off_t ofs, void **phandle);
+ int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
int prepare_next_part(off_t ofs);
bucket(_b),
obj_str(_o),
unique_tag(_t) {}
- int prepare(RGWRados *store, void *obj_ctx);
+ int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
+ virtual bool immutable_head() { return false; }
void set_extra_data_len(uint64_t len) {
extra_data_len = len;
}
- int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+ virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
bufferlist& get_extra_data() { return extra_data_bl; }
};