]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Add AioCompletion* versions for the rest of the FIFO methods
authorAdam C. Emerson <aemerson@redhat.com>
Tue, 3 Nov 2020 21:02:26 +0000 (16:02 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 5 Apr 2021 17:36:42 +0000 (13:36 -0400)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
(cherry picked from commit 665573ab8905bfa2e1ede6fc3be9bc80a625cb49)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/cls_fifo_legacy.cc
src/rgw/cls_fifo_legacy.h
src/rgw/rgw_datalog.cc
src/test/rgw/test_cls_fifo_legacy.cc

index d835aeec76ab8806ca7f078641dc5d6c229a5e56..569a3e77c458f770d85bb018e245e4abfecb2e22 100644 (file)
@@ -109,6 +109,7 @@ int get_meta(lr::IoCtx& ioctx, const std::string& oid,
   return r;
 };
 
+namespace {
 void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv,
                 const fifo::update& update)
 {
@@ -175,6 +176,27 @@ int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
   return retval;
 }
 
+void push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
+              std::deque<cb::list> data_bufs, std::uint64_t tid,
+              lr::AioCompletion* c)
+{
+  lr::ObjectWriteOperation op;
+  fifo::op::push_part pp;
+
+  pp.tag = tag;
+  pp.data_bufs = data_bufs;
+  pp.total_len = 0;
+
+  for (const auto& bl : data_bufs)
+    pp.total_len += bl.length();
+
+  cb::list in;
+  encode(pp, in);
+  op.exec(fifo::op::CLASS, fifo::op::PUSH_PART, in);
+  auto r = ioctx.aio_operate(oid, c, &op, lr::OPERATION_RETURNVEC);
+  ceph_assert(r >= 0);
+}
+
 void trim_part(lr::ObjectWriteOperation* op,
               std::optional<std::string_view> tag,
               std::uint64_t ofs, bool exclusive)
@@ -232,6 +254,70 @@ int list_part(lr::IoCtx& ioctx, const std::string& oid,
   return r;
 }
 
+struct list_entry_completion : public lr::ObjectOperationCompletion {
+  CephContext* cct;
+  int* r_out;
+  std::vector<fifo::part_list_entry>* entries;
+  bool* more;
+  bool* full_part;
+  std::string* ptag;
+  std::uint64_t tid;
+
+  list_entry_completion(CephContext* cct, int* r_out, std::vector<fifo::part_list_entry>* entries,
+                       bool* more, bool* full_part, std::string* ptag,
+                       std::uint64_t tid)
+    : cct(cct), r_out(r_out), entries(entries), more(more),
+      full_part(full_part), ptag(ptag), tid(tid) {}
+  virtual ~list_entry_completion() = default;
+  void handle_completion(int r, bufferlist& bl) override {
+    if (r >= 0) try {
+       fifo::op::list_part_reply reply;
+       auto iter = bl.cbegin();
+       decode(reply, iter);
+       if (entries) *entries = std::move(reply.entries);
+       if (more) *more = reply.more;
+       if (full_part) *full_part = reply.full_part;
+       if (ptag) *ptag = reply.tag;
+      } catch (const cb::error& err) {
+       lderr(cct)
+         << __PRETTY_FUNCTION__ << ":" << __LINE__
+         << " decode failed: " << err.what()
+         << " tid=" << tid << dendl;
+       r = from_error_code(err.code());
+      } else if (r < 0) {
+      lderr(cct)
+       << __PRETTY_FUNCTION__ << ":" << __LINE__
+       << " fifo::op::LIST_PART failed r=" << r << " tid=" << tid
+       << dendl;
+    }
+    if (r_out) *r_out = r;
+  }
+};
+
+lr::ObjectReadOperation list_part(CephContext* cct,
+                                 std::optional<std::string_view> tag,
+                                 std::uint64_t ofs,
+                                 std::uint64_t max_entries,
+                                 int* r_out,
+                                 std::vector<fifo::part_list_entry>* entries,
+                                 bool* more, bool* full_part,
+                                 std::string* ptag, std::uint64_t tid)
+{
+  lr::ObjectReadOperation op;
+  fifo::op::list_part lp;
+
+  lp.tag = tag;
+  lp.ofs = ofs;
+  lp.max_entries = max_entries;
+
+  cb::list in;
+  encode(lp, in);
+  op.exec(fifo::op::CLASS, fifo::op::LIST_PART, in,
+         new list_entry_completion(cct, r_out, entries, more, full_part,
+                                   ptag, tid));
+  return op;
+}
+
 int get_part_info(lr::IoCtx& ioctx, const std::string& oid,
                  fifo::part_header* header,
                  std::uint64_t tid, optional_yield y)
@@ -264,29 +350,131 @@ int get_part_info(lr::IoCtx& ioctx, const std::string& oid,
   return r;
 }
 
-static void complete(lr::AioCompletion* c_, int r)
+struct partinfo_completion : public lr::ObjectOperationCompletion {
+  CephContext* cct;
+  int* rp;
+  fifo::part_header* h;
+  std::uint64_t tid;
+  partinfo_completion(CephContext* cct, int* rp, fifo::part_header* h,
+                     std::uint64_t tid) :
+    cct(cct), rp(rp), h(h), tid(tid) {
+  }
+  virtual ~partinfo_completion() = default;
+  void handle_completion(int r, bufferlist& bl) override {
+    if (r >= 0) try {
+       fifo::op::get_part_info_reply reply;
+       auto iter = bl.cbegin();
+       decode(reply, iter);
+       if (h) *h = std::move(reply.header);
+      } catch (const cb::error& err) {
+       r = from_error_code(err.code());
+       lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " decode failed: " << err.what()
+                  << " tid=" << tid << dendl;
+      } else {
+      lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                << " fifo::op::GET_PART_INFO failed r=" << r << " tid=" << tid
+                << dendl;
+    }
+    if (rp) {
+      *rp = r;
+    }
+  }
+};
+
+template<typename T>
+struct Completion {
+private:
+  lr::AioCompletion* _cur = nullptr;
+  lr::AioCompletion* _super;
+public:
+
+  using Ptr = std::unique_ptr<T>;
+
+  lr::AioCompletion* cur() const {
+    return _cur;
+  }
+  lr::AioCompletion* super() const {
+    return _super;
+  }
+
+  Completion(lr::AioCompletion* super) : _super(super) {
+    super->pc->get();
+  }
+
+  ~Completion() {
+    if (_super) {
+      _super->pc->put();
+    }
+    if (_cur)
+      _cur->release();
+    _super = nullptr;
+    _cur = nullptr;
+  }
+
+  // The only times that aio_operate can return an error are:
+  // 1. The completion contains a null pointer. This should just
+  //    crash, and in our case it does.
+  // 2. An attempt is made to write to a snapshot. RGW doesn't use
+  //    snapshots, so we don't care.
+  //
+  // So we will just assert that initiating an Aio operation succeeds
+  // and not worry about recovering.
+  static lr::AioCompletion* call(Ptr&& p) {
+    p->_cur = lr::Rados::aio_create_completion(static_cast<void*>(p.get()),
+                                              &cb);
+    auto c = p->_cur;
+    p.release();
+    return c;
+  }
+  static void complete(Ptr&& p, int r) {
+    auto c = p->_super->pc;
+    p->_super = nullptr;
+    c->lock.lock();
+    c->rval = r;
+    c->complete = true;
+    c->lock.unlock();
+
+    auto cb_complete = c->callback_complete;
+    auto cb_complete_arg = c->callback_complete_arg;
+    if (cb_complete)
+      cb_complete(c, cb_complete_arg);
+
+    auto cb_safe = c->callback_safe;
+    auto cb_safe_arg = c->callback_safe_arg;
+    if (cb_safe)
+      cb_safe(c, cb_safe_arg);
+
+    c->lock.lock();
+    c->callback_complete = nullptr;
+    c->callback_safe = nullptr;
+    c->cond.notify_all();
+    c->put_unlock();
+  }
+
+  static void cb(lr::completion_t, void* arg) {
+    auto t = static_cast<T*>(arg);
+    auto r = t->_cur->get_return_value();
+    t->_cur->release();
+    t->_cur = nullptr;
+    t->handle(Ptr(t), r);
+  }
+};
+
+lr::ObjectReadOperation get_part_info(CephContext* cct,
+                                     fifo::part_header* header,
+                                     std::uint64_t tid, int* r = 0)
 {
-  auto c = c_->pc;
-  c->lock.lock();
-  c->rval = r;
-  c->complete = true;
-  c->lock.unlock();
-
-  auto cb_complete = c->callback_complete;
-  auto cb_complete_arg = c->callback_complete_arg;
-  if (cb_complete)
-    cb_complete(c, cb_complete_arg);
-
-  auto cb_safe = c->callback_safe;
-  auto cb_safe_arg = c->callback_safe_arg;
-  if (cb_safe)
-    cb_safe(c, cb_safe_arg);
-
-  c->lock.lock();
-  c->callback_complete = NULL;
-  c->callback_safe = NULL;
-  c->cond.notify_all();
-  c->put_unlock();
+  lr::ObjectReadOperation op;
+  fifo::op::get_part_info gpi;
+
+  cb::list in;
+  cb::list bl;
+  encode(gpi, in);
+  op.exec(fifo::op::CLASS, fifo::op::GET_PART_INFO, in,
+         new partinfo_completion(cct, r, header, tid));
+  return op;
+}
 }
 
 std::optional<marker> FIFO::to_marker(std::string_view s)
@@ -385,11 +573,8 @@ int FIFO::_update_meta(const fifo::update& update,
   return r;
 }
 
-struct Updater {
+struct Updater : public Completion<Updater> {
   FIFO* fifo;
-  lr::AioCompletion* super;
-  lr::AioCompletion* cur = lr::Rados::aio_create_completion(
-    static_cast<void*>(this), &FIFO::update_callback);
   fifo::update update;
   fifo::objv version;
   bool reread = false;
@@ -398,92 +583,74 @@ struct Updater {
   Updater(FIFO* fifo, lr::AioCompletion* super,
          const fifo::update& update, fifo::objv version,
          bool* pcanceled, std::uint64_t tid)
-    : fifo(fifo), super(super), update(update), version(version),
-      pcanceled(pcanceled), tid(tid) {
-    super->pc->get();
-  }
-  ~Updater() {
-    cur->release();
-  }
-};
-
-void FIFO::update_callback(lr::completion_t, void* arg)
-{
-  std::unique_ptr<Updater> updater(static_cast<Updater*>(arg));
-  auto cct = updater->fifo->cct;
-  auto tid = updater->tid;
-  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " entering: tid=" << tid << dendl;
-  if (!updater->reread) {
-    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " handling async update_meta: tid="
-                  << tid << dendl;
-    int r = updater->cur->get_return_value();
+    : Completion(super), fifo(fifo), update(update), version(version),
+      pcanceled(pcanceled) {}
+
+  void handle(Ptr&& p, int r) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " entering: tid=" << tid << dendl;
+    if (reread)
+      handle_reread(std::move(p), r);
+    else
+      handle_update(std::move(p), r);
+  }
+
+  void handle_update(Ptr&& p, int r) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " handling async update_meta: tid="
+                        << tid << dendl;
     if (r < 0 && r != -ECANCELED) {
-      lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+      lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " update failed: r=" << r << " tid=" << tid << dendl;
-      complete(updater->super, r);
+      complete(std::move(p), r);
       return;
     }
     bool canceled = (r == -ECANCELED);
     if (!canceled) {
-      int r = updater->fifo->apply_update(&updater->fifo->info,
-                                         updater->version,
-                                         updater->update, tid);
+      int r = fifo->apply_update(&fifo->info, version, update, tid);
       if (r < 0) {
-       ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                      << " update failed, marking canceled: r=" << r << " tid="
-                      << tid << dendl;
+       ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                            << " update failed, marking canceled: r=" << r
+                            << " tid=" << tid << dendl;
        canceled = true;
       }
     }
     if (canceled) {
-      updater->cur->release();
-      updater->cur = lr::Rados::aio_create_completion(
-       arg, &FIFO::update_callback);
-      updater->reread = true;
-      auto r = updater->fifo->read_meta(tid, updater->cur);
-      if (r < 0) {
-       lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " failed dispatching read_meta: r=" << r << " tid="
-                  << tid << dendl;
-       complete(updater->super, r);
-      } else {
-       updater.release();
-      }
+      reread = true;
+      fifo->read_meta(tid, call(std::move(p)));
       return;
     }
-    if (updater->pcanceled)
-      *updater->pcanceled = false;
-    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " completing: tid=" << tid << dendl;
-    complete(updater->super, 0);
-    return;
-  }
-
-  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " handling async read_meta: tid="
-                << tid << dendl;
-  int r = updater->cur->get_return_value();
-  if (r < 0 && updater->pcanceled) {
-    *updater->pcanceled = false;
-  } else if (r >= 0 && updater->pcanceled) {
-    *updater->pcanceled = true;
-  }
-  if (r < 0) {
-    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " failed dispatching read_meta: r=" << r << " tid="
-              << tid << dendl;
-  } else {
-    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " completing: tid=" << tid << dendl;
+    if (pcanceled)
+      *pcanceled = false;
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " completing: tid=" << tid << dendl;
+    complete(std::move(p), 0);
+  }
+
+  void handle_reread(Ptr&& p, int r) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " handling async read_meta: tid="
+                        << tid << dendl;
+    if (r < 0 && pcanceled) {
+      *pcanceled = false;
+    } else if (r >= 0 && pcanceled) {
+      *pcanceled = true;
+    }
+    if (r < 0) {
+      lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " failed dispatching read_meta: r=" << r << " tid="
+                      << tid << dendl;
+    } else {
+      ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                          << " completing: tid=" << tid << dendl;
+    }
+    complete(std::move(p), r);
   }
-  complete(updater->super, r);
-}
+};
 
-int FIFO::_update_meta(const fifo::update& update,
-                      fifo::objv version, bool* pcanceled,
-                      std::uint64_t tid, lr::AioCompletion* c)
+void FIFO::_update_meta(const fifo::update& update,
+                       fifo::objv version, bool* pcanceled,
+                       std::uint64_t tid, lr::AioCompletion* c)
 {
   ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
@@ -491,15 +658,8 @@ int FIFO::_update_meta(const fifo::update& update,
   update_meta(&op, info.version, update);
   auto updater = std::make_unique<Updater>(this, c, update, version, pcanceled,
                                           tid);
-  auto r = ioctx.aio_operate(oid, updater->cur, &op);
-  if (r < 0) {
-    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " failed dispatching update_meta: r=" << r << " tid="
-              << tid << dendl;
-  } else {
-    updater.release();
-  }
-  return r;
+  auto r = ioctx.aio_operate(oid, Updater::call(std::move(updater)), &op);
+  assert(r >= 0);
 }
 
 int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid,
@@ -509,7 +669,7 @@ int FIFO::create_part(int64_t part_num, std::string_view tag, std::uint64_t tid,
                 << " entering: tid=" << tid << dendl;
   lr::ObjectWriteOperation op;
   op.create(false); /* We don't need exclusivity, part_init ensures
-                      we're creating from the  same journal entry. */
+                      we're creating from the same journal entry. */
   std::unique_lock l(m);
   part_init(&op, tag, info.params);
   auto oid = info.part_oid(part_num);
@@ -806,6 +966,209 @@ int FIFO::_prepare_new_head(std::uint64_t tid, optional_yield y)
   return 0;
 }
 
+struct NewPartPreparer : public Completion<NewPartPreparer> {
+  FIFO* f;
+  std::vector<fifo::journal_entry> jentries;
+  int i = 0;
+  std::int64_t new_head_part_num;
+  bool canceled = false;
+  uint64_t tid;
+
+  NewPartPreparer(FIFO* f, lr::AioCompletion* super,
+                 std::vector<fifo::journal_entry> jentries,
+                 std::int64_t new_head_part_num,
+                 std::uint64_t tid)
+    : Completion(super), f(f), jentries(std::move(jentries)),
+      new_head_part_num(new_head_part_num), tid(tid) {}
+
+  void handle(Ptr&& p, int r) {
+    ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                     << " entering: tid=" << tid << dendl;
+    if (r < 0) {
+      lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                   << " _update_meta failed:  r=" << r
+                   << " tid=" << tid << dendl;
+      complete(std::move(p), r);
+      return;
+    }
+
+    if (canceled) {
+      std::unique_lock l(f->m);
+      auto iter = f->info.journal.find(jentries.front().part_num);
+      auto max_push_part_num = f->info.max_push_part_num;
+      auto head_part_num = f->info.head_part_num;
+      auto version = f->info.version;
+      auto found = (iter != f->info.journal.end());
+      l.unlock();
+      if ((max_push_part_num >= jentries.front().part_num &&
+          head_part_num >= new_head_part_num)) {
+       ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                         << " raced, but journaled and processed: i=" << i
+                         << " tid=" << tid << dendl;
+       complete(std::move(p), 0);
+       return;
+      }
+      if (i >= MAX_RACE_RETRIES) {
+       complete(std::move(p), -ECANCELED);
+       return;
+      }
+      if (!found) {
+       ++i;
+       f->_update_meta(fifo::update{}
+                       .journal_entries_add(jentries),
+                        version, &canceled, tid, call(std::move(p)));
+       return;
+      } else {
+       ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                         << " raced, journaled but not processed: i=" << i
+                         << " tid=" << tid << dendl;
+       canceled = false;
+      }
+      // Fall through. We still need to process the journal.
+    }
+    f->process_journal(tid, super());
+    return;
+  }
+};
+
+void FIFO::_prepare_new_part(bool is_head, std::uint64_t tid,
+                            lr::AioCompletion* c)
+{
+  std::unique_lock l(m);
+  std::vector jentries = { info.next_journal_entry(generate_tag()) };
+  if (info.journal.find(jentries.front().part_num) != info.journal.end()) {
+    l.unlock();
+    ldout(cct, 5) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                 << " new part journaled, but not processed: tid="
+                 << tid << dendl;
+    process_journal(tid, c);
+    return;
+  }
+  std::int64_t new_head_part_num = info.head_part_num;
+  auto version = info.version;
+
+  if (is_head) {
+    auto new_head_jentry = jentries.front();
+    new_head_jentry.op = fifo::journal_entry::Op::set_head;
+    new_head_part_num = jentries.front().part_num;
+    jentries.push_back(std::move(new_head_jentry));
+  }
+  l.unlock();
+
+  auto n = std::make_unique<NewPartPreparer>(this, c, jentries,
+                                            new_head_part_num, tid);
+  auto np = n.get();
+  _update_meta(fifo::update{}.journal_entries_add(jentries), version,
+              &np->canceled, tid, NewPartPreparer::call(std::move(n)));
+}
+
+struct NewHeadPreparer : public Completion<NewHeadPreparer> {
+  FIFO* f;
+  int i = 0;
+  bool newpart;
+  std::int64_t new_head_num;
+  bool canceled = false;
+  std::uint64_t tid;
+
+  NewHeadPreparer(FIFO* f, lr::AioCompletion* super,
+                 bool newpart, std::int64_t new_head_num, std::uint64_t tid)
+    : Completion(super), f(f), newpart(newpart), new_head_num(new_head_num),
+      tid(tid) {}
+
+  void handle(Ptr&& p, int r) {
+    if (newpart)
+      handle_newpart(std::move(p), r);
+    else
+      handle_update(std::move(p), r);
+  }
+
+  void handle_newpart(Ptr&& p, int r) {
+    if (r < 0) {
+      lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                   << " _prepare_new_part failed: r=" << r
+                   << " tid=" << tid << dendl;
+      complete(std::move(p), r);
+      return;
+    }
+    std::unique_lock l(f->m);
+    if (f->info.max_push_part_num < new_head_num) {
+      l.unlock();
+      lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                   << " _prepare_new_part failed: r=" << r
+                   << " tid=" << tid << dendl;
+      complete(std::move(p), -EIO);
+    } else {
+      l.unlock();
+      complete(std::move(p), 0);
+    }
+  }
+
+  void handle_update(Ptr&& p, int r) {
+    std::unique_lock l(f->m);
+    auto head_part_num = f->info.head_part_num;
+    auto version = f->info.version;
+    l.unlock();
+
+    if (r < 0) {
+      lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                   << " _update_meta failed: r=" << r
+                   << " tid=" << tid << dendl;
+      complete(std::move(p), r);
+      return;
+    }
+    if (canceled) {
+      if (i >= MAX_RACE_RETRIES) {
+       lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                     << " canceled too many times, giving up: tid=" << tid << dendl;
+       complete(std::move(p), -ECANCELED);
+       return;
+      }
+
+      // Raced, but there's still work to do!
+      if (head_part_num < new_head_num) {
+       canceled = false;
+       ++i;
+       ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                         << " updating head: i=" << i << " tid=" << tid << dendl;
+       f->_update_meta(fifo::update{}.head_part_num(new_head_num),
+                       version, &this->canceled, tid, call(std::move(p)));
+       return;
+      }
+    }
+    ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " succeeded : i=" << i << " tid=" << tid << dendl;
+    complete(std::move(p), 0);
+    return;
+  }
+};
+
+void FIFO::_prepare_new_head(std::uint64_t tid, lr::AioCompletion* c)
+{
+  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                << " entering: tid=" << tid << dendl;
+  std::unique_lock l(m);
+  int64_t new_head_num = info.head_part_num + 1;
+  auto max_push_part_num = info.max_push_part_num;
+  auto version = info.version;
+  l.unlock();
+
+  if (max_push_part_num < new_head_num) {
+    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " need new part: tid=" << tid << dendl;
+    auto n = std::make_unique<NewHeadPreparer>(this, c, true, new_head_num,
+                                              tid);
+    _prepare_new_part(true, tid, NewHeadPreparer::call(std::move(n)));
+  } else {
+    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " updating head: tid=" << tid << dendl;
+    auto n = std::make_unique<NewHeadPreparer>(this, c, false, new_head_num,
+                                              tid);
+    auto np = n.get();
+    _update_meta(fifo::update{}.head_part_num(new_head_num), version,
+                &np->canceled, tid, NewHeadPreparer::call(std::move(n)));
+  }
+}
+
 int FIFO::push_entries(const std::deque<cb::list>& data_bufs,
                       std::uint64_t tid, optional_yield y)
 {
@@ -825,6 +1188,18 @@ int FIFO::push_entries(const std::deque<cb::list>& data_bufs,
   return r;
 }
 
+void FIFO::push_entries(const std::deque<cb::list>& data_bufs,
+                       std::uint64_t tid, lr::AioCompletion* c)
+{
+  std::unique_lock l(m);
+  auto head_part_num = info.head_part_num;
+  auto tag = info.head_tag;
+  const auto part_oid = info.part_oid(head_part_num);
+  l.unlock();
+
+  push_part(ioctx, part_oid, tag, data_bufs, tid, c);
+}
+
 int FIFO::trim_part(int64_t part_num, uint64_t ofs,
                    std::optional<std::string_view> tag,
                    bool exclusive, std::uint64_t tid,
@@ -845,10 +1220,10 @@ int FIFO::trim_part(int64_t part_num, uint64_t ofs,
   return 0;
 }
 
-int FIFO::trim_part(int64_t part_num, uint64_t ofs,
-                   std::optional<std::string_view> tag,
-                   bool exclusive, std::uint64_t tid,
-                   lr::AioCompletion* c)
+void FIFO::trim_part(int64_t part_num, uint64_t ofs,
+                    std::optional<std::string_view> tag,
+                    bool exclusive, std::uint64_t tid,
+                    lr::AioCompletion* c)
 {
   ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
@@ -858,12 +1233,7 @@ int FIFO::trim_part(int64_t part_num, uint64_t ofs,
   l.unlock();
   rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
   auto r = ioctx.aio_operate(part_oid, c, &op);
-  if (r < 0) {
-    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " failed scheduling trim_part: r=" << r
-              << " tid=" << tid << dendl;
-  }
-  return r;
+  ceph_assert(r >= 0);
 }
 
 int FIFO::open(lr::IoCtx ioctx, std::string oid, std::unique_ptr<FIFO>* fifo,
@@ -960,54 +1330,42 @@ int FIFO::read_meta(optional_yield y) {
   return read_meta(tid, y);
 }
 
-struct Reader {
+struct Reader : public Completion<Reader> {
   FIFO* fifo;
   cb::list bl;
-  lr::AioCompletion* super;
   std::uint64_t tid;
-  lr::AioCompletion* cur = lr::Rados::aio_create_completion(
-    static_cast<void*>(this), &FIFO::read_callback);
   Reader(FIFO* fifo, lr::AioCompletion* super, std::uint64_t tid)
-    : fifo(fifo), super(super), tid(tid) {
-    super->pc->get();
-  }
-  ~Reader() {
-    cur->release();
-  }
-};
+    : Completion(super), fifo(fifo), tid(tid) {}
 
-void FIFO::read_callback(lr::completion_t, void* arg)
-{
-  std::unique_ptr<Reader> reader(static_cast<Reader*>(arg));
-  auto cct = reader->fifo->cct;
-  auto tid = reader->tid;
-  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " entering: tid=" << tid << dendl;
-  auto r = reader->cur->get_return_value();
-  if (r >= 0) try {
-      fifo::op::get_meta_reply reply;
-      auto iter = reader->bl.cbegin();
-      decode(reply, iter);
-      std::unique_lock l(reader->fifo->m);
-      if (reply.info.version.same_or_later(reader->fifo->info.version)) {
-       reader->fifo->info = std::move(reply.info);
-       reader->fifo->part_header_size = reply.part_header_size;
-       reader->fifo->part_entry_overhead = reply.part_entry_overhead;
-      }
-    } catch (const cb::error& err) {
+  void handle(Ptr&& p, int r) {
+    auto cct = fifo->cct;
+    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " entering: tid=" << tid << dendl;
+    if (r >= 0) try {
+       fifo::op::get_meta_reply reply;
+       auto iter = bl.cbegin();
+       decode(reply, iter);
+       std::unique_lock l(fifo->m);
+       if (reply.info.version.same_or_later(fifo->info.version)) {
+         fifo->info = std::move(reply.info);
+         fifo->part_header_size = reply.part_header_size;
+         fifo->part_entry_overhead = reply.part_entry_overhead;
+       }
+      } catch (const cb::error& err) {
+       lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " failed to decode response err=" << err.what()
+                  << " tid=" << tid << dendl;
+       r = from_error_code(err.code());
+      } else {
       lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " failed to decode response err=" << err.what()
+                << " read_meta failed r=" << r
                 << " tid=" << tid << dendl;
-      r = from_error_code(err.code());
-    } else {
-    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " read_meta failed r=" << r
-              << " tid=" << tid << dendl;
+    }
+    complete(std::move(p), r);
   }
-  complete(reader->super, r);
-}
+};
 
-int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c)
+void FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c)
 {
   ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                 << " entering: tid=" << tid << dendl;
@@ -1016,16 +1374,10 @@ int FIFO::read_meta(std::uint64_t tid, lr::AioCompletion* c)
   cb::list in;
   encode(gm, in);
   auto reader = std::make_unique<Reader>(this, c, tid);
-  auto r = ioctx.aio_exec(oid, reader->cur, fifo::op::CLASS,
-                         fifo::op::GET_META, in, &reader->bl);
-  if (r < 0) {
-    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " failed scheduling read_meta r=" << r
-              << " tid=" << tid << dendl;
-  } else {
-    reader.release();
-  }
-  return r;
+  auto rp = reader.get();
+  auto r = ioctx.aio_exec(oid, Reader::call(std::move(reader)), fifo::op::CLASS,
+                         fifo::op::GET_META, in, &rp->bl);
+  assert(r >= 0);
 }
 
 const fifo::info& FIFO::meta() const {
@@ -1040,6 +1392,10 @@ int FIFO::push(const cb::list& bl, optional_yield y) {
   return push(std::vector{ bl }, y);
 }
 
+void FIFO::push(const cb::list& bl, lr::AioCompletion* c) {
+  push(std::vector{ bl }, c);
+}
+
 int FIFO::push(const std::vector<cb::list>& data_bufs, optional_yield y)
 {
   std::unique_lock l(m);
@@ -1153,24 +1509,185 @@ int FIFO::push(const std::vector<cb::list>& data_bufs, optional_yield y)
   return 0;
 }
 
-int FIFO::list(int max_entries,
-              std::optional<std::string_view> markstr,
-              std::vector<list_entry>* presult, bool* pmore,
-              optional_yield y)
-{
-  std::unique_lock l(m);
-  auto tid = ++next_tid;
-  std::int64_t part_num = info.tail_part_num;
-  l.unlock();
-  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " entering: tid=" << tid << dendl;
-  std::uint64_t ofs = 0;
-  if (markstr) {
-    auto marker = to_marker(*markstr);
-    if (!marker) {
-      lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " invalid marker string: " << markstr
-                << " tid= "<< tid << dendl;
+struct Pusher : public Completion<Pusher> {
+  FIFO* f;
+  std::deque<cb::list> remaining;
+  std::deque<cb::list> batch;
+  int i = 0;
+  std::uint64_t tid;
+  bool new_heading = false;
+
+  void prep_then_push(Ptr&& p, const unsigned successes) {
+    std::unique_lock l(f->m);
+    auto max_part_size = f->info.params.max_part_size;
+    auto part_entry_overhead = f->part_entry_overhead;
+    l.unlock();
+
+    ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                     << " preparing push: remaining=" << remaining.size()
+                     << " batch=" << batch.size() << " i=" << i
+                     << " tid=" << tid << dendl;
+
+    uint64_t batch_len = 0;
+    if (successes > 0) {
+      if (successes == batch.size()) {
+       batch.clear();
+      } else  {
+       batch.erase(batch.begin(), batch.begin() + successes);
+       for (const auto& b : batch) {
+         batch_len +=  b.length() + part_entry_overhead;
+       }
+      }
+    }
+
+    if (batch.empty() && remaining.empty()) {
+      complete(std::move(p), 0);
+      return;
+    }
+
+    while (!remaining.empty() &&
+          (remaining.front().length() + batch_len <= max_part_size)) {
+
+      /* We can send entries with data_len up to max_entry_size,
+        however, we want to also account the overhead when
+        dealing with multiple entries. Previous check doesn't
+        account for overhead on purpose. */
+      batch_len += remaining.front().length() + part_entry_overhead;
+      batch.push_back(std::move(remaining.front()));
+      remaining.pop_front();
+    }
+    ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                     << " prepared push: remaining=" << remaining.size()
+                     << " batch=" << batch.size() << " i=" << i
+                     << " batch_len=" << batch_len
+                     << " tid=" << tid << dendl;
+    push(std::move(p));
+  }
+
+  void push(Ptr&& p) {
+    f->push_entries(batch, tid, call(std::move(p)));
+  }
+
+  void new_head(Ptr&& p) {
+    new_heading = true;
+    f->_prepare_new_head(tid, call(std::move(p)));
+  }
+
+  void handle(Ptr&& p, int r) {
+    if (!new_heading) {
+      if (r == -ERANGE) {
+       ldout(f->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " need new head tid=" << tid << dendl;
+       new_head(std::move(p));
+       return;
+      }
+      if (r < 0) {
+       lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                     << " push_entries failed: r=" << r
+                     << " tid=" << tid << dendl;
+       complete(std::move(p), r);
+       return;
+      }
+      i = 0; // We've made forward progress, so reset the race counter!
+      prep_then_push(std::move(p), r);
+    } else {
+      if (r < 0) {
+       lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                     << " prepare_new_head failed: r=" << r
+                     << " tid=" << tid << dendl;
+       complete(std::move(p), r);
+       return;
+      }
+      new_heading = false;
+      handle_new_head(std::move(p), r);
+    }
+  }
+
+  void handle_new_head(Ptr&& p, int r) {
+    if (r == -ECANCELED) {
+      if (p->i == MAX_RACE_RETRIES) {
+       lderr(f->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                     << " canceled too many times, giving up: tid=" << tid << dendl;
+       complete(std::move(p), -ECANCELED);
+       return;
+      }
+      ++p->i;
+    } else if (r) {
+      complete(std::move(p), r);
+      return;
+    }
+
+    if (p->batch.empty()) {
+      prep_then_push(std::move(p), 0);
+      return;
+    } else {
+      push(std::move(p));
+      return;
+    }
+  }
+
+  Pusher(FIFO* f, std::deque<cb::list>&& remaining,
+        std::uint64_t tid, lr::AioCompletion* super)
+    : Completion(super), f(f), remaining(std::move(remaining)),
+      tid(tid) {}
+};
+
+void FIFO::push(const std::vector<cb::list>& data_bufs,
+               lr::AioCompletion* c)
+{
+  std::unique_lock l(m);
+  auto tid = ++next_tid;
+  auto max_entry_size = info.params.max_entry_size;
+  auto need_new_head = info.need_new_head();
+  l.unlock();
+  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                << " entering: tid=" << tid << dendl;
+  auto p = std::make_unique<Pusher>(this, std::deque<cb::list>(data_bufs.begin(), data_bufs.end()),
+                                   tid, c);
+  // Validate sizes
+  for (const auto& bl : data_bufs) {
+    if (bl.length() > max_entry_size) {
+      lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                << " entry bigger than max_entry_size tid=" << tid << dendl;
+      Pusher::complete(std::move(p), -E2BIG);
+      return;
+    }
+  }
+
+  if (data_bufs.empty() ) {
+    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " empty push, returning success tid=" << tid << dendl;
+    Pusher::complete(std::move(p), 0);
+    return;
+  }
+
+  if (need_new_head) {
+    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " need new head tid=" << tid << dendl;
+    p->new_head(std::move(p));
+  } else {
+    p->prep_then_push(std::move(p), 0);
+  }
+}
+
+int FIFO::list(int max_entries,
+              std::optional<std::string_view> markstr,
+              std::vector<list_entry>* presult, bool* pmore,
+              optional_yield y)
+{
+  std::unique_lock l(m);
+  auto tid = ++next_tid;
+  std::int64_t part_num = info.tail_part_num;
+  l.unlock();
+  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                << " entering: tid=" << tid << dendl;
+  std::uint64_t ofs = 0;
+  if (markstr) {
+    auto marker = to_marker(*markstr);
+    if (!marker) {
+      lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                << " invalid marker string: " << markstr
+                << " tid= "<< tid << dendl;
       return -EINVAL;
     }
     part_num = marker->num;
@@ -1340,157 +1857,116 @@ int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
   return 0;
 }
 
-struct Trimmer {
+struct Trimmer : public Completion<Trimmer> {
   FIFO* fifo;
   std::int64_t part_num;
   std::uint64_t ofs;
   std::int64_t pn;
   bool exclusive;
-  lr::AioCompletion* super;
   std::uint64_t tid;
-  lr::AioCompletion* cur = lr::Rados::aio_create_completion(
-    static_cast<void*>(this), &FIFO::trim_callback);
   bool update = false;
   bool canceled = false;
   int retries = 0;
 
   Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
          bool exclusive, lr::AioCompletion* super, std::uint64_t tid)
-    : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), exclusive(exclusive),
-      super(super), tid(tid) {
-    super->pc->get();
-  }
-  ~Trimmer() {
-    cur->release();
-  }
-};
+    : Completion(super), fifo(fifo), part_num(part_num), ofs(ofs), pn(pn),
+      exclusive(exclusive), tid(tid) {}
 
-void FIFO::trim_callback(lr::completion_t, void* arg)
-{
-  std::unique_ptr<Trimmer> trimmer(static_cast<Trimmer*>(arg));
-  auto cct = trimmer->fifo->cct;
-  auto tid = trimmer->tid;
-  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " entering: tid=" << tid << dendl;
-  int r = trimmer->cur->get_return_value();
-  if (r == -ENOENT) {
-    r = 0;
-  }
-
-  if (r < 0) {
-    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " trim failed: r=" << r << " tid=" << tid << dendl;
-    complete(trimmer->super, r);
-    return;
-  }
-
-  if (!trimmer->update) {
+  void handle(Ptr&& p, int r) {
+    auto cct = fifo->cct;
     ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " handling preceding trim callback: tid=" << tid << dendl;
-    trimmer->retries = 0;
-    if (trimmer->pn < trimmer->part_num) {
-      std::unique_lock l(trimmer->fifo->m);
-      const auto max_part_size = trimmer->fifo->info.params.max_part_size;
-      l.unlock();
-      trimmer->cur->release();
-      trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback);
-      r = trimmer->fifo->trim_part(trimmer->pn++, max_part_size, std::nullopt,
-                                  false, tid, trimmer->cur);
-      if (r < 0) {
-       lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                  << " trim failed: r=" << r << " tid=" << tid << dendl;
-       complete(trimmer->super, r);
-      } else {
-       trimmer.release();
-      }
-      return;
+                  << " entering: tid=" << tid << dendl;
+    if (r == -ENOENT) {
+      r = 0;
     }
 
-    std::unique_lock l(trimmer->fifo->m);
-    const auto tail_part_num = trimmer->fifo->info.tail_part_num;
-    l.unlock();
-    trimmer->cur->release();
-    trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback);
-    trimmer->update = true;
-    trimmer->canceled = tail_part_num < trimmer->part_num;
-    r = trimmer->fifo->trim_part(trimmer->part_num, trimmer->ofs,
-                                std::nullopt, trimmer->exclusive, tid, trimmer->cur);
     if (r < 0) {
       lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " failed scheduling trim: r=" << r << " tid=" << tid << dendl;
-      complete(trimmer->super, r);
-    } else {
-      trimmer.release();
+                << (update ? " update_meta " : " trim ") << "failed: r="
+                << r << " tid=" << tid << dendl;
+      complete(std::move(p), r);
+      return;
     }
-    return;
-  }
 
-  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " handling update-needed callback: tid=" << tid << dendl;
-  std::unique_lock l(trimmer->fifo->m);
-  auto tail_part_num = trimmer->fifo->info.tail_part_num;
-  auto objv = trimmer->fifo->info.version;
-  l.unlock();
-  if ((tail_part_num < trimmer->part_num) &&
-      trimmer->canceled) {
-    if (trimmer->retries > MAX_RACE_RETRIES) {
-      lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " canceled too many times, giving up: tid=" << tid << dendl;
-      complete(trimmer->super, -EIO);
+    if (!update) {
+      ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                    << " handling preceding trim callback: tid=" << tid << dendl;
+      retries = 0;
+      if (pn < part_num) {
+       ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " pn=" << pn << " tid=" << tid << dendl;
+       std::unique_lock l(fifo->m);
+       const auto max_part_size = fifo->info.params.max_part_size;
+       l.unlock();
+       fifo->trim_part(pn++, max_part_size, std::nullopt,
+                       false, tid, call(std::move(p)));
+       return;
+      }
+
+      std::unique_lock l(fifo->m);
+      const auto tail_part_num = fifo->info.tail_part_num;
+      l.unlock();
+      update = true;
+      canceled = tail_part_num < part_num;
+      fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
+                     call(std::move(p)));
       return;
     }
-    trimmer->cur->release();
-    trimmer->cur = lr::Rados::aio_create_completion(arg,
-                                                   &FIFO::trim_callback);
-    ++trimmer->retries;
-    r = trimmer->fifo->_update_meta(fifo::update{}
-                                   .tail_part_num(trimmer->part_num),
-                                   objv, &trimmer->canceled,
-                                    tid, trimmer->cur);
-    if (r < 0) {
-      lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-                << " failed scheduling _update_meta: r="
-                << r << " tid=" << tid << dendl;
-      complete(trimmer->super, r);
+
+    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " handling update-needed callback: tid=" << tid << dendl;
+    std::unique_lock l(fifo->m);
+    auto tail_part_num = fifo->info.tail_part_num;
+    auto objv = fifo->info.version;
+    l.unlock();
+    if ((tail_part_num < part_num) &&
+       canceled) {
+      if (retries > MAX_RACE_RETRIES) {
+       lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " canceled too many times, giving up: tid=" << tid << dendl;
+       complete(std::move(p), -EIO);
+       return;
+      }
+      ++retries;
+      fifo->_update_meta(fifo::update{}
+                        .tail_part_num(part_num), objv, &canceled,
+                         tid, call(std::move(p)));
     } else {
-      trimmer.release();
+      complete(std::move(p), 0);
     }
-  } else {
-    complete(trimmer->super, 0);
   }
-}
+};
 
-int FIFO::trim(std::string_view markstr, bool exclusive, lr::AioCompletion* c) {
+void FIFO::trim(std::string_view markstr, bool exclusive,
+               lr::AioCompletion* c) {
   auto marker = to_marker(markstr);
-  if (!marker) {
-    return -EINVAL;
-  }
+  auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
   std::unique_lock l(m);
   const auto max_part_size = info.params.max_part_size;
   const auto pn = info.tail_part_num;
   const auto part_oid = info.part_oid(pn);
   auto tid = ++next_tid;
   l.unlock();
-  auto trimmer = std::make_unique<Trimmer>(this, marker->num, marker->ofs, pn, exclusive, c,
-                                          tid);
+  ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                << " entering: tid=" << tid << dendl;
+  auto trimmer = std::make_unique<Trimmer>(this, realmark.num, realmark.ofs,
+                                          pn, exclusive, c, tid);
+  if (!marker) {
+    Trimmer::complete(std::move(trimmer), -EINVAL);
+    return;
+  }
   ++trimmer->pn;
   auto ofs = marker->ofs;
   if (pn < marker->num) {
+    ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " pn=" << pn << " tid=" << tid << dendl;
     ofs = max_part_size;
   } else {
     trimmer->update = true;
   }
-  auto r = trim_part(pn, ofs, std::nullopt, exclusive,
-                    tid, trimmer->cur);
-  if (r < 0) {
-    lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
-              << " failed scheduling trim_part: r="
-              << r << " tid=" << tid << dendl;
-    complete(trimmer->super, r);
-  } else {
-    trimmer.release();
-  }
-  return r;
+  trim_part(pn, ofs, std::nullopt, exclusive,
+           tid, Trimmer::call(std::move(trimmer)));
 }
 
 int FIFO::get_part_info(int64_t part_num,
@@ -1509,4 +1985,521 @@ int FIFO::get_part_info(int64_t part_num,
   }
   return r;
 }
+
+void FIFO::get_part_info(int64_t part_num,
+                        fifo::part_header* header,
+                        lr::AioCompletion* c)
+{
+  std::unique_lock l(m);
+  const auto part_oid = info.part_oid(part_num);
+  auto tid = ++next_tid;
+  l.unlock();
+  auto op = rgw::cls::fifo::get_part_info(cct, header, tid);
+  auto r = ioctx.aio_operate(part_oid, c, &op, nullptr);
+  ceph_assert(r >= 0);
+}
+
+struct InfoGetter : Completion<InfoGetter> {
+  FIFO* fifo;
+  fifo::part_header header;
+  fu2::function<void(int r, fifo::part_header&&)> f;
+  std::uint64_t tid;
+  bool headerread = false;
+
+  InfoGetter(FIFO* fifo, fu2::function<void(int r, fifo::part_header&&)> f,
+            std::uint64_t tid, lr::AioCompletion* super)
+    : Completion(super), fifo(fifo), f(std::move(f)), tid(tid) {}
+  void handle(Ptr&& p, int r) {
+    if (!headerread) {
+      if (r < 0) {
+       lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " read_meta failed: r="
+                        << r << " tid=" << tid << dendl;
+       if (f)
+         f(r, {});
+       complete(std::move(p), r);
+       return;
+      }
+
+      auto info = fifo->meta();
+      auto hpn = info.head_part_num;
+      if (hpn < 0) {
+       ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                            << " no head, returning empty partinfo r="
+                            << r << " tid=" << tid << dendl;
+       if (f)
+         f(0, {});
+       complete(std::move(p), r);
+       return;
+      }
+      headerread = true;
+      auto op = rgw::cls::fifo::get_part_info(fifo->cct, &header, tid);
+      std::unique_lock l(fifo->m);
+      auto oid = fifo->info.part_oid(hpn);
+      l.unlock();
+      r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op,
+                                 nullptr);
+      ceph_assert(r >= 0);
+      return;
+    }
+
+    if (r < 0) {
+      lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " get_part_info failed: r="
+                      << r << " tid=" << tid << dendl;
+    }
+
+    if (f)
+      f(r, std::move(header));
+    complete(std::move(p), r);
+    return;
+  }
+};
+
+void FIFO::get_head_info(fu2::unique_function<void(int r,
+                                                  fifo::part_header&&)> f,
+                        lr::AioCompletion* c)
+{
+  std::unique_lock l(m);
+  auto tid = ++next_tid;
+  l.unlock();
+  auto ig = std::make_unique<InfoGetter>(this, std::move(f), tid, c);
+  read_meta(tid, InfoGetter::call(std::move(ig)));
+}
+
+struct JournalProcessor : public Completion<JournalProcessor> {
+private:
+  FIFO* const fifo;
+
+  std::vector<fifo::journal_entry> processed;
+  std::multimap<std::int64_t, fifo::journal_entry> journal;
+  std::multimap<std::int64_t, fifo::journal_entry>::iterator iter;
+  std::int64_t new_tail;
+  std::int64_t new_head;
+  std::int64_t new_max;
+  int race_retries = 0;
+  bool first_pp = true;
+  bool canceled = false;
+  std::uint64_t tid;
+
+  enum {
+    entry_callback,
+    pp_callback,
+  } state;
+
+  void create_part(Ptr&& p, int64_t part_num,
+                  std::string_view tag) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " entering: tid=" << tid << dendl;
+    state = entry_callback;
+    lr::ObjectWriteOperation op;
+    op.create(false); /* We don't need exclusivity, part_init ensures
+                        we're creating from the  same journal entry. */
+    std::unique_lock l(fifo->m);
+    part_init(&op, tag, fifo->info.params);
+    auto oid = fifo->info.part_oid(part_num);
+    l.unlock();
+    auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
+    ceph_assert(r >= 0);
+    return;
+  }
+
+  void remove_part(Ptr&& p, int64_t part_num,
+                  std::string_view tag) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " entering: tid=" << tid << dendl;
+    state = entry_callback;
+    lr::ObjectWriteOperation op;
+    op.remove();
+    std::unique_lock l(fifo->m);
+    auto oid = fifo->info.part_oid(part_num);
+    l.unlock();
+    auto r = fifo->ioctx.aio_operate(oid, call(std::move(p)), &op);
+    ceph_assert(r >= 0);
+    return;
+  }
+
+  void finish_je(Ptr&& p, int r,
+                const fifo::journal_entry& entry) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " entering: tid=" << tid << dendl;
+
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " finishing entry: entry=" << entry
+                        << " tid=" << tid << dendl;
+
+    if (entry.op == fifo::journal_entry::Op::remove && r == -ENOENT)
+      r = 0;
+
+    if (r < 0) {
+      lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " processing entry failed: entry=" << entry
+                      << " r=" << r << " tid=" << tid << dendl;
+      complete(std::move(p), r);
+      return;
+    } else {
+      switch (entry.op) {
+      case fifo::journal_entry::Op::unknown:
+      case fifo::journal_entry::Op::set_head:
+       // Can't happen. Filtered out in process.
+       complete(std::move(p), -EIO);
+       return;
+
+      case fifo::journal_entry::Op::create:
+       if (entry.part_num > new_max) {
+         new_max = entry.part_num;
+       }
+       break;
+      case fifo::journal_entry::Op::remove:
+       if (entry.part_num >= new_tail) {
+         new_tail = entry.part_num + 1;
+       }
+       break;
+      }
+      processed.push_back(entry);
+    }
+    ++iter;
+    process(std::move(p));
+  }
+
+  void postprocess(Ptr&& p) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " entering: tid=" << tid << dendl;
+    if (processed.empty()) {
+      ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                          << " nothing to update any more: race_retries="
+                          << race_retries << " tid=" << tid << dendl;
+      complete(std::move(p), 0);
+      return;
+    }
+    pp_run(std::move(p), 0, false);
+  }
+
+public:
+
+  JournalProcessor(FIFO* fifo, std::uint64_t tid, lr::AioCompletion* super)
+    : Completion(super), fifo(fifo), tid(tid) {
+    std::unique_lock l(fifo->m);
+    journal = fifo->info.journal;
+    iter = journal.begin();
+    new_tail = fifo->info.tail_part_num;
+    new_head = fifo->info.head_part_num;
+    new_max = fifo->info.max_push_part_num;
+  }
+
+  void pp_run(Ptr&& p, int r, bool canceled) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " entering: tid=" << tid << dendl;
+    std::optional<int64_t> tail_part_num;
+    std::optional<int64_t> head_part_num;
+    std::optional<int64_t> max_part_num;
+
+    if (r < 0) {
+      lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " failed, r=: " << r << " tid=" << tid << dendl;
+      complete(std::move(p), r);
+    }
+
+
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " postprocessing: race_retries="
+                        << race_retries << " tid=" << tid << dendl;
+
+    if (!first_pp && r == 0 && !canceled) {
+      ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                          << " nothing to update any more: race_retries="
+                          << race_retries << " tid=" << tid << dendl;
+      complete(std::move(p), 0);
+      return;
+    }
+
+    first_pp = false;
+
+    if (canceled) {
+      if (race_retries >= MAX_RACE_RETRIES) {
+       lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " canceled too many times, giving up: tid="
+                        << tid << dendl;
+       complete(std::move(p), -ECANCELED);
+       return;
+      }
+      ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                          << " update canceled, retrying: race_retries="
+                          << race_retries << " tid=" << tid << dendl;
+
+      ++race_retries;
+
+      std::vector<fifo::journal_entry> new_processed;
+      std::unique_lock l(fifo->m);
+      for (auto& e : processed) {
+       auto jiter = fifo->info.journal.find(e.part_num);
+       /* journal entry was already processed */
+       if (jiter == fifo->info.journal.end() ||
+           !(jiter->second == e)) {
+         continue;
+       }
+       new_processed.push_back(e);
+      }
+      processed = std::move(new_processed);
+    }
+
+    std::unique_lock l(fifo->m);
+    auto objv = fifo->info.version;
+    if (new_tail > fifo->info.tail_part_num) {
+      tail_part_num = new_tail;
+    }
+
+    if (new_head > fifo->info.head_part_num) {
+      head_part_num = new_head;
+    }
+
+    if (new_max > fifo->info.max_push_part_num) {
+      max_part_num = new_max;
+    }
+    l.unlock();
+
+    if (processed.empty() &&
+       !tail_part_num &&
+       !max_part_num) {
+      /* nothing to update anymore */
+      ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                          << " nothing to update any more: race_retries="
+                          << race_retries << " tid=" << tid << dendl;
+      complete(std::move(p), 0);
+      return;
+    }
+    state = pp_callback;
+    fifo->_update_meta(fifo::update{}
+                      .tail_part_num(tail_part_num)
+                      .head_part_num(head_part_num)
+                      .max_push_part_num(max_part_num)
+                      .journal_entries_rm(processed),
+                       objv, &this->canceled, tid, call(std::move(p)));
+    return;
+  }
+
+  JournalProcessor(const JournalProcessor&) = delete;
+  JournalProcessor& operator =(const JournalProcessor&) = delete;
+  JournalProcessor(JournalProcessor&&) = delete;
+  JournalProcessor& operator =(JournalProcessor&&) = delete;
+
+  void process(Ptr&& p) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " entering: tid=" << tid << dendl;
+    while (iter != journal.end()) {
+      ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                          << " processing entry: entry=" << *iter
+                          << " tid=" << tid << dendl;
+      const auto entry = iter->second;
+      switch (entry.op) {
+      case fifo::journal_entry::Op::create:
+       create_part(std::move(p), entry.part_num, entry.part_tag);
+       return;
+      case fifo::journal_entry::Op::set_head:
+       if (entry.part_num > new_head) {
+         new_head = entry.part_num;
+       }
+       processed.push_back(entry);
+       ++iter;
+       continue;
+      case fifo::journal_entry::Op::remove:
+       remove_part(std::move(p), entry.part_num, entry.part_tag);
+       return;
+      default:
+       lderr(fifo->cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " unknown journaled op: entry=" << entry << " tid="
+                        << tid << dendl;
+       complete(std::move(p), -EIO);
+       return;
+      }
+    }
+    postprocess(std::move(p));
+    return;
+  }
+
+  void handle(Ptr&& p, int r) {
+    ldout(fifo->cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                        << " entering: tid=" << tid << dendl;
+    switch (state) {
+    case entry_callback:
+      finish_je(std::move(p), r, iter->second);
+      return;
+    case pp_callback:
+      auto c = canceled;
+      canceled = false;
+      pp_run(std::move(p), r, c);
+      return;
+    }
+
+    abort();
+  }
+
+};
+
+void FIFO::process_journal(std::uint64_t tid, lr::AioCompletion* c) {
+  auto p = std::make_unique<JournalProcessor>(this, tid, c);
+  p->process(std::move(p));
+}
+
+struct Lister : Completion<Lister> {
+  FIFO* f;
+  std::vector<list_entry> result;
+  bool more = false;
+  std::int64_t part_num;
+  std::uint64_t ofs;
+  int max_entries;
+  int r_out = 0;
+  std::vector<fifo::part_list_entry> entries;
+  bool part_more = false;
+  bool part_full = false;
+  std::vector<list_entry>* entries_out;
+  bool* more_out;
+  std::uint64_t tid;
+
+  bool read = false;
+
+  void complete(Ptr&& p, int r) {
+    if (r >= 0) {
+      if (more_out) *more_out = more;
+      if (entries_out) *entries_out = std::move(result);
+    }
+    Completion::complete(std::move(p), r);
+  }
+
+public:
+  Lister(FIFO* f, std::int64_t part_num, std::uint64_t ofs, int max_entries,
+        std::vector<list_entry>* entries_out, bool* more_out,
+        std::uint64_t tid, lr::AioCompletion* super)
+    : Completion(super), f(f), part_num(part_num), ofs(ofs), max_entries(max_entries),
+      entries_out(entries_out), more_out(more_out), tid(tid) {
+    result.reserve(max_entries);
+  }
+
+  Lister(const Lister&) = delete;
+  Lister& operator =(const Lister&) = delete;
+  Lister(Lister&&) = delete;
+  Lister& operator =(Lister&&) = delete;
+
+  void handle(Ptr&& p, int r) {
+    if (read)
+      handle_read(std::move(p), r);
+    else
+      handle_list(std::move(p), r);
+  }
+
+  void list(Ptr&& p) {
+    if (max_entries > 0) {
+      part_more = false;
+      part_full = false;
+      entries.clear();
+
+      std::unique_lock l(f->m);
+      auto part_oid = f->info.part_oid(part_num);
+      l.unlock();
+
+      read = false;
+      auto op = list_part(f->cct, {}, ofs, max_entries, &r_out,
+                         &entries, &part_more, &part_full,
+                         nullptr, tid);
+      f->ioctx.aio_operate(part_oid, call(std::move(p)), &op, nullptr);
+    } else {
+      complete(std::move(p), 0);
+    }
+  }
+
+  void handle_read(Ptr&& p, int r) {
+    read = false;
+    if (r >= 0) r = r_out;
+    r_out = 0;
+
+    if (r < 0) {
+      complete(std::move(p), r);
+      return;
+    }
+
+    if (part_num < f->info.tail_part_num) {
+      /* raced with trim? restart */
+      max_entries += result.size();
+      result.clear();
+      part_num = f->info.tail_part_num;
+      ofs = 0;
+      list(std::move(p));
+      return;
+    }
+    /* assuming part was not written yet, so end of data */
+    more = false;
+    complete(std::move(p), 0);
+    return;
+  }
+
+  void handle_list(Ptr&& p, int r) {
+    if (r >= 0) r = r_out;
+    r_out = 0;
+    std::unique_lock l(f->m);
+    auto part_oid = f->info.part_oid(part_num);
+    l.unlock();
+    if (r == -ENOENT) {
+      read = true;
+      f->read_meta(tid, call(std::move(p)));
+      return;
+    }
+    if (r < 0) {
+      complete(std::move(p), r);
+      return;
+    }
+
+    more = part_full || part_more;
+    for (auto& entry : entries) {
+      list_entry e;
+      e.data = std::move(entry.data);
+      e.marker = marker{part_num, entry.ofs}.to_string();
+      e.mtime = entry.mtime;
+      result.push_back(std::move(e));
+    }
+    max_entries -= entries.size();
+    entries.clear();
+    if (max_entries > 0 && part_more) {
+      list(std::move(p));
+      return;
+    }
+
+    if (!part_full) { /* head part is not full */
+      complete(std::move(p), 0);
+      return;
+    }
+    ++part_num;
+    ofs = 0;
+    list(std::move(p));
+  }
+};
+
+void FIFO::list(int max_entries,
+               std::optional<std::string_view> markstr,
+               std::vector<list_entry>* out,
+               bool* more,
+               lr::AioCompletion* c) {
+  std::unique_lock l(m);
+  auto tid = ++next_tid;
+  std::int64_t part_num = info.tail_part_num;
+  l.unlock();
+  std::uint64_t ofs = 0;
+  std::optional<::rgw::cls::fifo::marker> marker;
+
+  if (markstr) {
+    marker = to_marker(*markstr);
+    if (marker) {
+      part_num = marker->num;
+      ofs = marker->ofs;
+    }
+  }
+
+  auto ls = std::make_unique<Lister>(this, part_num, ofs, max_entries, out,
+                                    more, tid, c);
+  if (markstr && !marker) {
+    auto l = ls.get();
+    l->complete(std::move(ls), -EINVAL);
+  } else {
+    ls->list(std::move(ls));
+  }
+}
 }
index 1f8d3f3fc95d889fee46f705c23d19db9c7cb68a..b6b5f04bb30ad4318de09d41673f9c437c994aea 100644 (file)
@@ -31,6 +31,7 @@
 
 #include "include/rados/librados.hpp"
 #include "include/buffer.h"
+#include "include/function2.hpp"
 
 #include "common/async/yield_context.h"
 
@@ -57,24 +58,6 @@ int get_meta(lr::IoCtx& ioctx, const std::string& oid,
             std::uint32_t* part_entry_overhead,
             std::uint64_t tid, optional_yield y,
             bool probe = false);
-void update_meta(lr::ObjectWriteOperation* op, const fifo::objv& objv,
-                const fifo::update& update);
-void part_init(lr::ObjectWriteOperation* op, std::string_view tag,
-              fifo::data_params params);
-int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
-             std::deque<cb::list> data_bufs, std::uint64_t tid, optional_yield y);
-void trim_part(lr::ObjectWriteOperation* op,
-              std::optional<std::string_view> tag, std::uint64_t ofs,
-              bool exclusive);
-int list_part(lr::IoCtx& ioctx, const std::string& oid,
-             std::optional<std::string_view> tag, std::uint64_t ofs,
-             std::uint64_t max_entries,
-             std::vector<fifo::part_list_entry>* entries,
-             bool* more, bool* full_part, std::string* ptag,
-             std::uint64_t tid, optional_yield y);
-int get_part_info(lr::IoCtx& ioctx, const std::string& oid,
-                 fifo::part_header* header, std::uint64_t,
-                 optional_yield y);
 
 struct marker {
   std::int64_t num = 0;
@@ -117,6 +100,12 @@ class FIFO {
   friend struct Reader;
   friend struct Updater;
   friend struct Trimmer;
+  friend struct InfoGetter;
+  friend struct Pusher;
+  friend struct NewPartPreparer;
+  friend struct NewHeadPreparer;
+  friend struct JournalProcessor;
+  friend struct Lister;
 
   mutable lr::IoCtx ioctx;
   CephContext* cct = static_cast<CephContext*>(ioctx.cct());
@@ -144,32 +133,34 @@ class FIFO {
   int _update_meta(const fifo::update& update,
                   fifo::objv version, bool* pcanceled,
                   std::uint64_t tid, optional_yield y);
-  int _update_meta(const fifo::update& update,
-                  fifo::objv version, bool* pcanceled,
-                  std::uint64_t tid, lr::AioCompletion* c);
+  void _update_meta(const fifo::update& update,
+                   fifo::objv version, bool* pcanceled,
+                   std::uint64_t tid, lr::AioCompletion* c);
   int create_part(int64_t part_num, std::string_view tag, std::uint64_t tid,
                  optional_yield y);
   int remove_part(int64_t part_num, std::string_view tag, std::uint64_t tid,
                  optional_yield y);
   int process_journal(std::uint64_t tid, optional_yield y);
+  void process_journal(std::uint64_t tid, lr::AioCompletion* c);
   int _prepare_new_part(bool is_head, std::uint64_t tid, optional_yield y);
+  void _prepare_new_part(bool is_head, std::uint64_t tid, lr::AioCompletion* c);
   int _prepare_new_head(std::uint64_t tid, optional_yield y);
+  void _prepare_new_head(std::uint64_t tid, lr::AioCompletion* c);
   int push_entries(const std::deque<cb::list>& data_bufs,
                   std::uint64_t tid, optional_yield y);
+  void push_entries(const std::deque<cb::list>& data_bufs,
+                   std::uint64_t tid, lr::AioCompletion* c);
   int trim_part(int64_t part_num, uint64_t ofs,
                std::optional<std::string_view> tag, bool exclusive,
                std::uint64_t tid, optional_yield y);
-  int trim_part(int64_t part_num, uint64_t ofs,
-               std::optional<std::string_view> tag, bool exclusive,
-               std::uint64_t tid, lr::AioCompletion* c);
+  void trim_part(int64_t part_num, uint64_t ofs,
+                std::optional<std::string_view> tag, bool exclusive,
+                std::uint64_t tid, lr::AioCompletion* c);
 
-  static void trim_callback(lr::completion_t, void* arg);
-  static void update_callback(lr::completion_t, void* arg);
-  static void read_callback(lr::completion_t, void* arg);
   /// Force refresh of metadata, yielding/blocking style
   int read_meta(std::uint64_t tid, optional_yield y);
   /// Force refresh of metadata, with a librados Completion
-  int read_meta(std::uint64_t tid, lr::AioCompletion* c);
+  void read_meta(std::uint64_t tid, lr::AioCompletion* c);
 
 public:
 
@@ -215,12 +206,20 @@ public:
   int push(const cb::list& bl, //< Entry to push
           optional_yield y //< Optional yield
     );
-  /// Push entres to the FIFO
+  /// Push an entry to the FIFO
+  void push(const cb::list& bl, //< Entry to push
+           lr::AioCompletion* c //< Async Completion
+    );
+  /// Push entries to the FIFO
   int push(const std::vector<cb::list>& data_bufs, //< Entries to push
-          /// Optional yield
-          optional_yield y);
+          optional_yield y //< Optional yield
+    );
+  /// Push entries to the FIFO
+  void push(const std::vector<cb::list>& data_bufs, //< Entries to push
+           lr::AioCompletion* c //< Async Completion
+    );
   /// List entries
-  int list(int max_entries, /// Maximum entries to list
+  int list(int max_entries, //< Maximum entries to list
           /// Point after which to begin listing. Start at tail if null
           std::optional<std::string_view> markstr,
           std::vector<list_entry>* out, //< OUT: entries
@@ -228,6 +227,14 @@ public:
           bool* more,
           optional_yield y //< Optional yield
     );
+  void list(int max_entries, //< Maximum entries to list
+           /// Point after which to begin listing. Start at tail if null
+           std::optional<std::string_view> markstr,
+           std::vector<list_entry>* out, //< OUT: entries
+           /// OUT: True if more entries in FIFO beyond the last returned
+           bool* more,
+           lr::AioCompletion* c //< Async Completion
+    );
   /// Trim entries, coroutine/block style
   int trim(std::string_view markstr, //< Position to which to trim, inclusive
           bool exclusive, //< If true, do not trim the target entry
@@ -235,16 +242,28 @@ public:
           optional_yield y //< Optional yield
     );
   /// Trim entries, librados AioCompletion style
-  int trim(std::string_view markstr, //< Position to which to trim, inclusive
-          bool exclusive, //< If true, do not trim the target entry
-                          //< itself, just all those before it.
-          lr::AioCompletion* c //< librados AIO Completion
+  void trim(std::string_view markstr, //< Position to which to trim, inclusive
+           bool exclusive, //< If true, do not trim the target entry
+                           //< itself, just all those before it.
+           lr::AioCompletion* c //< librados AIO Completion
     );
   /// Get part info
   int get_part_info(int64_t part_num, /// Part number
                    fifo::part_header* header, //< OUT: Information
                    optional_yield y //< Optional yield
     );
+  /// Get part info
+  void get_part_info(int64_t part_num, //< Part number
+                   fifo::part_header* header, //< OUT: Information
+                   lr::AioCompletion* c //< AIO Completion
+    );
+  /// A convenience method to fetch the part information for the FIFO
+  /// head, using librados::AioCompletion, since
+  /// libradio::AioCompletions compose lousily.
+  void get_head_info(fu2::unique_function< //< Function to receive info
+                      void(int r, fifo::part_header&&)>,
+                    lr::AioCompletion* c //< AIO Completion
+    );
 };
 }
 
index a875d075ecade3aa1b85c2aba1b49697b1a4e893..8142b26e01a8bdce0804e9f4ce3a613835d166f4 100644 (file)
@@ -469,12 +469,7 @@ public:
       pc->cond.notify_all();
       pc->put_unlock();
     } else {
-      r = fifos[index]->trim(marker, false, c);
-      if (r < 0) {
-       lderr(cct) << __PRETTY_FUNCTION__
-                  << ": unable to trim FIFO: " << get_oid(index)
-                  << ": " << cpp_strerror(-r) << dendl;
-      }
+      fifos[index]->trim(marker, false, c);
     }
     return r;
   }
index dae4980f8dca4fd28baab5315af7277b425cacb1..69cee5a887405ff3abeda0c00c9007ce56dd8426 100644 (file)
@@ -69,6 +69,8 @@ protected:
 };
 
 using LegacyClsFIFO = LegacyFIFO;
+using AioLegacyFIFO = LegacyFIFO;
+
 
 TEST_F(LegacyClsFIFO, TestCreate)
 {
@@ -577,8 +579,7 @@ TEST_F(LegacyFIFO, TestAioTrim)
     marker = result.front().marker;
     std::unique_ptr<R::AioCompletion> c(rados.aio_create_completion(nullptr,
                                                                    nullptr));
-    r = f->trim(*marker, false, c.get());
-    ASSERT_EQ(0, r);
+    f->trim(*marker, false, c.get());
     c->wait_for_complete();
     r = c->get_return_value();
     ASSERT_EQ(0, r);
@@ -645,3 +646,482 @@ TEST_F(LegacyFIFO, TestTrimExclusive) {
   ASSERT_EQ(result.size(), 1);
   ASSERT_EQ(max_entries - 1, val);
 }
+
+TEST_F(AioLegacyFIFO, TestPushListTrim)
+{
+  std::unique_ptr<RCf::FIFO> f;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
+  ASSERT_EQ(0, r);
+  static constexpr auto max_entries = 10u;
+  for (uint32_t i = 0; i < max_entries; ++i) {
+    cb::list bl;
+    encode(i, bl);
+    auto c = R::Rados::aio_create_completion();
+    f->push(bl, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    ASSERT_EQ(0, r);
+  }
+
+  std::optional<std::string> marker;
+  /* get entries one by one */
+  std::vector<RCf::list_entry> result;
+  bool more = false;
+  for (auto i = 0u; i < max_entries; ++i) {
+    auto c = R::Rados::aio_create_completion();
+    f->list(1, marker, &result, &more, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    ASSERT_EQ(0, r);
+
+    bool expected_more = (i != (max_entries - 1));
+    ASSERT_EQ(expected_more, more);
+    ASSERT_EQ(1, result.size());
+
+    std::uint32_t val;
+    std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
+
+    ASSERT_EQ(i, val);
+    result.clear();
+  }
+
+  /* get all entries at once */
+  std::string markers[max_entries];
+  std::uint32_t min_entry = 0;
+  auto c = R::Rados::aio_create_completion();
+  f->list(max_entries * 10, std::nullopt, &result, &more, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+
+  ASSERT_FALSE(more);
+  ASSERT_EQ(max_entries, result.size());
+  for (auto i = 0u; i < max_entries; ++i) {
+    std::uint32_t val;
+    std::tie(val, markers[i]) = decode_entry<std::uint32_t>(result[i]);
+    ASSERT_EQ(i, val);
+  }
+
+  /* trim one entry */
+  c = R::Rados::aio_create_completion();
+  f->trim(markers[min_entry], false, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+  ++min_entry;
+
+  c = R::Rados::aio_create_completion();
+  f->list(max_entries * 10, std::nullopt, &result, &more, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+  ASSERT_FALSE(more);
+  ASSERT_EQ(max_entries - min_entry, result.size());
+
+  for (auto i = min_entry; i < max_entries; ++i) {
+    std::uint32_t val;
+    std::tie(val, markers[i - min_entry]) =
+      decode_entry<std::uint32_t>(result[i - min_entry]);
+    EXPECT_EQ(i, val);
+  }
+}
+
+
+TEST_F(AioLegacyFIFO, TestPushTooBig)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+
+  std::unique_ptr<RCf::FIFO> f;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
+                            std::nullopt, false, max_part_size, max_entry_size);
+  ASSERT_EQ(0, r);
+
+  char buf[max_entry_size + 1];
+  memset(buf, 0, sizeof(buf));
+
+  cb::list bl;
+  bl.append(buf, sizeof(buf));
+
+  auto c = R::Rados::aio_create_completion();
+  f->push(bl, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  ASSERT_EQ(-E2BIG, r);
+  c->release();
+
+  c = R::Rados::aio_create_completion();
+  f->push(std::vector<cb::list>{}, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  EXPECT_EQ(0, r);
+}
+
+
+TEST_F(AioLegacyFIFO, TestMultipleParts)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+  std::unique_ptr<RCf::FIFO> f;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
+                            std::nullopt, false, max_part_size,
+                            max_entry_size);
+  ASSERT_EQ(0, r);
+
+  {
+    auto c = R::Rados::aio_create_completion();
+    f->get_head_info([&](int r, RCf::part_info&& p) {
+      ASSERT_TRUE(p.tag.empty());
+      ASSERT_EQ(0, p.magic);
+      ASSERT_EQ(0, p.min_ofs);
+      ASSERT_EQ(0, p.last_ofs);
+      ASSERT_EQ(0, p.next_ofs);
+      ASSERT_EQ(0, p.min_index);
+      ASSERT_EQ(0, p.max_index);
+      ASSERT_EQ(ceph::real_time{}, p.max_time);
+    }, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+  }
+
+  char buf[max_entry_size];
+  memset(buf, 0, sizeof(buf));
+  const auto [part_header_size, part_entry_overhead] =
+    f->get_part_layout_info();
+  const auto entries_per_part = ((max_part_size - part_header_size) /
+                                (max_entry_size + part_entry_overhead));
+  const auto max_entries = entries_per_part * 4 + 1;
+  /* push enough entries */
+  for (auto i = 0u; i < max_entries; ++i) {
+    cb::list bl;
+    *(int *)buf = i;
+    bl.append(buf, sizeof(buf));
+    auto c = R::Rados::aio_create_completion();
+    f->push(bl, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    EXPECT_EQ(0, r);
+  }
+
+  auto info = f->meta();
+  ASSERT_EQ(info.id, fifo_id);
+  /* head should have advanced */
+  ASSERT_GT(info.head_part_num, 0);
+
+  /* list all at once */
+  std::vector<RCf::list_entry> result;
+  bool more = false;
+  auto c = R::Rados::aio_create_completion();
+  f->list(max_entries, std::nullopt, &result, &more, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  EXPECT_EQ(0, r);
+  EXPECT_EQ(false, more);
+  ASSERT_EQ(max_entries, result.size());
+
+  for (auto i = 0u; i < max_entries; ++i) {
+    auto& bl = result[i].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+
+  std::optional<std::string> marker;
+  /* get entries one by one */
+
+  for (auto i = 0u; i < max_entries; ++i) {
+    c = R::Rados::aio_create_completion();
+    f->list(1, marker, &result, &more, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    EXPECT_EQ(0, r);
+    ASSERT_EQ(result.size(), 1);
+    const bool expected_more = (i != (max_entries - 1));
+    ASSERT_EQ(expected_more, more);
+
+    std::uint32_t val;
+    std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
+
+    auto& entry = result.front();
+    auto& bl = entry.data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+    marker = entry.marker;
+  }
+
+  /* trim one at a time */
+  marker.reset();
+  for (auto i = 0u; i < max_entries; ++i) {
+    /* read single entry */
+    c = R::Rados::aio_create_completion();
+    f->list(1, marker, &result, &more, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    EXPECT_EQ(0, r);
+    ASSERT_EQ(result.size(), 1);
+    const bool expected_more = (i != (max_entries - 1));
+    ASSERT_EQ(expected_more, more);
+
+    marker = result.front().marker;
+    c = R::Rados::aio_create_completion();
+    f->trim(*marker, false, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    EXPECT_EQ(0, r);
+    ASSERT_EQ(result.size(), 1);
+
+    /* check tail */
+    info = f->meta();
+    ASSERT_EQ(info.tail_part_num, i / entries_per_part);
+
+    /* try to read all again, see how many entries left */
+    c = R::Rados::aio_create_completion();
+    f->list(max_entries, marker, &result, &more, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    EXPECT_EQ(0, r);
+    ASSERT_EQ(max_entries - i - 1, result.size());
+    ASSERT_EQ(false, more);
+  }
+
+  /* tail now should point at head */
+  info = f->meta();
+  ASSERT_EQ(info.head_part_num, info.tail_part_num);
+
+  /* check old tails are removed */
+  for (auto i = 0; i < info.tail_part_num; ++i) {
+    c = R::Rados::aio_create_completion();
+    RCf::part_info partinfo;
+    f->get_part_info(i, &partinfo, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    ASSERT_EQ(-ENOENT, r);
+  }
+  /* check current tail exists */
+  std::uint64_t next_ofs;
+  {
+    c = R::Rados::aio_create_completion();
+    RCf::part_info partinfo;
+    f->get_part_info(info.tail_part_num, &partinfo, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    next_ofs = partinfo.next_ofs;
+  }
+  ASSERT_EQ(0, r);
+
+  c = R::Rados::aio_create_completion();
+  f->get_head_info([&](int r, RCf::part_info&& p) {
+    ASSERT_EQ(next_ofs, p.next_ofs);
+  }, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+}
+
+TEST_F(AioLegacyFIFO, TestTwoPushers)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+
+  std::unique_ptr<RCf::FIFO> f;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
+                            std::nullopt, false, max_part_size,
+                            max_entry_size);
+  ASSERT_EQ(0, r);
+  char buf[max_entry_size];
+  memset(buf, 0, sizeof(buf));
+
+  auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
+  const auto entries_per_part = ((max_part_size - part_header_size) /
+                                (max_entry_size + part_entry_overhead));
+  const auto max_entries = entries_per_part * 4 + 1;
+  std::unique_ptr<RCf::FIFO> f2;
+  r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield);
+  std::vector fifos{&f, &f2};
+
+  for (auto i = 0u; i < max_entries; ++i) {
+    cb::list bl;
+    *(int *)buf = i;
+    bl.append(buf, sizeof(buf));
+    auto& f = *fifos[i % fifos.size()];
+    auto c = R::Rados::aio_create_completion();
+    f->push(bl, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    ASSERT_EQ(0, r);
+  }
+
+  /* list all by both */
+  std::vector<RCf::list_entry> result;
+  bool more = false;
+  auto c = R::Rados::aio_create_completion();
+  f2->list(max_entries, std::nullopt, &result, &more, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+  ASSERT_EQ(false, more);
+  ASSERT_EQ(max_entries, result.size());
+
+  c = R::Rados::aio_create_completion();
+  f2->list(max_entries, std::nullopt, &result, &more, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+  ASSERT_EQ(false, more);
+  ASSERT_EQ(max_entries, result.size());
+
+  for (auto i = 0u; i < max_entries; ++i) {
+    auto& bl = result[i].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+}
+
+TEST_F(AioLegacyFIFO, TestTwoPushersTrim)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+  std::unique_ptr<RCf::FIFO> f1;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f1, null_yield, std::nullopt,
+                            std::nullopt, false, max_part_size,
+                            max_entry_size);
+  ASSERT_EQ(0, r);
+
+  char buf[max_entry_size];
+  memset(buf, 0, sizeof(buf));
+
+  auto [part_header_size, part_entry_overhead] = f1->get_part_layout_info();
+  const auto entries_per_part = ((max_part_size - part_header_size) /
+                                (max_entry_size + part_entry_overhead));
+  const auto max_entries = entries_per_part * 4 + 1;
+
+  std::unique_ptr<RCf::FIFO> f2;
+  r = RCf::FIFO::open(ioctx, fifo_id, &f2, null_yield);
+  ASSERT_EQ(0, r);
+
+  /* push one entry to f2 and the rest to f1 */
+  for (auto i = 0u; i < max_entries; ++i) {
+    cb::list bl;
+    *(int *)buf = i;
+    bl.append(buf, sizeof(buf));
+    auto& f = (i < 1 ? f2 : f1);
+    auto c = R::Rados::aio_create_completion();
+    f->push(bl, c);
+    c->wait_for_complete();
+    r = c->get_return_value();
+    c->release();
+    ASSERT_EQ(0, r);
+  }
+
+  /* trim half by fifo1 */
+  auto num = max_entries / 2;
+  std::string marker;
+  std::vector<RCf::list_entry> result;
+  bool more = false;
+  auto c = R::Rados::aio_create_completion();
+  f1->list(num, std::nullopt, &result, &more, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+  ASSERT_EQ(true, more);
+  ASSERT_EQ(num, result.size());
+
+  for (auto i = 0u; i < num; ++i) {
+    auto& bl = result[i].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+
+  auto& entry = result[num - 1];
+  marker = entry.marker;
+  c = R::Rados::aio_create_completion();
+  f1->trim(marker, false, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+  /* list what's left by fifo2 */
+
+  const auto left = max_entries - num;
+  c = R::Rados::aio_create_completion();
+  f2->list(left, marker, &result, &more, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+  ASSERT_EQ(left, result.size());
+  ASSERT_EQ(false, more);
+
+  for (auto i = num; i < max_entries; ++i) {
+    auto& bl = result[i - num].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+}
+
+TEST_F(AioLegacyFIFO, TestPushBatch)
+{
+  static constexpr auto max_part_size = 2048ull;
+  static constexpr auto max_entry_size = 128ull;
+
+  std::unique_ptr<RCf::FIFO> f;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield, std::nullopt,
+                            std::nullopt, false, max_part_size,
+                            max_entry_size);
+  ASSERT_EQ(0, r);
+
+  char buf[max_entry_size];
+  memset(buf, 0, sizeof(buf));
+  auto [part_header_size, part_entry_overhead] = f->get_part_layout_info();
+  auto entries_per_part = ((max_part_size - part_header_size) /
+                          (max_entry_size + part_entry_overhead));
+  auto max_entries = entries_per_part * 4 + 1; /* enough entries to span multiple parts */
+  std::vector<cb::list> bufs;
+  for (auto i = 0u; i < max_entries; ++i) {
+    cb::list bl;
+    *(int *)buf = i;
+    bl.append(buf, sizeof(buf));
+    bufs.push_back(bl);
+  }
+  ASSERT_EQ(max_entries, bufs.size());
+
+  auto c = R::Rados::aio_create_completion();
+  f->push(bufs, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+
+  /* list all */
+
+  std::vector<RCf::list_entry> result;
+  bool more = false;
+  c = R::Rados::aio_create_completion();
+  f->list(max_entries, std::nullopt, &result, &more, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(0, r);
+  ASSERT_EQ(false, more);
+  ASSERT_EQ(max_entries, result.size());
+  for (auto i = 0u; i < max_entries; ++i) {
+    auto& bl = result[i].data;
+    ASSERT_EQ(i, *(int *)bl.c_str());
+  }
+  auto& info = f->meta();
+  ASSERT_EQ(info.head_part_num, 4);
+}