From 062062479a19378608278f440a34ebb3d2606eea Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Fri, 25 Jul 2014 20:33:52 -0700 Subject: [PATCH] rgw: call processor->handle_data() again if needed Fixes: #8937 Following the fix to #8928 we end up accumulating pending data that needs to be written. Beforehand it was working fine because we were feeding it with the exact amount of bytes we were writing. Signed-off-by: Yehuda Sadeh (cherry picked from commit 0553890e79b43414cc0ef97ceb694c1cb5f06bbb) --- src/rgw/rgw_op.cc | 65 ++++++++++++++++++++++---------------------- src/rgw/rgw_rados.cc | 49 +++++++++++++++++++++------------ src/rgw/rgw_rados.h | 6 ++-- 3 files changed, 68 insertions(+), 52 deletions(-) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 02d7c504f73a7..0bc6f71725ba0 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -1564,6 +1564,36 @@ int RGWPutObj::user_manifest_iterate_cb(rgw_bucket& bucket, RGWObjEnt& ent, RGWA return 0; } +static int put_data_and_throttle(RGWPutObjProcessor *processor, bufferlist& data, off_t ofs, + MD5 *hash, bool need_to_wait) +{ + const unsigned char *data_ptr = (hash ? (const unsigned char *)data.c_str() : NULL); + bool again; + uint64_t len = data.length(); + + do { + void *handle; + + int ret = processor->handle_data(data, ofs, &handle, &again); + if (ret < 0) + return ret; + + if (hash) { + hash->Update(data_ptr, len); + hash = NULL; /* only calculate hash once */ + } + + ret = processor->throttle_data(handle, false); + if (ret < 0) + return ret; + + need_to_wait = false; /* the need to wait only applies to the first iteration */ + } while (again); + + return 0; +} + + void RGWPutObj::execute() { RGWPutObjProcessor *processor = NULL; @@ -1637,23 +1667,12 @@ void RGWPutObj::execute() if (!len) break; - void *handle; - const unsigned char *data_ptr = (const unsigned char *)data.c_str(); - - ret = processor->handle_data(data, ofs, &handle); - if (ret < 0) - goto done; - - if (need_calc_md5) { - hash.Update(data_ptr, len); - } - /* do we need this operation to be synchronous? if we're dealing with an object with immutable * head, e.g., multipart object we need to make sure we're the first one writing to this object */ bool need_to_wait = (ofs == 0) && multipart; - ret = processor->throttle_data(handle, need_to_wait); + ret = put_data_and_throttle(processor, data, ofs, (need_calc_md5 ? &hash : NULL), need_to_wait); if (ret < 0) { if (!need_to_wait || ret != -EEXIST) { ldout(s->cct, 20) << "processor->thottle_data() returned ret=" << ret << dendl; @@ -1678,15 +1697,8 @@ void RGWPutObj::execute() goto done; } - ret = processor->handle_data(data, ofs, &handle); + ret = put_data_and_throttle(processor, data, ofs, NULL, false); if (ret < 0) { - ldout(s->cct, 0) << "ERROR: processor->handle_data() returned " << ret << dendl; - goto done; - } - - ret = processor->throttle_data(handle, false); - if (ret < 0) { - ldout(s->cct, 0) << "ERROR: processor->throttle_data() returned " << ret << dendl; goto done; } } @@ -1850,18 +1862,7 @@ void RGWPostObj::execute() if (!len) break; - void *handle; - const unsigned char *data_ptr = (const unsigned char *)data.c_str(); - - ret = processor->handle_data(data, ofs, &handle); - if (ret < 0) - goto done; - - hash.Update(data_ptr, len); - - ret = processor->throttle_data(handle, false); - if (ret < 0) - goto done; + ret = put_data_and_throttle(processor, data, ofs, &hash, false); ofs += len; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index c5b558b22cbe7..65e5de9234609 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -900,8 +900,10 @@ int RGWPutObjProcessor_Plain::prepare(RGWRados *store, void *obj_ctx, string *oi return 0; } -int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle) +int RGWPutObjProcessor_Plain::handle_data(bufferlist& bl, off_t _ofs, void **phandle, bool *again) { + *again = false; + if (ofs != _ofs) return -EINVAL; @@ -1026,8 +1028,10 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive); } -int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) +int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) { + *again = false; + *phandle = NULL; if (extra_data_len) { size_t extra_len = bl.length(); @@ -1052,6 +1056,9 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha pending_data_bl.splice(0, max_write_size, &bl); + /* do we have enough data pending accumulated that needs to be written? */ + *again = (pending_data_bl.length() >= max_chunk_size); + if (!data_ofs && !immutable_head()) { first_chunk.claim(bl); obj_len = (uint64_t)first_chunk.length(); @@ -3023,25 +3030,33 @@ public: int handle_data(bufferlist& bl, off_t ofs, off_t len) { progress_cb(ofs, progress_data); - void *handle; - int ret = processor->handle_data(bl, ofs, &handle); - if (ret < 0) - return ret; + bool again; - if (opstate) { - /* need to update opstate repository with new state. This is ratelimited, so we're not - * really doing it every time - */ - ret = opstate->renew_state(); - if (ret < 0) { - /* could not renew state! might have been marked as cancelled */ + bool need_opstate = true; + + do { + void *handle; + int ret = processor->handle_data(bl, ofs, &handle, &again); + if (ret < 0) return ret; + + if (need_opstate && opstate) { + /* need to update opstate repository with new state. This is ratelimited, so we're not + * really doing it every time + */ + ret = opstate->renew_state(); + if (ret < 0) { + /* could not renew state! might have been marked as cancelled */ + return ret; + } + + need_opstate = false; } - } - ret = processor->throttle_data(handle, false); - if (ret < 0) - return ret; + ret = processor->throttle_data(handle, false); + if (ret < 0) + return ret; + } while (again); return 0; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 6b93b98f9b2e4..2792352b8e80e 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -556,7 +556,7 @@ public: obj_ctx = _o; return 0; } - virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle) = 0; + virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again) = 0; virtual int throttle_data(void *handle, bool need_to_wait) = 0; virtual int complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs); }; @@ -572,7 +572,7 @@ class RGWPutObjProcessor_Plain : public RGWPutObjProcessor protected: int prepare(RGWRados *store, void *obj_ctx, string *oid_rand); - int handle_data(bufferlist& bl, off_t ofs, void **phandle); + int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again); int do_complete(string& etag, time_t *mtime, time_t set_mtime, map& attrs); public: @@ -662,7 +662,7 @@ public: void set_extra_data_len(uint64_t len) { extra_data_len = len; } - virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle); + virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, bool *again); bufferlist& get_extra_data() { return extra_data_bl; } }; -- 2.39.5