virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) override {
return next->handle_data(bl, ofs, phandle, pobj, again);
}
- virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) override {
- return next->throttle_data(handle, obj, need_to_wait);
+ virtual int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) override {
+ return next->throttle_data(handle, obj, size, need_to_wait);
}
}; /* RGWPutObj_Filter */
void *handle;
rgw_obj obj;
+ uint64_t size = data.length();
+
int ret = processor->handle_data(data, ofs, &handle, &obj, &again);
if (ret < 0)
return ret;
- ret = processor->throttle_data(handle, obj, need_to_wait);
+ ret = processor->throttle_data(handle, obj, size, need_to_wait);
if (ret < 0)
return ret;
struct put_obj_aio_info info;
info = pending.front();
pending.pop_front();
+ pending_size -= info.size;
return info;
}
return ret;
}
-int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait)
+int RGWPutObjProcessor_Aio::throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait)
{
bool _wait = need_to_wait;
struct put_obj_aio_info info;
info.handle = handle;
info.obj = obj;
+ info.size = size;
+ pending_size += size;
pending.push_back(info);
}
- size_t orig_size = pending.size();
+ size_t orig_size = pending_size;
/* first drain complete IOs */
while (pending_has_completed()) {
}
/* resize window in case messages are draining too fast */
- if (orig_size - pending.size() >= max_chunks) {
- max_chunks++;
+ if (orig_size - pending_size >= window_size) {
+ window_size += store->ctx()->_conf->rgw_max_chunk_size;
+ uint64_t max_window_size = store->ctx()->_conf->rgw_put_obj_max_window_size;
+ if (window_size > max_window_size) {
+ window_size = max_window_size;
+ }
}
/* now throttle. Note that need_to_wait should only affect the first IO operation */
- if (pending.size() > max_chunks || _wait) {
+ if (pending_size > window_size || _wait) {
int r = wait_pending_front();
if (r < 0)
return r;
return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle, exclusive);
}
+int RGWPutObjProcessor_Aio::prepare(RGWRados *store, string *oid_rand)
+{
+ RGWPutObjProcessor::prepare(store, oid_rand);
+
+ window_size = store->ctx()->_conf->rgw_put_obj_min_window_size;
+
+ return 0;
+}
+
int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again)
{
*phandle = NULL;
int RGWPutObjProcessor_Atomic::prepare_init(RGWRados *store, string *oid_rand)
{
- RGWPutObjProcessor::prepare(store, oid_rand);
+ RGWPutObjProcessor_Aio::prepare(store, oid_rand);
int r = store->get_max_chunk_size(bucket, &max_chunk_size);
if (r < 0) {
}
bufferlist bl;
pending_data_bl.splice(0, max_write_size, &bl);
+ uint64_t write_len = bl.length();
int r = write_data(bl, data_ofs, &handle, &obj, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: write_data() returned " << r << dendl;
return r;
}
- data_ofs += bl.length();
- r = throttle_data(handle, obj, false);
+ data_ofs += write_len;
+ r = throttle_data(handle, obj, write_len, false);
if (r < 0) {
ldout(store->ctx(), 0) << "ERROR: throttle_data() returned " << r << dendl;
return r;
do {
void *handle = NULL;
rgw_obj obj;
+ uint64_t size = bl.length();
int ret = filter->handle_data(bl, ofs, &handle, &obj, &again);
if (ret < 0)
return ret;
ret = opstate->renew_state();
if (ret < 0) {
ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): failed to renew op state ret=" << ret << dendl;
- int r = filter->throttle_data(handle, obj, false);
+ int r = filter->throttle_data(handle, obj, size, false);
if (r < 0) {
ldout(cct, 0) << "ERROR: RGWRadosPutObj::handle_data(): processor->throttle_data() returned " << r << dendl;
}
need_opstate = false;
}
- ret = filter->throttle_data(handle, obj, false);
+ ret = filter->throttle_data(handle, obj, size, false);
if (ret < 0)
return ret;
} while (again);
if (ret < 0) {
return ret;
}
- ret = processor.throttle_data(handle, obj, false);
+ ret = processor.throttle_data(handle, obj, end - ofs + 1, false);
if (ret < 0)
return ret;
} while (again);
RGWPutObjDataProcessor(){}
virtual ~RGWPutObjDataProcessor(){}
virtual int handle_data(bufferlist& bl, off_t ofs, void **phandle, rgw_obj *pobj, bool *again) = 0;
- virtual int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait) = 0;
+ virtual int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait) = 0;
}; /* RGWPutObjDataProcessor */
struct put_obj_aio_info {
void *handle;
rgw_obj obj;
+ uint64_t size;
};
+#define RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT (16 * 1024 * 1024)
+
class RGWPutObjProcessor_Aio : public RGWPutObjProcessor
{
list<struct put_obj_aio_info> pending;
- size_t max_chunks;
+ uint64_t window_size{RGW_PUT_OBJ_MIN_WINDOW_SIZE_DEFAULT};
+ uint64_t pending_size{0};
struct put_obj_aio_info pop_pending();
int wait_pending_front();
rgw_obj last_written_obj;
protected:
- uint64_t obj_len;
+ uint64_t obj_len{0};
set<rgw_obj> written_objs;
int handle_obj_data(rgw_obj& obj, bufferlist& bl, off_t ofs, off_t abs_ofs, void **phandle, bool exclusive);
public:
- int throttle_data(void *handle, const rgw_obj& obj, bool need_to_wait);
+ int prepare(RGWRados *store, string *oid_rand);
+ int throttle_data(void *handle, const rgw_obj& obj, uint64_t size, bool need_to_wait);
- RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info), max_chunks(RGW_MAX_PENDING_CHUNKS), obj_len(0) {}
+ RGWPutObjProcessor_Aio(RGWObjectCtx& obj_ctx, RGWBucketInfo& bucket_info) : RGWPutObjProcessor(obj_ctx, bucket_info) {}
virtual ~RGWPutObjProcessor_Aio();
}; /* RGWPutObjProcessor_Aio */