return bl.length();
}
-struct get_obj_data;
-
-struct get_obj_aio_data {
- struct get_obj_data *op_data;
- off_t ofs;
- off_t len;
-};
-
-struct get_obj_io {
- off_t len;
- bufferlist bl;
-};
-
-static void _get_obj_aio_completion_cb(completion_t cb, void *arg);
-
-struct get_obj_data : public RefCountedObject {
- CephContext *cct;
- RGWRados *rados;
- IoCtx io_ctx;
- map<off_t, get_obj_io> io_map;
- map<off_t, librados::AioCompletion *> completion_map;
- uint64_t total_read;
- Mutex lock;
- Mutex data_lock;
- list<get_obj_aio_data> aio_data;
- RGWGetDataCB *client_cb;
- std::atomic<bool> cancelled = { false };
- std::atomic<int64_t> err_code = { 0 };
- Throttle throttle;
- list<bufferlist> read_list;
-
- explicit get_obj_data(CephContext *_cct)
- : cct(_cct),
- rados(NULL),
- total_read(0), lock("get_obj_data"), data_lock("get_obj_data::data_lock"),
- client_cb(NULL),
- throttle(cct, "get_obj_data", cct->_conf->rgw_get_obj_window_size, false) {}
- ~get_obj_data() override { }
- void set_cancelled(int r) {
- cancelled = true;
- err_code = r;
- }
-
- bool is_cancelled() {
- return cancelled;
- }
-
- int get_err_code() {
- return err_code;
- }
-
- int wait_next_io(bool *done) {
- lock.Lock();
- map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
- if (iter == completion_map.end()) {
- *done = true;
- lock.Unlock();
- return 0;
+struct get_obj_data {
+ RGWRados* store;
+ RGWGetDataCB* client_cb;
+ rgw::Aio* aio;
+ uint64_t offset; // next offset to write to client
+ rgw::AioResultList completed; // completed read results, sorted by offset
+
+ get_obj_data(RGWRados* store, RGWGetDataCB* cb, rgw::Aio* aio, uint64_t offset)
+ : store(store), client_cb(cb), aio(aio), offset(offset) {}
+
+ int flush(rgw::AioResultList&& results) {
+ int r = rgw::check_for_errors(results);
+ if (r < 0) {
+ return r;
}
- off_t cur_ofs = iter->first;
- librados::AioCompletion *c = iter->second;
- lock.Unlock();
- c->wait_for_safe_and_cb();
- int r = c->get_return_value();
+ auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+ results.sort(cmp); // merge() requires results to be sorted first
+ completed.merge(results, cmp); // merge results in sorted order
- lock.Lock();
- completion_map.erase(cur_ofs);
-
- if (completion_map.empty()) {
- *done = true;
- }
- lock.Unlock();
+ while (!completed.empty() && completed.front().id == offset) {
+ auto bl = std::move(completed.front().data);
+ completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
- c->release();
-
- return r;
- }
-
- void add_io(off_t ofs, off_t len, bufferlist **pbl, AioCompletion **pc) {
- Mutex::Locker l(lock);
-
- const auto& io_iter = io_map.insert(
- map<off_t, get_obj_io>::value_type(ofs, get_obj_io()));
-
- ceph_assert(io_iter.second); // assert new insertion
-
- get_obj_io& io = (io_iter.first)->second;
- *pbl = &io.bl;
-
- struct get_obj_aio_data aio;
- aio.ofs = ofs;
- aio.len = len;
- aio.op_data = this;
-
- aio_data.push_back(aio);
-
- struct get_obj_aio_data *paio_data = &aio_data.back(); /* last element */
-
- librados::AioCompletion *c = librados::Rados::aio_create_completion((void *)paio_data, NULL, _get_obj_aio_completion_cb);
- completion_map[ofs] = c;
-
- *pc = c;
-
- /* we have a reference per IO, plus one reference for the calling function.
- * reference is dropped for each callback, plus when we're done iterating
- * over the parts */
- get();
- }
-
- void cancel_io(off_t ofs) {
- ldout(cct, 20) << "get_obj_data::cancel_io() ofs=" << ofs << dendl;
- lock.Lock();
- map<off_t, AioCompletion *>::iterator iter = completion_map.find(ofs);
- if (iter != completion_map.end()) {
- AioCompletion *c = iter->second;
- c->release();
- completion_map.erase(ofs);
- io_map.erase(ofs);
+ offset += bl.length();
+ int r = client_cb->handle_data(bl, 0, bl.length());
+ if (r < 0) {
+ return r;
+ }
}
- lock.Unlock();
-
- /* we don't drop a reference here -- e.g., not calling d->put(), because we still
- * need IoCtx to live, as io callback may still be called
- */
+ return 0;
}
- void cancel_all_io() {
- ldout(cct, 20) << "get_obj_data::cancel_all_io()" << dendl;
- Mutex::Locker l(lock);
- for (map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
- iter != completion_map.end(); ++iter) {
- librados::AioCompletion *c = iter->second;
- c->release();
- }
+ void cancel() {
+ // wait for all completions to drain and ignore the results
+ aio->drain();
}
- int get_complete_ios(off_t ofs, list<bufferlist>& bl_list) {
- Mutex::Locker l(lock);
-
- map<off_t, get_obj_io>::iterator liter = io_map.begin();
-
- if (liter == io_map.end() ||
- liter->first != ofs) {
- return 0;
- }
-
- map<off_t, librados::AioCompletion *>::iterator aiter;
- aiter = completion_map.find(ofs);
- if (aiter == completion_map.end()) {
- /* completion map does not hold this io, it was cancelled */
- return 0;
- }
-
- AioCompletion *completion = aiter->second;
- int r = completion->get_return_value();
- if (r < 0)
- return r;
-
- for (; aiter != completion_map.end(); ++aiter) {
- completion = aiter->second;
- if (!completion->is_safe()) {
- /* reached a request that is not yet complete, stop */
- break;
- }
-
- r = completion->get_return_value();
+ int drain() {
+ auto c = aio->wait();
+ while (!c.empty()) {
+ int r = flush(std::move(c));
if (r < 0) {
- set_cancelled(r); /* mark it as cancelled, so that we don't continue processing next operations */
+ cancel();
return r;
}
-
- total_read += r;
-
- map<off_t, get_obj_io>::iterator old_liter = liter++;
- bl_list.push_back(old_liter->second.bl);
- io_map.erase(old_liter);
+ c = aio->wait();
}
-
- return 0;
+ return flush(std::move(c));
}
};
{
struct get_obj_data *d = (struct get_obj_data *)arg;
- return d->rados->get_obj_iterate_cb(read_obj, obj_ofs, read_ofs, len,
+ return d->store->get_obj_iterate_cb(read_obj, obj_ofs, read_ofs, len,
is_head_obj, astate, arg);
}
-static void _get_obj_aio_completion_cb(completion_t cb, void *arg)
-{
- struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
- struct get_obj_data *d = aio_data->op_data;
-
- d->rados->get_obj_aio_completion_cb(cb, arg);
-}
-
-
-void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg)
-{
- struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
- struct get_obj_data *d = aio_data->op_data;
- off_t ofs = aio_data->ofs;
- off_t len = aio_data->len;
-
- list<bufferlist> bl_list;
- list<bufferlist>::iterator iter;
- int r;
-
- ldout(cct, 20) << "get_obj_aio_completion_cb: io completion ofs=" << ofs << " len=" << len << dendl;
- d->throttle.put(len);
-
- r = rados_aio_get_return_value(c);
- if (r < 0) {
- ldout(cct, 0) << "ERROR: got unexpected error when trying to read object: " << r << dendl;
- d->set_cancelled(r);
- goto done;
- }
-
- if (d->is_cancelled()) {
- goto done;
- }
-
- d->data_lock.Lock();
-
- r = d->get_complete_ios(ofs, bl_list);
- if (r < 0) {
- goto done_unlock;
- }
-
- d->read_list.splice(d->read_list.end(), bl_list);
-
-done_unlock:
- d->data_lock.Unlock();
-done:
- d->put();
- return;
-}
-
-int RGWRados::flush_read_list(struct get_obj_data *d)
-{
- d->data_lock.Lock();
- list<bufferlist> l;
- l.swap(d->read_list);
- d->get();
- d->read_list.clear();
-
- d->data_lock.Unlock();
-
- int r = 0;
-
- list<bufferlist>::iterator iter;
- for (iter = l.begin(); iter != l.end(); ++iter) {
- bufferlist& bl = *iter;
- r = d->client_cb->handle_data(bl, 0, bl.length());
- if (r < 0) {
- dout(0) << "ERROR: flush_read_list(): d->client_cb->handle_data() returned " << r << dendl;
- break;
- }
- }
-
- d->data_lock.Lock();
- d->put();
- if (r < 0) {
- d->set_cancelled(r);
- }
- d->data_lock.Unlock();
- return r;
-}
-
int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs,
off_t read_ofs, off_t len, bool is_head_obj,
RGWObjState *astate, void *arg)
ObjectReadOperation op;
struct get_obj_data *d = (struct get_obj_data *)arg;
string oid, key;
- bufferlist *pbl;
- AioCompletion *c;
-
- int r;
if (is_head_obj) {
/* only when reading from the head object do we need to do the atomic test */
- r = append_atomic_test(astate, op);
+ int r = append_atomic_test(astate, op);
if (r < 0)
return r;
if (r < 0)
return r;
- d->lock.Lock();
- d->total_read += chunk_len;
- d->lock.Unlock();
-
len -= chunk_len;
+ d->offset += chunk_len;
read_ofs += chunk_len;
obj_ofs += chunk_len;
if (!len)
}
}
- d->throttle.get(len);
- if (d->is_cancelled()) {
- return d->get_err_code();
- }
-
- /* add io after we check that we're not cancelled, otherwise we're going to have trouble
- * cleaning up
- */
- d->add_io(obj_ofs, len, &pbl, &c);
-
- ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
- op.read(read_ofs, len, pbl, NULL);
-
- librados::IoCtx io_ctx(d->io_ctx);
- io_ctx.locator_set_key(read_obj.loc);
-
- r = io_ctx.aio_operate(read_obj.oid, c, &op, NULL);
+ auto obj = d->store->svc.rados->obj(read_obj);
+ int r = obj.open();
if (r < 0) {
- ldout(cct, 0) << "rados->aio_operate r=" << r << dendl;
- goto done_err;
+ ldout(cct, 4) << "failed to open rados context for " << read_obj << dendl;
+ return r;
}
- // Flush data to client if there is any
- r = flush_read_list(d);
- if (r < 0)
- return r;
+ ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
+ op.read(read_ofs, len, nullptr, nullptr);
- return 0;
+ const uint64_t cost = len;
+ const uint64_t id = obj_ofs; // use logical object offset for sorting replies
-done_err:
- ldout(cct, 20) << "cancelling io r=" << r << " obj_ofs=" << obj_ofs << dendl;
- d->set_cancelled(r);
- d->cancel_io(obj_ofs);
+ auto completed = d->aio->submit(obj, read_obj, &op, cost, id);
- return r;
+ return d->flush(std::move(completed));
}
int RGWRados::Object::Read::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb)
{
RGWRados *store = source->get_store();
CephContext *cct = store->ctx();
-
- struct get_obj_data *data = new get_obj_data(cct);
- bool done = false;
-
RGWObjectCtx& obj_ctx = source->get_ctx();
+ const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size;
+ const uint64_t window_size = cct->_conf->rgw_get_obj_window_size;
- data->rados = store;
- data->io_ctx.dup(state.io_ctx);
- data->client_cb = cb;
+ rgw::AioThrottle aio(window_size);
+ get_obj_data data(store, cb, &aio, ofs);
- int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data);
+ int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj,
+ ofs, end, chunk_size, _get_obj_iterate_cb, &data);
if (r < 0) {
- data->cancel_all_io();
- goto done;
- }
-
- while (!done) {
- r = data->wait_next_io(&done);
- if (r < 0) {
- dout(10) << __func__ << " r=" << r << ", canceling all io" << dendl;
- data->cancel_all_io();
- break;
- }
- r = store->flush_read_list(data);
- if (r < 0) {
- dout(10) << __func__ << " r=" << r << ", canceling all io" << dendl;
- data->cancel_all_io();
- break;
- }
+ ldout(cct, 0) << "iterate_obj() failed with " << r << dendl;
+ data.cancel(); // drain completions without writing back to client
+ return r;
}
-done:
- data->put();
- return r;
+ return data.drain();
}
int RGWRados::iterate_obj(RGWObjectCtx& obj_ctx,