return 0;
}
+static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data, off_t ofs,
+ MD5 *hash, bool need_to_wait)
+{
+ const unsigned char *data_ptr = (hash ? (const unsigned char *)data.c_str() : NULL);
+ bool again;
+ uint64_t len = data.length();
+
+ do {
+ void *handle;
+
+ int ret = processor->handle_data(data, ofs, &handle, &again);
+ if (ret < 0)
+ return ret;
+
+ if (hash) {
+ hash->Update(data_ptr, len);
+ hash = NULL; /* only calculate hash once */
+ }
+
+ ret = processor->throttle_data(handle, false);
+ if (ret < 0)
+ return ret;
+
+ need_to_wait = false; /* the need to wait only applies to the first iteration */
+ } while (again);
+
+ return 0;
+}
+
+
void RGWPutObj::execute()
{
RGWPutObjProcessor *processor = NULL;
if (!len)
break;
- void *handle;
- const unsigned char *data_ptr = (const unsigned char *)data.c_str();
-
- ret = processor->handle_data(data, ofs, &handle);
- if (ret < 0)
- goto done;
-
- if (need_calc_md5) {
- hash.Update(data_ptr, len);
- }
-
/* do we need this operation to be synchronous? if we're dealing with an object with immutable
* head, e.g., multipart object we need to make sure we're the first one writing to this object
*/
bool need_to_wait = (ofs == 0) && multipart;
- ret = processor->throttle_data(handle, need_to_wait);
+ ret = put_data_and_throttle(processor, data, ofs, (need_calc_md5 ? &hash : NULL), need_to_wait);
if (ret < 0) {
if (!need_to_wait || ret != -EEXIST) {
ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl;
goto done;
}
- ret = processor->handle_data(data, ofs, &handle);
+ ret = put_data_and_throttle(processor, data, ofs, NULL, false);
if (ret < 0) {
- ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl;
- goto done;
- }
-
- ret = processor->throttle_data(handle, false);
- if (ret < 0) {
- ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl;
goto done;
}
}
if (!len)
break;
- void *handle;
- const unsigned char *data_ptr = (const unsigned char *)data.c_str();
-
- ret = processor->handle_data(data, ofs, &handle);
- if (ret < 0)
- goto done;
-
- hash.Update(data_ptr, len);
-
- ret = processor->throttle_data(handle, false);
- if (ret < 0)
- goto done;
+ ret = put_data_and_throttle(processor, data, ofs, &hash, false);
ofs += len;
return 0;
}
-int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle)
+int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle, bool *again)
{
+ *again = false;
+
if (ofs != _ofs)
return -EINVAL;
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
-int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle)
+int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again)
{
+ *again = false;
+
*phandle = NULL;
if (extra_data_len) {
size_t extra_len = bl.length();
pending_data_bl.splice(0, max_write_size, &bl);
+ /* do we have enough data pending accumulated that needs to be written? */
+ *again = (pending_data_bl.length() >= max_chunk_size);
+
if (!data_ofs && !immutable_head()) {
first_chunk.claim(bl);
obj_len = (uint64_t)first_chunk.length();
int handle_data(bufferlist& bl, off_t ofs, off_t len) {
progress_cb(ofs, progress_data);
- void *handle;
- int ret = processor->handle_data(bl, ofs, &handle);
- if (ret < 0)
- return ret;
+ bool again;
- if (opstate) {
- /* need to update opstate repository with new state. This is ratelimited, so we're not
- * really doing it every time
- */
- ret = opstate->renew_state();
- if (ret < 0) {
- /* could not renew state! might have been marked as cancelled */
+ bool need_opstate = true;
+
+ do {
+ void *handle;
+ int ret = processor->handle_data(bl, ofs, &handle, &again);
+ if (ret < 0)
return ret;
+
+ if (need_opstate && opstate) {
+ /* need to update opstate repository with new state. This is ratelimited, so we're not
+ * really doing it every time
+ */
+ ret = opstate->renew_state();
+ if (ret < 0) {
+ /* could not renew state! might have been marked as cancelled */
+ return ret;
+ }
+
+ need_opstate = false;
}
- }
- ret = processor->throttle_data(handle, false);
- if (ret < 0)
- return ret;
+ ret = processor->throttle_data(handle, false);
+ if (ret < 0)
+ return ret;
+ } while (again);
return 0;
}
obj_ctx = _o;
return 0;
}
- virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0;
+ virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) = 0;
virtual int throttle_data(void *handle, bool need_to_wait) = 0;
virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
};
protected:
int prepare(RGWRados *store, void *obj_ctx, string *oid_rand);
- int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+ int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
int do_complete(string& etag, time_t *mtime, time_t set_mtime, map<string, bufferlist>& attrs);
public:
void set_extra_data_len(uint64_t len) {
extra_data_len = len;
}
- virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle);
+ virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again);
bufferlist& get_extra_data() { return extra_data_bl; }
};