req_state *s;
protected:
- bool immutable_head() { return true; }
- int prepare(RGWRados *store, void *obj_ctx);
+ int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
public:
+ bool immutable_head() { return true; }
RGWPutObjProcessor_Multipart(uint64_t _p, req_state *_s) : RGWPutObjProcessor_Atomic(_s->bucket, _s->object_str, _p, _s->req_id), s(_s) {}
};
-int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Multipart::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
- RGWPutObjProcessor::prepare(store, obj_ctx);
+ RGWPutObjProcessor::prepare(store, obj_ctx, NULL);
string oid = obj_str;
string upload_id;
upload_id = s->info.args.get("uploadId");
- mp.init(oid, upload_id);
+ if (!oid_rand) {
+ mp.init(oid, upload_id);
+ } else {
+ mp.init(oid, upload_id, *oid_rand);
+ }
part_num = s->info.args.get("partNumber");
if (part_num.empty()) {
}
-RGWPutObjProcessor *RGWPutObj::select_processor()
+RGWPutObjProcessor *RGWPutObj::select_processor(bool *is_multipart)
{
RGWPutObjProcessor *processor;
processor = new RGWPutObjProcessor_Multipart(part_size, s);
}
+ if (is_multipart) {
+ *is_multipart = multipart;
+ }
+
return processor;
}
map<string, bufferlist> attrs;
int len;
map<string, string>::iterator iter;
+ bool multipart;
perfcounter->inc(l_rgw_put);
supplied_md5[sizeof(supplied_md5) - 1] = '\0';
}
- processor = select_processor();
+ processor = select_processor(&multipart);
- ret = processor->prepare(store, s->obj_ctx);
+ ret = processor->prepare(store, s->obj_ctx, NULL);
if (ret < 0)
goto done;
hash.Update(data_ptr, len);
- ret = processor->throttle_data(handle);
- if (ret < 0)
- goto done;
+ /* do we need this operation to be synchronous? if we're dealing with an object with immutable
+ * head, e.g., multipart object we need to make sure we're the first one writing to this object
+ */
+ bool need_to_wait = (ofs == 0) && multipart;
+
+ ret = processor->throttle_data(handle, need_to_wait);
+ if (ret < 0) {
+ if (!need_to_wait || ret != -EEXIST) {
+ ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
+ goto done;
+ }
+
+ ldout(s->cct, 5) << "NOTICE: processor->throttle_data() returned -EEXIST, need to restart write" << dendl;
+
+ /* restart processing with different oid suffix */
+
+ dispose_processor(processor);
+ processor = select_processor(&multipart);
+
+ string oid_rand;
+ char buf[33];
+ gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+ oid_rand.append(buf);
+
+ ret = processor->prepare(store, s->obj_ctx, &oid_rand);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->prepare() returned " << ret << dendl;
+ goto done;
+ }
+
+ ret = processor->handle_data(data, ofs, &handle);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
+ goto done;
+ }
+
+ ret = processor->throttle_data(handle, false);
+ if (ret < 0) {
+ ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
+ goto done;
+ }
+ }
ofs += len;
} while (len > 0);
processor = select_processor();
- ret = processor->prepare(store, s->obj_ctx);
+ ret = processor->prepare(store, s->obj_ctx, NULL);
if (ret < 0)
goto done;
hash.Update(data_ptr, len);
- ret = processor->throttle_data(handle);
+ ret = processor->throttle_data(handle, false);
if (ret < 0)
goto done;
policy.set_ctx(s->cct);
}
- RGWPutObjProcessor *select_processor();
+ RGWPutObjProcessor *select_processor(bool *is_multipart);
void dispose_processor(RGWPutObjProcessor *processor);
int verify_permission();
string upload_id;
public:
RGWMPObj() {}
- RGWMPObj(string& _oid, string& _upload_id) {
- init(_oid, _upload_id);
+ RGWMPObj(const string& _oid, const string& _upload_id) {
+ init(_oid, _upload_id, _upload_id);
+ }
+ void init(const string& _oid, const string& _upload_id) {
+ init(_oid, _upload_id, _upload_id);
}
- void init(string& _oid, string& _upload_id) {
+ void init(const string& _oid, const string& _upload_id, const string& part_unique_str) {
if (_oid.empty()) {
clear();
return;
}
oid = _oid;
upload_id = _upload_id;
- prefix = oid;
- prefix.append(".");
- prefix.append(upload_id);
- meta = prefix;
- meta.append(MP_META_SUFFIX);
+ prefix = oid + ".";
+ meta = prefix + upload_id + MP_META_SUFFIX;
+ prefix.append(part_unique_str);
}
string& get_meta() { return meta; }
string get_part(int num) {
return false;
oid = meta.substr(0, mid_pos);
upload_id = meta.substr(mid_pos + 1, end_pos - mid_pos - 1);
- init(oid, upload_id);
+ init(oid, upload_id, upload_id);
return true;
}
void clear() {
}
}
-int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
- RGWPutObjProcessor::prepare(store, obj_ctx);
+ RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
obj.init(bucket, obj_str);
}
-int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle)
+int RGWPutObjProcessor_Aio::handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive)
{
if ((uint64_t)abs_ofs + bl.length() > obj_len)
obj_len = abs_ofs + bl.length();
int r = store->aio_put_obj_data(NULL, obj,
bl,
((ofs != 0) ? ofs : -1),
- false, phandle);
+ exclusive, phandle);
return r;
}
return ret;
}
-int RGWPutObjProcessor_Aio::throttle_data(void *handle)
+int RGWPutObjProcessor_Aio::throttle_data(void *handle, bool need_to_wait)
{
if (handle) {
struct put_obj_aio_info info;
pending.push_back(info);
}
size_t orig_size = pending.size();
- while (pending_has_completed()) {
+ while (pending_has_completed()
+ || need_to_wait) {
int r = wait_pending_front();
if (r < 0)
return r;
+
+ need_to_wait = false;
}
/* resize window in case messages are draining too fast */
return 0;
}
-int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle)
+int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive)
{
if (ofs >= next_part_ofs)
prepare_next_part(ofs);
- return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle);
+ return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
}
off_t write_ofs = data_ofs;
data_ofs = write_ofs + bl.length();
- return write_data(bl, write_ofs, phandle);
+ bool exclusive = (!write_ofs && immutable_head()); /* immutable head object, need to verify nothing exists there
+ we could be racing with another upload, to the same
+ object and cleanup can be messy */
+ return write_data(bl, write_ofs, phandle, exclusive);
}
-int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx)
+int RGWPutObjProcessor_Atomic::prepare(RGWRados *store, void *obj_ctx, string *oid_rand)
{
- RGWPutObjProcessor::prepare(store, obj_ctx);
+ RGWPutObjProcessor::prepare(store, obj_ctx, oid_rand);
head_obj.init(bucket, obj_str);
- char buf[33];
- gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
oid_prefix.append("_");
- oid_prefix.append(buf);
+ if (!oid_rand) {
+ char buf[33];
+ gen_rand_alphanumeric(store->ctx(), buf, sizeof(buf) - 1);
+ oid_prefix.append(buf);
+ } else {
+ oid_prefix.append(*oid_rand);
+ }
oid_prefix.append("_");
return 0;
}
if (pending_data_bl.length()) {
void *handle;
- int r = write_data(pending_data_bl, data_ofs, &handle);
+ int r = write_data(pending_data_bl, data_ofs, &handle, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
return r;
}
- r = throttle_data(handle);
+ r = throttle_data(handle, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
return r;
}
}
- ret = processor->throttle_data(handle);
+ ret = processor->throttle_data(handle, false);
if (ret < 0)
return ret;
RGWPutObjProcessor_Atomic processor(dest_obj.bucket, dest_obj.object,
cct->_conf->rgw_obj_stripe_size, tag);
- ret = processor.prepare(this, ctx);
+ ret = processor.prepare(this, ctx, NULL);
if (ret < 0)
return ret;
public:
RGWPutObjProcessor() : store(NULL), obj_ctx(NULL), is_complete(false) {}
virtual ~RGWPutObjProcessor();
- virtual int prepare(RGWRados *_store, void *_o) {
+ virtual int prepare(RGWRados *_store, void *_o, string *oid_rand) {
store = _store;
obj_ctx = _o;
return 0;
};
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
- virtual int throttle_data(void *handle) = 0;
+ virtual int throttle_data(void *handle, bool need_to_wait) = 0;
virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
};
off_t ofs;
protected:
- int prepare(RGWRados *store, void *obj_ctx);
+ int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
int handle_data(bufferlist& bl, off_t ofs, void **phandle);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
public:
- int throttle_data(void *handle) { return 0; }
+ int throttle_data(void *handle, bool need_to_wait) { return 0; }
RGWPutObjProcessor_Plain(rgw_bucket& b, const string& o) : bucket(b), obj_str(o), ofs(0) {}
};
uint64_t obj_len;
int drain_pending();
- int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle);
+ int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
public:
- int throttle_data(void *handle);
+ int throttle_data(void *handle, bool need_to_wait);
RGWPutObjProcessor_Aio() : max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
virtual ~RGWPutObjProcessor_Aio() {
rgw_obj cur_obj;
RGWObjManifest manifest;
- virtual bool immutable_head() { return false; }
-
- int write_data(bufferlist& bl, off_t ofs, void **phandle);
+ int write_data(bufferlist& bl, off_t ofs, void **phandle, bool exclusive);
virtual int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
void prepare_next_part(off_t ofs);
bucket(_b),
obj_str(_o),
unique_tag(_t) {}
- int prepare(RGWRados *store, void *obj_ctx);
+ int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
+ virtual bool immutable_head() { return false; }
void set_extra_data_len(uint64_t len) {
extra_data_len = len;
}
- int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+ virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
bufferlist& get_extra_data() { return extra_data_bl; }
};