]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: use AioThrottle for Object::Read::iterate() ops
authorCasey Bodley <cbodley@redhat.com>
Wed, 21 Nov 2018 20:17:06 +0000 (15:17 -0500)
committerCasey Bodley <cbodley@redhat.com>
Wed, 5 Dec 2018 16:16:54 +0000 (11:16 -0500)
Signed-off-by: Casey Bodley <cbodley@redhat.com>
src/rgw/rgw_rados.cc

index ebf215d423d403f52a12d50fed947490c60b77ed..34faff5dd1f8982144fabf9dc20fdc96c5234433 100644 (file)
@@ -6479,186 +6479,55 @@ int RGWRados::Object::Read::read(int64_t ofs, int64_t end, bufferlist& bl)
   return bl.length();
 }
 
-struct get_obj_data;
-
-struct get_obj_aio_data {
-  struct get_obj_data *op_data;
-  off_t ofs;
-  off_t len;
-};
-
-struct get_obj_io {
-  off_t len;
-  bufferlist bl;
-};
-
-static void _get_obj_aio_completion_cb(completion_t cb, void *arg);
-
-struct get_obj_data : public RefCountedObject {
-  CephContext *cct;
-  RGWRados *rados;
-  IoCtx io_ctx;
-  map<off_t, get_obj_io> io_map;
-  map<off_t, librados::AioCompletion *> completion_map;
-  uint64_t total_read;
-  Mutex lock;
-  Mutex data_lock;
-  list<get_obj_aio_data> aio_data;
-  RGWGetDataCB *client_cb;
-  std::atomic<bool> cancelled = { false };
-  std::atomic<int64_t> err_code = { 0 };
-  Throttle throttle;
-  list<bufferlist> read_list;
-
-  explicit get_obj_data(CephContext *_cct)
-    : cct(_cct),
-      rados(NULL),
-      total_read(0), lock("get_obj_data"), data_lock("get_obj_data::data_lock"),
-      client_cb(NULL),
-      throttle(cct, "get_obj_data", cct->_conf->rgw_get_obj_window_size, false) {}
-  ~get_obj_data() override { } 
-  void set_cancelled(int r) {
-    cancelled = true;
-    err_code = r;
-  }
-
-  bool is_cancelled() {
-    return cancelled;
-  }
-
-  int get_err_code() {
-    return err_code;
-  }
-
-  int wait_next_io(bool *done) {
-    lock.Lock();
-    map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
-    if (iter == completion_map.end()) {
-      *done = true;
-      lock.Unlock();
-      return 0;
+struct get_obj_data {
+  RGWRados* store;
+  RGWGetDataCB* client_cb;
+  rgw::Aio* aio;
+  uint64_t offset; // next offset to write to client
+  rgw::AioResultList completed; // completed read results, sorted by offset
+
+  get_obj_data(RGWRados* store, RGWGetDataCB* cb, rgw::Aio* aio, uint64_t offset)
+    : store(store), client_cb(cb), aio(aio), offset(offset) {}
+
+  int flush(rgw::AioResultList&& results) {
+    int r = rgw::check_for_errors(results);
+    if (r < 0) {
+      return r;
     }
-    off_t cur_ofs = iter->first;
-    librados::AioCompletion *c = iter->second;
-    lock.Unlock();
 
-    c->wait_for_safe_and_cb();
-    int r = c->get_return_value();
+    auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
+    results.sort(cmp); // merge() requires results to be sorted first
+    completed.merge(results, cmp); // merge results in sorted order
 
-    lock.Lock();
-    completion_map.erase(cur_ofs);
-
-    if (completion_map.empty()) {
-      *done = true;
-    }
-    lock.Unlock();
+    while (!completed.empty() && completed.front().id == offset) {
+      auto bl = std::move(completed.front().data);
+      completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
 
-    c->release();
-    
-    return r;
-  }
-
-  void add_io(off_t ofs, off_t len, bufferlist **pbl, AioCompletion **pc) {
-    Mutex::Locker l(lock);
-
-    const auto& io_iter = io_map.insert(
-      map<off_t, get_obj_io>::value_type(ofs, get_obj_io()));
-
-    ceph_assert(io_iter.second); // assert new insertion
-
-    get_obj_io& io = (io_iter.first)->second;
-    *pbl = &io.bl;
-
-    struct get_obj_aio_data aio;
-    aio.ofs = ofs;
-    aio.len = len;
-    aio.op_data = this;
-
-    aio_data.push_back(aio);
-
-    struct get_obj_aio_data *paio_data =  &aio_data.back(); /* last element */
-
-    librados::AioCompletion *c = librados::Rados::aio_create_completion((void *)paio_data, NULL, _get_obj_aio_completion_cb);
-    completion_map[ofs] = c;
-
-    *pc = c;
-
-    /* we have a reference per IO, plus one reference for the calling function.
-     * reference is dropped for each callback, plus when we're done iterating
-     * over the parts */
-    get();
-  }
-
-  void cancel_io(off_t ofs) {
-    ldout(cct, 20) << "get_obj_data::cancel_io() ofs=" << ofs << dendl;
-    lock.Lock();
-    map<off_t, AioCompletion *>::iterator iter = completion_map.find(ofs);
-    if (iter != completion_map.end()) {
-      AioCompletion *c = iter->second;
-      c->release();
-      completion_map.erase(ofs);
-      io_map.erase(ofs);
+      offset += bl.length();
+      int r = client_cb->handle_data(bl, 0, bl.length());
+      if (r < 0) {
+        return r;
+      }
     }
-    lock.Unlock();
-
-    /* we don't drop a reference here -- e.g., not calling d->put(), because we still
-     * need IoCtx to live, as io callback may still be called
-     */
+    return 0;
   }
 
-  void cancel_all_io() {
-    ldout(cct, 20) << "get_obj_data::cancel_all_io()" << dendl;
-    Mutex::Locker l(lock);
-    for (map<off_t, librados::AioCompletion *>::iterator iter = completion_map.begin();
-         iter != completion_map.end(); ++iter) {
-      librados::AioCompletion  *c = iter->second;
-      c->release();
-    }
+  void cancel() {
+    // wait for all completions to drain and ignore the results
+    aio->drain();
   }
 
-  int get_complete_ios(off_t ofs, list<bufferlist>& bl_list) {
-    Mutex::Locker l(lock);
-
-    map<off_t, get_obj_io>::iterator liter = io_map.begin();
-
-    if (liter == io_map.end() ||
-        liter->first != ofs) {
-      return 0;
-    }
-
-    map<off_t, librados::AioCompletion *>::iterator aiter;
-    aiter = completion_map.find(ofs);
-    if (aiter == completion_map.end()) {
-    /* completion map does not hold this io, it was cancelled */
-      return 0;
-    }
-
-    AioCompletion *completion = aiter->second;
-    int r = completion->get_return_value();
-    if (r < 0)
-      return r;
-
-    for (; aiter != completion_map.end(); ++aiter) {
-      completion = aiter->second;
-      if (!completion->is_safe()) {
-        /* reached a request that is not yet complete, stop */
-        break;
-      }
-
-      r = completion->get_return_value();
+  int drain() {
+    auto c = aio->wait();
+    while (!c.empty()) {
+      int r = flush(std::move(c));
       if (r < 0) {
-        set_cancelled(r); /* mark it as cancelled, so that we don't continue processing next operations */
+        cancel();
         return r;
       }
-
-      total_read += r;
-
-      map<off_t, get_obj_io>::iterator old_liter = liter++;
-      bl_list.push_back(old_liter->second.bl);
-      io_map.erase(old_liter);
+      c = aio->wait();
     }
-
-    return 0;
+    return flush(std::move(c));
   }
 };
 
@@ -6668,91 +6537,10 @@ static int _get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs,
 {
   struct get_obj_data *d = (struct get_obj_data *)arg;
 
-  return d->rados->get_obj_iterate_cb(read_obj, obj_ofs, read_ofs, len,
+  return d->store->get_obj_iterate_cb(read_obj, obj_ofs, read_ofs, len,
                                       is_head_obj, astate, arg);
 }
 
-static void _get_obj_aio_completion_cb(completion_t cb, void *arg)
-{
-  struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
-  struct get_obj_data *d = aio_data->op_data;
-
-  d->rados->get_obj_aio_completion_cb(cb, arg);
-}
-
-
-void RGWRados::get_obj_aio_completion_cb(completion_t c, void *arg)
-{
-  struct get_obj_aio_data *aio_data = (struct get_obj_aio_data *)arg;
-  struct get_obj_data *d = aio_data->op_data;
-  off_t ofs = aio_data->ofs;
-  off_t len = aio_data->len;
-
-  list<bufferlist> bl_list;
-  list<bufferlist>::iterator iter;
-  int r;
-
-  ldout(cct, 20) << "get_obj_aio_completion_cb: io completion ofs=" << ofs << " len=" << len << dendl;
-  d->throttle.put(len);
-
-  r = rados_aio_get_return_value(c);
-  if (r < 0) {
-    ldout(cct, 0) << "ERROR: got unexpected error when trying to read object: " << r << dendl;
-    d->set_cancelled(r);
-    goto done;
-  }
-
-  if (d->is_cancelled()) {
-    goto done;
-  }
-
-  d->data_lock.Lock();
-
-  r = d->get_complete_ios(ofs, bl_list);
-  if (r < 0) {
-    goto done_unlock;
-  }
-
-  d->read_list.splice(d->read_list.end(), bl_list);
-
-done_unlock:
-  d->data_lock.Unlock();
-done:
-  d->put();
-  return;
-}
-
-int RGWRados::flush_read_list(struct get_obj_data *d)
-{
-  d->data_lock.Lock();
-  list<bufferlist> l;
-  l.swap(d->read_list);
-  d->get();
-  d->read_list.clear();
-
-  d->data_lock.Unlock();
-
-  int r = 0;
-
-  list<bufferlist>::iterator iter;
-  for (iter = l.begin(); iter != l.end(); ++iter) {
-    bufferlist& bl = *iter;
-    r = d->client_cb->handle_data(bl, 0, bl.length());
-    if (r < 0) {
-      dout(0) << "ERROR: flush_read_list(): d->client_cb->handle_data() returned " << r << dendl;
-      break;
-    }
-  }
-
-  d->data_lock.Lock();
-  d->put();
-  if (r < 0) {
-    d->set_cancelled(r);
-  }
-  d->data_lock.Unlock();
-  return r;
-}
-
 int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs,
                                  off_t read_ofs, off_t len, bool is_head_obj,
                                  RGWObjState *astate, void *arg)
@@ -6760,14 +6548,10 @@ int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs,
   ObjectReadOperation op;
   struct get_obj_data *d = (struct get_obj_data *)arg;
   string oid, key;
-  bufferlist *pbl;
-  AioCompletion *c;
-
-  int r;
 
   if (is_head_obj) {
     /* only when reading from the head object do we need to do the atomic test */
-    r = append_atomic_test(astate, op);
+    int r = append_atomic_test(astate, op);
     if (r < 0)
       return r;
 
@@ -6779,11 +6563,8 @@ int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs,
       if (r < 0)
         return r;
 
-      d->lock.Lock();
-      d->total_read += chunk_len;
-      d->lock.Unlock();
-       
       len -= chunk_len;
+      d->offset += chunk_len;
       read_ofs += chunk_len;
       obj_ofs += chunk_len;
       if (!len)
@@ -6791,81 +6572,44 @@ int RGWRados::get_obj_iterate_cb(const rgw_raw_obj& read_obj, off_t obj_ofs,
     }
   }
 
-  d->throttle.get(len);
-  if (d->is_cancelled()) {
-    return d->get_err_code();
-  }
-
-  /* add io after we check that we're not cancelled, otherwise we're going to have trouble
-   * cleaning up
-   */
-  d->add_io(obj_ofs, len, &pbl, &c);
-
-  ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
-  op.read(read_ofs, len, pbl, NULL);
-
-  librados::IoCtx io_ctx(d->io_ctx);
-  io_ctx.locator_set_key(read_obj.loc);
-
-  r = io_ctx.aio_operate(read_obj.oid, c, &op, NULL);
+  auto obj = d->store->svc.rados->obj(read_obj);
+  int r = obj.open();
   if (r < 0) {
-    ldout(cct, 0) << "rados->aio_operate r=" << r << dendl;
-    goto done_err;
+    ldout(cct, 4) << "failed to open rados context for " << read_obj << dendl;
+    return r;
   }
 
-  // Flush data to client if there is any
-  r = flush_read_list(d);
-  if (r < 0)
-    return r;
+  ldout(cct, 20) << "rados->get_obj_iterate_cb oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
+  op.read(read_ofs, len, nullptr, nullptr);
 
-  return 0;
+  const uint64_t cost = len;
+  const uint64_t id = obj_ofs; // use logical object offset for sorting replies
 
-done_err:
-  ldout(cct, 20) << "cancelling io r=" << r << " obj_ofs=" << obj_ofs << dendl;
-  d->set_cancelled(r);
-  d->cancel_io(obj_ofs);
+  auto completed = d->aio->submit(obj, read_obj, &op, cost, id);
 
-  return r;
+  return d->flush(std::move(completed));
 }
 
 int RGWRados::Object::Read::iterate(int64_t ofs, int64_t end, RGWGetDataCB *cb)
 {
   RGWRados *store = source->get_store();
   CephContext *cct = store->ctx();
-
-  struct get_obj_data *data = new get_obj_data(cct);
-  bool done = false;
-
   RGWObjectCtx& obj_ctx = source->get_ctx();
+  const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size;
+  const uint64_t window_size = cct->_conf->rgw_get_obj_window_size;
 
-  data->rados = store;
-  data->io_ctx.dup(state.io_ctx);
-  data->client_cb = cb;
+  rgw::AioThrottle aio(window_size);
+  get_obj_data data(store, cb, &aio, ofs);
 
-  int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj, ofs, end, cct->_conf->rgw_get_obj_max_req_size, _get_obj_iterate_cb, (void *)data);
+  int r = store->iterate_obj(obj_ctx, source->get_bucket_info(), state.obj,
+                             ofs, end, chunk_size, _get_obj_iterate_cb, &data);
   if (r < 0) {
-    data->cancel_all_io();
-    goto done;
-  }
-
-  while (!done) {
-    r = data->wait_next_io(&done);
-    if (r < 0) {
-      dout(10) << __func__ << " r=" << r << ", canceling all io" << dendl;
-      data->cancel_all_io();
-      break;
-    }
-    r = store->flush_read_list(data);
-    if (r < 0) {
-      dout(10) << __func__ << " r=" << r << ", canceling all io" << dendl;
-      data->cancel_all_io();
-      break;
-    }
+    ldout(cct, 0) << "iterate_obj() failed with " << r << dendl;
+    data.cancel(); // drain completions without writing back to client
+    return r;
   }
 
-done:
-  data->put();
-  return r;
+  return data.drain();
 }
 
 int RGWRados::iterate_obj(RGWObjectCtx& obj_ctx,