From 046769848d4321f4ea4d2f523e6b452520689f74 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Wed, 21 Nov 2018 15:17:06 -0500 Subject: [PATCH] rgw: use AioThrottle for Object::Read::iterate() ops Signed-off-by: Casey Bodley --- src/rgw/rgw_rados.cc | 374 +++++++------------------------------------ 1 file changed, 59 insertions(+), 315 deletions(-) diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index ebf215d423d40..34faff5dd1f89 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -6479,186 +6479,55 @@ int RGWRados::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl) return bl.length(); } -struct get_obj_data; - -struct get_obj_aio_data { - struct get_obj_data *op_data; - off_t ofs; - off_t len; -}; - -struct get_obj_io { - off_t len; - bufferlist bl; -}; - -static void _get_obj_aio_completion_cb(completion_t cb, void *arg); - -struct get_obj_data : public RefCountedObject { - CephContext *cct; - RGWRados *rados; - IoCtx io_ctx; - map io_map; - map completion_map; - uint64_t total_read; - Mutex lock; - Mutex data_lock; - list aio_data; - RGWGetDataCB *client_cb; - std::atomic cancelled = { false }; - std::atomic err_code = { 0 }; - Throttle throttle; - list read_list; - - explicit get_obj_data(CephContext *_cct) - : cct(_cct), - rados(NULL), - total_read(0), lock("get_obj_data"), data_lock("get_obj_data::data_lock"), - client_cb(NULL), - throttle(cct, "get_obj_data", cct->_conf->rgw_get_obj_window_size, false) {} - ~get_obj_data() override { } - void set_cancelled(int r) { - cancelled = true; - err_code = r; - } - - bool is_cancelled() { - return cancelled; - } - - int get_err_code() { - return err_code; - } - - int wait_next_io(bool *done) { - lock.Lock(); - map::iterator iter = completion_map.begin(); - if (iter == completion_map.end()) { - *done = true; - lock.Unlock(); - return 0; +struct get_obj_data { + RGWRados* store; + RGWGetDataCB* client_cb; + rgw::Aio* aio; + uint64_t offset; // next offset to write to client + rgw::AioResultList completed; // completed read results, sorted by offset + + get_obj_data(RGWRados* store, RGWGetDataCB* cb, rgw::Aio* aio, uint64_t offset) + : store(store), client_cb(cb), aio(aio), offset(offset) {} + + int flush(rgw::AioResultList&& results) { + int r = rgw::check_for_errors(results); + if (r < 0) { + return r; } - off_t cur_ofs = iter->first; - librados::AioCompletion *c = iter->second; - lock.Unlock(); - c->wait_for_safe_and_cb(); - int r = c->get_return_value(); + auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; }; + results.sort(cmp); // merge() requires results to be sorted first + completed.merge(results, cmp); // merge results in sorted order - lock.Lock(); - completion_map.erase(cur_ofs); - - if (completion_map.empty()) { - *done = true; - } - lock.Unlock(); + while (!completed.empty() && completed.front().id == offset) { + auto bl = std::move(completed.front().data); + completed.pop_front_and_dispose(std::default_delete{}); - c->release(); - - return r; - } - - void add_io(off_t ofs, off_t len, bufferlist **pbl, AioCompletion **pc) { - Mutex::Locker l(lock); - - const auto& io_iter = io_map.insert( - map::value_type(ofs, get_obj_io())); - - ceph_assert(io_iter.second); // assert new insertion - - get_obj_io& io = (io_iter.first)->second; - *pbl = &io.bl; - - struct get_obj_aio_data aio; - aio.ofs = ofs; - aio.len = len; - aio.op_data = this; - - aio_data.push_back(aio); - - struct get_obj_aio_data *paio_data = &aio_data.back(); /* last element */ - - librados::AioCompletion *c = librados::Rados::aio_create_completion((void *)paio_data, NULL, _get_obj_aio_completion_cb); - completion_map[ofs] = c; - - *pc = c; - - /* we have a reference per IO, plus one reference for the calling function. - * reference is dropped for each callback, plus when we're done iterating - * over the parts */ - get(); - } - - void cancel_io(off_t ofs) { - ldout(cct, 20) << "get_obj_data::cancel_io() ofs=" << ofs << dendl; - lock.Lock(); - map::iterator iter = completion_map.find(ofs); - if (iter != completion_map.end()) { - AioCompletion *c = iter->second; - c->release(); - completion_map.erase(ofs); - io_map.erase(ofs); + offset += bl.length(); + int r = client_cb->handle_data(bl, 0, bl.length()); + if (r < 0) { + return r; + } } - lock.Unlock(); - - /* we don't drop a reference here -- e.g., not calling d->put(), because we still - * need IoCtx to live, as io callback may still be called - */ + return 0; } - void cancel_all_io() { - ldout(cct, 20) << "get_obj_data::cancel_all_io()" << dendl; - Mutex::Locker l(lock); - for (map::iterator iter = completion_map.begin(); - iter != completion_map.end(); ++iter) { - librados::AioCompletion *c = iter->second; - c->release(); - } + void cancel() { + // wait for all completions to drain and ignore the results + aio->drain(); } - int get_complete_ios(off_t ofs, list& bl_list) { - Mutex::Locker l(lock); - - map::iterator liter = io_map.begin(); - - if (liter == io_map.end() || - liter->first != ofs) { - return 0; - } - - map::iterator aiter; - aiter = completion_map.find(ofs); - if (aiter == completion_map.end()) { - /* completion map does not hold this io, it was cancelled */ - return 0; - } - - AioCompletion *completion = aiter->second; - int r = completion->get_return_value(); - if (r < 0) - return r; - - for (; aiter != completion_map.end(); ++aiter) { - completion = aiter->second; - if (!completion->is_safe()) { - /* reached a request that is not yet complete, stop */ - break; - } - - r = completion->get_return_value(); + int drain() { + auto c = aio->wait(); + while (!c.empty()) { + int r = flush(std::move(c)); if (r < 0) { - set_cancelled(r); /* mark it as cancelled, so that we don't continue processing next operations */ + cancel(); return r; } - - total_read += r; - - map::iterator old_liter = liter++; - bl_list.push_back(old_liter->second.bl); - io_map.erase(old_liter); + c = aio->wait(); } - - return 0; + return flush(std::move(c)); } }; @@ -6668,91 +6537,10 @@ static int _get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs, { struct get_obj_data *d = (struct get_obj_data *)arg; - return d->rados->get_obj_iterate_cb(read_obj, obj_ofs, read_ofs, len, + return d->store->get_obj_iterate_cb(read_obj, obj_ofs, read_ofs, len, is_head_obj, astate, arg); } -static void _get_obj_aio_completion_cb(completion_t cb, void *arg) -{ - struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg; - struct get_obj_data *d = aio_data->op_data; - - d->rados->get_obj_aio_completion_cb(cb, arg); -} - - -void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg) -{ - struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg; - struct get_obj_data *d = aio_data->op_data; - off_t ofs = aio_data->ofs; - off_t len = aio_data->len; - - list bl_list; - list::iterator iter; - int r; - - ldout(cct, 20) << "get_obj_aio_completion_cb: io completion ofs=" << ofs << " len=" << len << dendl; - d->throttle.put(len); - - r = rados_aio_get_return_value(c); - if (r < 0) { - ldout(cct, 0) << "ERROR: got unexpected error when trying to read object: " << r << dendl; - d->set_cancelled(r); - goto done; - } - - if (d->is_cancelled()) { - goto done; - } - - d->data_lock.Lock(); - - r = d->get_complete_ios(ofs, bl_list); - if (r < 0) { - goto done_unlock; - } - - d->read_list.splice(d->read_list.end(), bl_list); - -done_unlock: - d->data_lock.Unlock(); -done: - d->put(); - return; -} - -int RGWRados::flush_read_list(struct get_obj_data *d) -{ - d->data_lock.Lock(); - list l; - l.swap(d->read_list); - d->get(); - d->read_list.clear(); - - d->data_lock.Unlock(); - - int r = 0; - - list::iterator iter; - for (iter = l.begin(); iter != l.end(); ++iter) { - bufferlist& bl = *iter; - r = d->client_cb->handle_data(bl, 0, bl.length()); - if (r < 0) { - dout(0) << "ERROR: flush_read_list(): d->client_cb->handle_data() returned " << r << dendl; - break; - } - } - - d->data_lock.Lock(); - d->put(); - if (r < 0) { - d->set_cancelled(r); - } - d->data_lock.Unlock(); - return r; -} - int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs, off_t read_ofs, off_t len, bool is_head_obj, RGWObjState *astate, void *arg) @@ -6760,14 +6548,10 @@ int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs, ObjectReadOperation op; struct get_obj_data *d = (struct get_obj_data *)arg; string oid, key; - bufferlist *pbl; - AioCompletion *c; - - int r; if (is_head_obj) { /* only when reading from the head object do we need to do the atomic test */ - r = append_atomic_test(astate, op); + int r = append_atomic_test(astate, op); if (r < 0) return r; @@ -6779,11 +6563,8 @@ int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs, if (r < 0) return r; - d->lock.Lock(); - d->total_read += chunk_len; - d->lock.Unlock(); - len -= chunk_len; + d->offset += chunk_len; read_ofs += chunk_len; obj_ofs += chunk_len; if (!len) @@ -6791,81 +6572,44 @@ int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs, } } - d->throttle.get(len); - if (d->is_cancelled()) { - return d->get_err_code(); - } - - /* add io after we check that we're not cancelled, otherwise we're going to have trouble - * cleaning up - */ - d->add_io(obj_ofs, len, &pbl, &c); - - ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl; - op.read(read_ofs, len, pbl, NULL); - - librados::IoCtx io_ctx(d->io_ctx); - io_ctx.locator_set_key(read_obj.loc); - - r = io_ctx.aio_operate(read_obj.oid, c, &op, NULL); + auto obj = d->store->svc.rados->obj(read_obj); + int r = obj.open(); if (r < 0) { - ldout(cct, 0) << "rados->aio_operate r=" << r << dendl; - goto done_err; + ldout(cct, 4) << "failed to open rados context for " << read_obj << dendl; + return r; } - // Flush data to client if there is any - r = flush_read_list(d); - if (r < 0) - return r; + ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl; + op.read(read_ofs, len, nullptr, nullptr); - return 0; + const uint64_t cost = len; + const uint64_t id = obj_ofs; // use logical object offset for sorting replies -done_err: - ldout(cct, 20) << "cancelling io r=" << r << " obj_ofs=" << obj_ofs << dendl; - d->set_cancelled(r); - d->cancel_io(obj_ofs); + auto completed = d->aio->submit(obj, read_obj, &op, cost, id); - return r; + return d->flush(std::move(completed)); } int RGWRados::Object::Read::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb) { RGWRados *store = source->get_store(); CephContext *cct = store->ctx(); - - struct get_obj_data *data = new get_obj_data(cct); - bool done = false; - RGWObjectCtx& obj_ctx = source->get_ctx(); + const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size; + const uint64_t window_size = cct->_conf->rgw_get_obj_window_size; - data->rados = store; - data->io_ctx.dup(state.io_ctx); - data->client_cb = cb; + rgw::AioThrottle aio(window_size); + get_obj_data data(store, cb, &aio, ofs); - int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data); + int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj, + ofs, end, chunk_size, _get_obj_iterate_cb, &data); if (r < 0) { - data->cancel_all_io(); - goto done; - } - - while (!done) { - r = data->wait_next_io(&done); - if (r < 0) { - dout(10) << __func__ << " r=" << r << ", canceling all io" << dendl; - data->cancel_all_io(); - break; - } - r = store->flush_read_list(data); - if (r < 0) { - dout(10) << __func__ << " r=" << r << ", canceling all io" << dendl; - data->cancel_all_io(); - break; - } + ldout(cct, 0) << "iterate_obj() failed with " << r << dendl; + data.cancel(); // drain completions without writing back to client + return r; } -done: - data->put(); - return r; + return data.drain(); } int RGWRados::iterate_obj(RGWObjectCtx& obj_ctx, -- 2.39.5