From 5ad52aff9c76d18b6ae98b0d857c93d682dfacc4 Mon Sep 17 00:00:00 2001 From: Yehuda Sadeh Date: Tue, 21 Jun 2011 11:22:47 -0700 Subject: [PATCH] rgw: use adapting window for put obj --- src/rgw/rgw_access.h | 2 ++ src/rgw/rgw_op.cc | 57 ++++++++++++++++++++++++++++++++++---------- src/rgw/rgw_rados.cc | 5 ++++ src/rgw/rgw_rados.h | 1 + 4 files changed, 53 insertions(+), 12 deletions(-) diff --git a/src/rgw/rgw_access.h b/src/rgw/rgw_access.h index 95edf0e7c625a..93a1b0bb85435 100644 --- a/src/rgw/rgw_access.h +++ b/src/rgw/rgw_access.h @@ -69,6 +69,8 @@ public: } virtual int aio_wait(void *handle) { return -ENOTSUP; } + virtual bool aio_completed(void *handle) { return false; } + /** * Copy an object. * id: unused (well, it's passed to put_obj) diff --git a/src/rgw/rgw_op.cc b/src/rgw/rgw_op.cc index 508fc528058df..dd2a9b89e977f 100644 --- a/src/rgw/rgw_op.cc +++ b/src/rgw/rgw_op.cc @@ -437,27 +437,50 @@ struct put_obj_aio_info { void *handle; }; +static struct put_obj_aio_info pop_pending(std::list& pending) +{ + struct put_obj_aio_info info; + info = pending.front(); + pending.pop_front(); + return info; +} + +static int wait_pending_front(std::list& pending) +{ + struct put_obj_aio_info info = pop_pending(pending); + int ret = rgwstore->aio_wait(info.handle); + free(info.data); + return ret; +} + +static bool pending_has_completed(std::list& pending) +{ + if (pending.size() == 0) + return false; + + struct put_obj_aio_info& info = pending.front(); + return rgwstore->aio_completed(info.handle); +} + static int drain_pending(std::list& pending) { int ret = 0; while (!pending.empty()) { - struct put_obj_aio_info info = pending.front(); - int r = rgwstore->aio_wait(info.handle); - free(info.data); - if (r < 0) - ret = r; - - pending.pop_front(); + int r = wait_pending_front(pending); + if (r < 0) + ret = r; } return ret; } + void RGWPutObj::execute() { bool multipart; string multipart_meta_obj; string part_num; list pending; + size_t max_chunks = RGW_MAX_PENDING_CHUNKS; ret = -EINVAL; if (!s->object) { @@ -525,6 +548,7 @@ void RGWPutObj::execute() get_data(); if (len > 0) { struct put_obj_aio_info info; + size_t orig_size; // For the first call to put_obj_data, pass -1 as the offset to // do a write_full. void *handle; @@ -538,11 +562,20 @@ void RGWPutObj::execute() info.handle = handle; info.data = data; pending.push_back(info); - if (pending.size() > RGW_MAX_PENDING_CHUNKS) { - info = pending.front(); - pending.pop_front(); - ret = rgwstore->aio_wait(info.handle); - free(info.data); + orig_size = pending.size(); + while (pending_has_completed(pending)) { + ret = wait_pending_front(pending); + if (ret < 0) + goto done; + + } + + /* resize window in case messages are draining too fast */ + if (orig_size - pending.size() >= max_chunks) + max_chunks++; + + if (pending.size() > max_chunks) { + ret = wait_pending_front(pending); if (ret < 0) goto done; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index 1627b4baeef50..5a971c5f6f9ac 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -398,6 +398,11 @@ int RGWRados::aio_wait(void *handle) return ret; } +bool RGWRados::aio_completed(void *handle) +{ + AioCompletion *c = (AioCompletion *)handle; + return c->is_complete(); +} /** * Copy an object. * id: unused (well, it's passed to put_obj) diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index a6e2d5795d6dd..4ea23367905ba 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -51,6 +51,7 @@ public: virtual int aio_put_obj_data(std::string& id, rgw_obj& obj, const char *data, off_t ofs, size_t len, void **handle); virtual int aio_wait(void *handle); + virtual bool aio_completed(void *handle); virtual int clone_range(rgw_obj& dst_obj, off_t dst_ofs, rgw_obj& src_obj, off_t src_ofs, size_t size); /** Copy an object, with many extra options */ -- 2.39.5