void *handle;
};
+static struct put_obj_aio_info pop_pending(std::list<struct put_obj_aio_info>& pending)
+{
+ struct put_obj_aio_info info;
+ info = pending.front();
+ pending.pop_front();
+ return info;
+}
+
+static int wait_pending_front(std::list<struct put_obj_aio_info>& pending)
+{
+ struct put_obj_aio_info info = pop_pending(pending);
+ int ret = rgwstore->aio_wait(info.handle);
+ free(info.data);
+ return ret;
+}
+
+static bool pending_has_completed(std::list<struct put_obj_aio_info>& pending)
+{
+ if (pending.size() == 0)
+ return false;
+
+ struct put_obj_aio_info& info = pending.front();
+ return rgwstore->aio_completed(info.handle);
+}
+
static int drain_pending(std::list<struct put_obj_aio_info>& pending)
{
int ret = 0;
while (!pending.empty()) {
- struct put_obj_aio_info info = pending.front();
- int r = rgwstore->aio_wait(info.handle);
- free(info.data);
- if (r < 0)
- ret = r;
-
- pending.pop_front();
+ int r = wait_pending_front(pending);
+ if (r < 0)
+ ret = r;
}
return ret;
}
+
void RGWPutObj::execute()
{
bool multipart;
string multipart_meta_obj;
string part_num;
list<struct put_obj_aio_info> pending;
+ size_t max_chunks = RGW_MAX_PENDING_CHUNKS;
ret = -EINVAL;
if (!s->object) {
get_data();
if (len > 0) {
struct put_obj_aio_info info;
+ size_t orig_size;
// For the first call to put_obj_data, pass -1 as the offset to
// do a write_full.
void *handle;
info.handle = handle;
info.data = data;
pending.push_back(info);
- if (pending.size() > RGW_MAX_PENDING_CHUNKS) {
- info = pending.front();
- pending.pop_front();
- ret = rgwstore->aio_wait(info.handle);
- free(info.data);
+ orig_size = pending.size();
+ while (pending_has_completed(pending)) {
+ ret = wait_pending_front(pending);
+ if (ret < 0)
+ goto done;
+
+ }
+
+ /* resize window in case messages are draining too fast */
+ if (orig_size - pending.size() >= max_chunks)
+ max_chunks++;
+
+ if (pending.size() > max_chunks) {
+ ret = wait_pending_front(pending);
if (ret < 0)
goto done;
}
virtual int aio_put_obj_data(std::string& id, rgw_obj& obj, const char *data,
off_t ofs, size_t len, void **handle);
virtual int aio_wait(void *handle);
+ virtual bool aio_completed(void *handle);
virtual int clone_range(rgw_obj& dst_obj, off_t dst_ofs,
rgw_obj& src_obj, off_t src_ofs, size_t size);
/** Copy an object, with many extra options */