]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rgw: Clamp FIFO trim to head
authorAdam C. Emerson <aemerson@redhat.com>
Tue, 26 Jan 2021 06:27:24 +0000 (01:27 -0500)
committerAdam C. Emerson <aemerson@redhat.com>
Mon, 5 Apr 2021 17:45:08 +0000 (13:45 -0400)
Don't try to trim a bunch of parts that don't exist.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
(cherry picked from commit 60b729e32602b7401e15957cef976386281c4ccb)
Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/rgw/cls_fifo_legacy.cc
src/test/rgw/test_cls_fifo_legacy.cc

index 3ddb2578d35410232285d6bdd6d0d6dbf9d7f8db..45a3ad505146a62fe3250f86e62044bb98dfb46a 100644 (file)
@@ -1701,6 +1701,7 @@ int FIFO::list(int max_entries,
 
 int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
 {
+  bool overshoot = false;
   auto marker = to_marker(markstr);
   if (!marker) {
     return -EINVAL;
@@ -1709,6 +1710,25 @@ int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
   auto ofs = marker->ofs;
   std::unique_lock l(m);
   auto tid = ++next_tid;
+  auto hn = info.head_part_num;
+  const auto max_part_size = info.params.max_part_size;
+  if (part_num > hn) {
+    l.unlock();
+    auto r = read_meta(tid, y);
+    if (r < 0) {
+      return r;
+    }
+    l.lock();
+    auto hn = info.head_part_num;
+    if (part_num > hn) {
+      overshoot = true;
+      part_num = hn;
+      ofs = max_part_size;
+    }
+  }
+  if (part_num < info.tail_part_num) {
+    return -ENODATA;
+  }
   auto pn = info.tail_part_num;
   l.unlock();
   ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
@@ -1719,7 +1739,6 @@ int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
     ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " pn=" << pn << " tid=" << tid << dendl;
     std::unique_lock l(m);
-    auto max_part_size = info.params.max_part_size;
     l.unlock();
     r = trim_part(pn, max_part_size, std::nullopt, false, tid, y);
     if (r < 0 && r == -ENOENT) {
@@ -1771,7 +1790,7 @@ int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
               << " canceled too many times, giving up: tid=" << tid << dendl;
     return -EIO;
   }
-  return 0;
+  return overshoot ? -ENODATA : 0;
 }
 
 struct Trimmer : public Completion<Trimmer> {
@@ -1782,7 +1801,9 @@ struct Trimmer : public Completion<Trimmer> {
   bool exclusive;
   std::uint64_t tid;
   bool update = false;
+  bool reread = false;
   bool canceled = false;
+  bool overshoot = false;
   int retries = 0;
 
   Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
@@ -1794,6 +1815,45 @@ struct Trimmer : public Completion<Trimmer> {
     auto cct = fifo->cct;
     ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " entering: tid=" << tid << dendl;
+
+    if (reread) {
+      reread = false;
+      if (r < 0) {
+       lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                  << " read_meta failed: r="
+                  << r << " tid=" << tid << dendl;
+       complete(std::move(p), r);
+       return;
+      }
+      std::unique_lock l(fifo->m);
+      auto hn = fifo->info.head_part_num;
+      const auto max_part_size = fifo->info.params.max_part_size;
+      const auto tail_part_num = fifo->info.tail_part_num;
+      l.unlock();
+      if (part_num > hn) {
+       part_num = hn;
+       ofs = max_part_size;
+       overshoot = true;
+      }
+      if (part_num < tail_part_num) {
+       complete(std::move(p), -ENODATA);
+       return;
+      }
+      pn = tail_part_num;
+      if (pn < part_num) {
+       ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+                      << " pn=" << pn << " tid=" << tid << dendl;
+       fifo->trim_part(pn++, max_part_size, std::nullopt,
+                       false, tid, call(std::move(p)));
+      } else {
+       update = true;
+       canceled = tail_part_num < part_num;
+       fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
+                       call(std::move(p)));
+      }
+      return;
+    }
+
     if (r == -ENOENT) {
       r = 0;
     }
@@ -1850,7 +1910,7 @@ struct Trimmer : public Completion<Trimmer> {
                         .tail_part_num(part_num), objv, &canceled,
                          tid, call(std::move(p)));
     } else {
-      complete(std::move(p), 0);
+      complete(std::move(p), overshoot ? -ENODATA : 0);
     }
   }
 };
@@ -1860,6 +1920,7 @@ void FIFO::trim(std::string_view markstr, bool exclusive,
   auto marker = to_marker(markstr);
   auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
   std::unique_lock l(m);
+  const auto hn = info.head_part_num;
   const auto max_part_size = info.params.max_part_size;
   const auto pn = info.tail_part_num;
   const auto part_oid = info.part_oid(pn);
@@ -1875,6 +1936,11 @@ void FIFO::trim(std::string_view markstr, bool exclusive,
   }
   ++trimmer->pn;
   auto ofs = marker->ofs;
+  if (marker->num > hn) {
+    trimmer->reread = true;
+    read_meta(tid, Trimmer::call(std::move(trimmer)));
+    return;
+  }
   if (pn < marker->num) {
     ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
                   << " pn=" << pn << " tid=" << tid << dendl;
index 69cee5a887405ff3abeda0c00c9007ce56dd8426..26d9e9a9253e4e67f069481d75b3c8c49fac7ed0 100644 (file)
@@ -1125,3 +1125,54 @@ TEST_F(AioLegacyFIFO, TestPushBatch)
   auto& info = f->meta();
   ASSERT_EQ(info.head_part_num, 4);
 }
+
+TEST_F(LegacyFIFO, TrimAll)
+{
+  std::unique_ptr<RCf::FIFO> f;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
+  ASSERT_EQ(0, r);
+  static constexpr auto max_entries = 10u;
+  for (uint32_t i = 0; i < max_entries; ++i) {
+    cb::list bl;
+    encode(i, bl);
+    r = f->push(bl, null_yield);
+    ASSERT_EQ(0, r);
+  }
+
+  /* trim one entry */
+  r = f->trim(RCf::marker::max().to_string(), false, null_yield);
+  ASSERT_EQ(-ENODATA, r);
+
+  std::vector<RCf::list_entry> result;
+  bool more;
+  r = f->list(1, std::nullopt, &result, &more, null_yield);
+  ASSERT_EQ(0, r);
+  ASSERT_TRUE(result.empty());
+}
+
+TEST_F(LegacyFIFO, AioTrimAll)
+{
+  std::unique_ptr<RCf::FIFO> f;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
+  ASSERT_EQ(0, r);
+  static constexpr auto max_entries = 10u;
+  for (uint32_t i = 0; i < max_entries; ++i) {
+    cb::list bl;
+    encode(i, bl);
+    r = f->push(bl, null_yield);
+    ASSERT_EQ(0, r);
+  }
+
+  auto c = R::Rados::aio_create_completion();
+  f->trim(RCf::marker::max().to_string(), false, c);
+  c->wait_for_complete();
+  r = c->get_return_value();
+  c->release();
+  ASSERT_EQ(-ENODATA, r);
+
+  std::vector<RCf::list_entry> result;
+  bool more;
+  r = f->list(1, std::nullopt, &result, &more, null_yield);
+  ASSERT_EQ(0, r);
+  ASSERT_TRUE(result.empty());
+}