From: Yehuda Sadeh Date: Fri, 20 Jan 2017 22:40:58 +0000 (-0800) Subject: rgw: configurable write obj window size X-Git-Tag: v12.0.1~490^2~2 X-Git-Url: http://git-server-git.apps.pok.os.sepia.ceph.com/?a=commitdiff_plain;h=66a82b4266acedfdd71c64394d68d9e50ed11b20;p=ceph.git rgw: configurable write obj window size Fixes: http://tracker.ceph.com/issues/18623 Signed-off-by: Yehuda Sadeh --- diff --git a/src/common/config_opts.h b/src/common/config_opts.h index 41cfa7dd4abc..d194d1b5bfe1 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -1366,6 +1366,8 @@ OPTION(nss_db_path, OPT_STR, "") // path to nss db OPTION(rgw_max_chunk_size, OPT_INT, 4 * 1024 * 1024) +OPTION(rgw_put_obj_min_window_size, OPT_INT, 16 * 1024 * 1024) +OPTION(rgw_put_obj_max_window_size, OPT_INT, 64 * 1024 * 1024) OPTION(rgw_max_put_size, OPT_U64, 5ULL*1024*1024*1024) /** diff --git a/src/rgw/rgw_common.h b/src/rgw/rgw_common.h index b852520e4c6e..e3acf3c2059e 100644 --- a/src/rgw/rgw_common.h +++ b/src/rgw/rgw_common.h @@ -117,8 +117,6 @@ using ceph::crypto::MD5; #define RGW_BUCKETS_OBJ_SUFFIX ".buckets" -#define RGW_MAX_PENDING_CHUNKS 16 - #define RGW_FORMAT_PLAIN 0 #define RGW_FORMAT_XML 1 #define RGW_FORMAT_JSON 2 diff --git a/src/rgw/rgw_op.h b/src/rgw/rgw_op.h index 44a693c10bb8..d1098c341e75 100644 --- a/src/rgw/rgw_op.h +++ b/src/rgw/rgw_op.h @@ -779,8 +779,8 @@ public: virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override { return next->handle_data(bl, ofs, phandle, pobj, again); } - virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) override { - return next->throttle_data(handle, obj, need_to_wait); + virtual int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) override { + return next->throttle_data(handle, obj, size, need_to_wait); } }; /* RGWPutObj_Filter */ @@ -1606,11 +1606,13 @@ static inline int put_data_and_throttle(RGWPutObjDataProcessor *processor, void *handle; rgw_obj obj; + uint64_t size = data.length(); + int ret = processor->handle_data(data, ofs, &handle, &obj, &again); if (ret < 0) return ret; - ret = processor->throttle_data(handle, obj, need_to_wait); + ret = processor->throttle_data(handle, obj, size, need_to_wait); if (ret < 0) return ret; diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 8757baf1c578..7c9f6b881f37 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -2287,6 +2287,7 @@ struct put_obj_aio_info RGWPutObjProcessor_Aio::pop_pending() struct put_obj_aio_info info; info = pending.front(); pending.pop_front(); + pending_size -= info.size; return info; } @@ -2325,7 +2326,7 @@ int RGWPutObjProcessor_Aio::drain_pending() return ret; } -int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) +int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) { bool _wait = need_to_wait; @@ -2333,9 +2334,11 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool struct put_obj_aio_info info; info.handle = handle; info.obj = obj; + info.size = size; + pending_size += size; pending.push_back(info); } - size_t orig_size = pending.size(); + size_t orig_size = pending_size; /* first drain complete IOs */ while (pending_has_completed()) { @@ -2347,12 +2350,16 @@ int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool } /* resize window in case messages are draining too fast */ - if (orig_size - pending.size() >= max_chunks) { - max_chunks++; + if (orig_size - pending_size >= window_size) { + window_size += store->ctx()->_conf->rgw_max_chunk_size; + uint64_t max_window_size = store->ctx()->_conf->rgw_put_obj_max_window_size; + if (window_size > max_window_size) { + window_size = max_window_size; + } } /* now throttle. Note that need_to_wait should only affect the first IO operation */ - if (pending.size() > max_chunks || _wait) { + if (pending_size > window_size || _wait) { int r = wait_pending_front(); if (r < 0) return r; @@ -2374,6 +2381,15 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive); } +int RGWPutObjProcessor_Aio::prepare(RGWRados *store, string *oid_rand) +{ + RGWPutObjProcessor::prepare(store, oid_rand); + + window_size = store->ctx()->_conf->rgw_put_obj_min_window_size; + + return 0; +} + int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) { *phandle = NULL; @@ -2415,7 +2431,7 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha int RGWPutObjProcessor_Atomic::prepare_init(RGWRados *store, string *oid_rand) { - RGWPutObjProcessor::prepare(store, oid_rand); + RGWPutObjProcessor_Aio::prepare(store, oid_rand); int r = store->get_max_chunk_size(bucket, &max_chunk_size); if (r < 0) { @@ -2492,13 +2508,14 @@ int RGWPutObjProcessor_Atomic::complete_writing_data() } bufferlist bl; pending_data_bl.splice(0, max_write_size, &bl); + uint64_t write_len = bl.length(); int r = write_data(bl, data_ofs, &handle, &obj, false); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl; return r; } - data_ofs += bl.length(); - r = throttle_data(handle, obj, false); + data_ofs += write_len; + r = throttle_data(handle, obj, write_len, false); if (r < 0) { ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl; return r; @@ -6713,6 +6730,7 @@ public: do { void *handle = NULL; rgw_obj obj; + uint64_t size = bl.length(); int ret = filter->handle_data(bl, ofs, &handle, &obj, &again); if (ret < 0) return ret; @@ -6724,7 +6742,7 @@ public: ret = opstate->renew_state(); if (ret < 0) { ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl; - int r = filter->throttle_data(handle, obj, false); + int r = filter->throttle_data(handle, obj, size, false); if (r < 0) { ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl; } @@ -6735,7 +6753,7 @@ public: need_opstate = false; } - ret = filter->throttle_data(handle, obj, false); + ret = filter->throttle_data(handle, obj, size, false); if (ret < 0) return ret; } while (again); @@ -7648,7 +7666,7 @@ int RGWRados::copy_obj_data(RGWObjectCtx& obj_ctx, if (ret < 0) { return ret; } - ret = processor.throttle_data(handle, obj, false); + ret = processor.throttle_data(handle, obj, end - ofs + 1, false); if (ret < 0) return ret; } while (again); diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 9d820ff7f00d..8cea3ad63e0c 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -3331,7 +3331,7 @@ public: RGWPutObjDataProcessor(){} virtual ~RGWPutObjDataProcessor(){} virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) = 0; - virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0; + virtual int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) = 0; }; /* RGWPutObjDataProcessor */ @@ -3374,12 +3374,16 @@ public: struct put_obj_aio_info { void *handle; rgw_obj obj; + uint64_t size; }; +#define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024) + class RGWPutObjProcessor_Aio : public RGWPutObjProcessor { list pending; - size_t max_chunks; + uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT}; + uint64_t pending_size{0}; struct put_obj_aio_info pop_pending(); int wait_pending_front(); @@ -3388,7 +3392,7 @@ class RGWPutObjProcessor_Aio : public RGWPutObjProcessor rgw_obj last_written_obj; protected: - uint64_t obj_len; + uint64_t obj_len{0}; set written_objs; @@ -3400,9 +3404,10 @@ protected: int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive); public: - int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait); + int prepare(RGWRados *store, string *oid_rand); + int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait); - RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {} + RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {} virtual ~RGWPutObjProcessor_Aio(); }; /* RGWPutObjProcessor_Aio */