]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
cls/fifo: Add 'exclusive' option to Trim 35548/head
authorAdam C. Emerson <aemerson@redhat.com>
Fri, 4 Sep 2020 00:43:47 +0000 (20:43 -0400)
committerAdam C. Emerson <aemerson@redhat.com>
Wed, 9 Sep 2020 02:09:40 +0000 (22:09 -0400)
To support RGW's MetadataLog, add a flag to FIFO::trim that when true
trims up to but not including the given entry.

Signed-off-by: Adam C. Emerson <aemerson@redhat.com>
src/cls/fifo/cls_fifo.cc
src/cls/fifo/cls_fifo_ops.h
src/neorados/cls/fifo.cc
src/neorados/cls/fifo.h
src/rgw/cls_fifo_legacy.cc
src/rgw/cls_fifo_legacy.h
src/rgw/rgw_datalog.cc
src/test/cls_fifo/bench_cls_fifo.cc
src/test/cls_fifo/test_cls_fifo.cc
src/test/rgw/test_cls_fifo_legacy.cc

index baa94dc8eb830cee037fa0261f8dc00ac8273406..4bf0c34e7eb152449a0b7ac2e09b1b0416d4bf2b 100644 (file)
@@ -776,6 +776,9 @@ int trim_part(cls_method_context_t hctx,
   if (op.ofs < part_header.min_ofs) {
     return 0;
   }
+  if (op.exclusive && op.ofs == part_header.min_ofs) {
+    return 0;
+  }
 
   if (op.ofs >= part_header.next_ofs) {
     if (full_part(part_header)) {
@@ -803,15 +806,19 @@ int trim_part(cls_method_context_t hctx,
       return r;
     }
 
-    r = reader.get_next_entry(nullptr, nullptr, nullptr);
-    if (r < 0) {
-      CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d",
-             __func__, r);
-      return r;
+    if (op.exclusive) {
+      part_header.min_index = pre_header.index;
+    } else {
+      r = reader.get_next_entry(nullptr, nullptr, nullptr);
+      if (r < 0) {
+       CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d",
+               __func__, r);
+       return r;
+      }
+      part_header.min_index = pre_header.index + 1;
     }
 
     part_header.min_ofs = reader.get_ofs();
-    part_header.min_index = pre_header.index + 1;
   }
 
   r = write_part_header(hctx, part_header);
index 0adae1e5d39ae96c373d89758207e491d2f09cd1..a3f4ae237c9c5f6b52274dffc46b16f4416924ae 100644 (file)
@@ -194,17 +194,20 @@ struct trim_part
 {
   std::optional<std::string> tag;
   std::uint64_t ofs{0};
+  bool exclusive = false;
 
   void encode(ceph::buffer::list& bl) const {
     ENCODE_START(1, 1, bl);
     encode(tag, bl);
     encode(ofs, bl);
+    encode(exclusive, bl);
     ENCODE_FINISH(bl);
   }
   void decode(ceph::buffer::list::const_iterator& bl) {
     DECODE_START(1, bl);
     decode(tag, bl);
     decode(ofs, bl);
+    decode(exclusive, bl);
     DECODE_FINISH(bl);
   }
 };
index 84f20a50e79bb2385404e03ce1b1f38d61fc8ad2..fa99275b25f96cbf3350830f35a72504b89070d1 100644 (file)
@@ -143,12 +143,13 @@ void push_part(WriteOp& op, std::string_view tag,
 
 void trim_part(WriteOp& op,
               std::optional<std::string_view> tag,
-              std::uint64_t ofs)
+              std::uint64_t ofs, bool exclusive)
 {
   fifo::op::trim_part tp;
 
   tp.tag = tag;
   tp.ofs = ofs;
+  tp.exclusive = exclusive;
 
   bufferlist in;
   encode(tp, in);
index 3bdf55a0af62a415bd6a6eeb09251026691aba59..05865dccace3c4e5c92261ce2d36ad160db4d4e9 100644 (file)
@@ -112,7 +112,8 @@ void push_part(WriteOp& op, std::string_view tag,
               std::deque<cb::list> data_bufs,
               fu2::unique_function<void(bs::error_code, int)>);
 void trim_part(WriteOp& op, std::optional<std::string_view> tag,
-              std::uint64_t ofs);
+              std::uint64_t ofs,
+              bool exclusive);
 void list_part(ReadOp& op,
               std::optional<std::string_view> tag,
               std::uint64_t ofs,
@@ -567,6 +568,8 @@ public:
   /// Signature: (bs::error_code)
   template<typename CT>
   auto trim(std::string_view markstr, //< Position to which to trim, inclusive
+           bool exclusive, //< If true, trim markers up to but NOT INCLUDING
+                           //< markstr, otherwise trim markstr as well.
            CT&& ct //< CompletionToken
     ) {
     auto m = to_marker(markstr);
@@ -582,7 +585,7 @@ public:
     } else {
       using handler_type = decltype(init.completion_handler);
       auto t = ceph::allocate_unique<Trimmer<handler_type>>(
-       a, this, m->num, m->ofs, std::move(init.completion_handler));
+       a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler));
       t.release()->trim();
     }
     return init.result.get();
@@ -1088,9 +1091,10 @@ private:
   auto trim_part(int64_t part_num,
                 uint64_t ofs,
                 std::optional<std::string_view> tag,
+                bool exclusive,
                 CT&& ct) {
     WriteOp op;
-    cls::fifo::trim_part(op, tag, ofs);
+    cls::fifo::trim_part(op, tag, ofs, exclusive);
     return r->execute(info.part_oid(part_num), ioc, std::move(op),
                      std::forward<CT>(ct));
   }
@@ -1362,6 +1366,7 @@ private:
     FIFO* f;
     std::int64_t part_num;
     std::uint64_t ofs;
+    bool exclusive;
     Handler handler;
     std::int64_t pn;
     int i = 0;
@@ -1411,8 +1416,9 @@ private:
 
   public:
     Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs,
-           Handler&& handler)
-      : f(f), part_num(part_num), ofs(ofs), handler(std::move(handler)) {
+           bool exclusive, Handler&& handler)
+      : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive),
+       handler(std::move(handler)) {
       std::unique_lock l(f->m);
       pn = f->info.tail_part_num;
     }
@@ -1426,6 +1432,7 @@ private:
        l.unlock();
        f->trim_part(
          pn, max_part_size, std::nullopt,
+         false,
          ca::bind_ea(
            e, a,
            [t = std::unique_ptr<Trimmer>(this),
@@ -1444,7 +1451,7 @@ private:
        return;
       }
       f->trim_part(
-       part_num, ofs, std::nullopt,
+       part_num, ofs, std::nullopt, exclusive,
        ca::bind_ea(
          e, a,
          [t = std::unique_ptr<Trimmer>(this),
index d44f52cf15e257e5145660868213305e6180fd1c..5b6015623a1f50df8b626f29aa4081fbdd6fd387 100644 (file)
@@ -152,12 +152,13 @@ int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
 
 void trim_part(lr::ObjectWriteOperation* op,
               std::optional<std::string_view> tag,
-              std::uint64_t ofs)
+              std::uint64_t ofs, bool exclusive)
 {
   fifo::op::trim_part tp;
 
   tp.tag = tag;
   tp.ofs = ofs;
+  tp.exclusive = exclusive;
 
   cb::list in;
   encode(tp, in);
@@ -621,25 +622,27 @@ int FIFO::push_entries(const std::deque<cb::list>& data_bufs,
 
 int FIFO::trim_part(int64_t part_num, uint64_t ofs,
                    std::optional<std::string_view> tag,
+                   bool exclusive,
                    optional_yield y)
 {
   lr::ObjectWriteOperation op;
   std::unique_lock l(m);
   const auto part_oid = info.part_oid(part_num);
   l.unlock();
-  rgw::cls::fifo::trim_part(&op, tag, ofs);
+  rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
   return rgw_rados_operate(ioctx, part_oid, &op, y);
 }
 
 int FIFO::trim_part(int64_t part_num, uint64_t ofs,
                    std::optional<std::string_view> tag,
+                   bool exclusive,
                    lr::AioCompletion* c)
 {
   lr::ObjectWriteOperation op;
   std::unique_lock l(m);
   const auto part_oid = info.part_oid(part_num);
   l.unlock();
-  rgw::cls::fifo::trim_part(&op, tag, ofs);
+  rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
   return ioctx.aio_operate(part_oid, c, &op);
 }
 
@@ -924,7 +927,7 @@ int FIFO::list(int max_entries,
   return r;
 }
 
-int FIFO::trim(std::string_view markstr, optional_yield y)
+int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
 {
   auto marker = to_marker(markstr);
   if (!marker) {
@@ -941,13 +944,13 @@ int FIFO::trim(std::string_view markstr, optional_yield y)
     std::unique_lock l(m);
     auto max_part_size = info.params.max_part_size;
     l.unlock();
-    r = trim_part(pn, max_part_size, std::nullopt, y);
+    r = trim_part(pn, max_part_size, std::nullopt, false, y);
     if (r < 0 && r == -ENOENT) {
       return r;
     }
     ++pn;
   }
-  r = trim_part(part_num, ofs, std::nullopt, y);
+  r = trim_part(part_num, ofs, std::nullopt, exclusive, y);
   if (r < 0 && r != -ENOENT) {
     return r;
   }
@@ -982,6 +985,7 @@ struct Trimmer {
   std::int64_t part_num;
   std::uint64_t ofs;
   std::int64_t pn;
+  bool exclusive;
   lr::AioCompletion* super;
   lr::AioCompletion* cur = lr::Rados::aio_create_completion(
     static_cast<void*>(this), &FIFO::trim_callback);
@@ -990,8 +994,9 @@ struct Trimmer {
   int retries = 0;
 
   Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
-         lr::AioCompletion* super)
-    : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), super(super) {
+         bool exclusive, lr::AioCompletion* super)
+    : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), exclusive(exclusive),
+      super(super) {
     super->pc->get();
   }
   ~Trimmer() {
@@ -1019,7 +1024,7 @@ void FIFO::trim_callback(lr::completion_t, void* arg)
       trimmer->cur->release();
       trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback);
       r = trimmer->fifo->trim_part(trimmer->pn++, max_part_size, std::nullopt,
-                                  trimmer->cur);
+                                  false, trimmer->cur);
       if (r < 0) {
        complete(trimmer->super, r);
        delete trimmer;
@@ -1033,7 +1038,7 @@ void FIFO::trim_callback(lr::completion_t, void* arg)
       trimmer->update = true;
       trimmer->canceled = tail_part_num < trimmer->part_num;
       r = trimmer->fifo->trim_part(trimmer->part_num, trimmer->ofs,
-                                  std::nullopt, trimmer->cur);
+                                  std::nullopt, trimmer->exclusive, trimmer->cur);
       if (r < 0) {
        complete(trimmer->super, r);
        delete trimmer;
@@ -1069,7 +1074,7 @@ void FIFO::trim_callback(lr::completion_t, void* arg)
   }
 }
 
-int FIFO::trim(std::string_view markstr, lr::AioCompletion* c) {
+int FIFO::trim(std::string_view markstr, bool exclusive, lr::AioCompletion* c) {
   auto marker = to_marker(markstr);
   if (!marker) {
     return -EINVAL;
@@ -1079,7 +1084,7 @@ int FIFO::trim(std::string_view markstr, lr::AioCompletion* c) {
   const auto pn = info.tail_part_num;
   const auto part_oid = info.part_oid(pn);
   l.unlock();
-  auto trimmer = new Trimmer(this, marker->num, marker->ofs, pn, c);
+  auto trimmer = new Trimmer(this, marker->num, marker->ofs, pn, exclusive, c);
   ++trimmer->pn;
   auto ofs = marker->ofs;
   if (pn < marker->num) {
@@ -1087,7 +1092,8 @@ int FIFO::trim(std::string_view markstr, lr::AioCompletion* c) {
   } else {
     trimmer->update = true;
   }
-  auto r = trimmer->fifo->trim_part(pn, ofs, std::nullopt, trimmer->cur);
+  auto r = trimmer->fifo->trim_part(pn, ofs, std::nullopt, exclusive,
+                                   trimmer->cur);
   if (r < 0) {
     complete(trimmer->super, r);
     delete trimmer;
index 19c024719b018eb26b85f2138a49fc4920519427..734e05112ff24cf7810b1a340e2be111f61feda2 100644 (file)
@@ -62,7 +62,8 @@ void part_init(lr::ObjectWriteOperation* op, std::string_view tag,
 int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
              std::deque<cb::list> data_bufs, optional_yield y);
 void trim_part(lr::ObjectWriteOperation* op,
-              std::optional<std::string_view> tag, std::uint64_t ofs);
+              std::optional<std::string_view> tag, std::uint64_t ofs,
+              bool exclusive);
 int list_part(lr::IoCtx& ioctx, const std::string& oid,
              std::optional<std::string_view> tag, std::uint64_t ofs,
              std::uint64_t max_entries,
@@ -148,9 +149,11 @@ class FIFO {
   int push_entries(const std::deque<cb::list>& data_bufs,
                   optional_yield y);
   int trim_part(int64_t part_num, uint64_t ofs,
-               std::optional<std::string_view> tag, optional_yield y);
+               std::optional<std::string_view> tag, bool exclusive,
+               optional_yield y);
   int trim_part(int64_t part_num, uint64_t ofs,
-               std::optional<std::string_view> tag, lr::AioCompletion* c);
+               std::optional<std::string_view> tag, bool exclusive,
+               lr::AioCompletion* c);
 
   static void trim_callback(lr::completion_t, void* arg);
   static void update_callback(lr::completion_t, void* arg);
@@ -214,10 +217,14 @@ public:
     );
   /// Trim entries, coroutine/block style
   int trim(std::string_view markstr, //< Position to which to trim, inclusive
+          bool exclusive, //< If true, do not trim the target entry
+                          //< itself, just all those before it.
           optional_yield y //< Optional yield
     );
   /// Trim entries, librados AioCompletion style
   int trim(std::string_view markstr, //< Position to which to trim, inclusive
+          bool exclusive, //< If true, do not trim the target entry
+                          //< itself, just all those before it.
           lr::AioCompletion* c //< librados AIO Completion
     );
   /// Get part info
index 33edaf7271b58f2252d1a867ce82e267cd2b5431..b494d53e9983c87eb134ac1079d7b3049c780e56 100644 (file)
@@ -430,7 +430,7 @@ public:
     return 0;
   }
   int trim(int index, std::string_view marker) override {
-    auto r = fifos[index]->trim(marker, null_yield);
+    auto r = fifos[index]->trim(marker, false, null_yield);
     if (r < 0) {
       lderr(cct) << __PRETTY_FUNCTION__
                 << ": unable to trim FIFO: " << get_oid(index)
@@ -464,7 +464,7 @@ public:
       pc->cond.notify_all();
       pc->put_unlock();
     } else {
-      r = fifos[index]->trim(marker, c);
+      r = fifos[index]->trim(marker, false, c);
       if (r < 0) {
        lderr(cct) << __PRETTY_FUNCTION__
                   << ": unable to trim FIFO: " << get_oid(index)
index b990fbedfc865cad3d4abb08583edad25ce557f9..df390cd3171252d24baa86962d40f03f50a0f10b 100644 (file)
@@ -105,7 +105,7 @@ benchmark pull(RCf::FIFO& f, const std::uint32_t count,
       break;
     got += result.size();
     remaining -= result.size();
-    f.trim(result.back().marker, y);
+    f.trim(result.back().marker, false, y);
   }
   auto finish = sc::steady_clock::now();
   return benchmark(got, (finish - start));
@@ -140,7 +140,7 @@ void concurpull(const std::string& oid, const std::int64_t pool,
          got += result.size();
          remaining -= result.size();
          if (*exit_early) break;
-         f->trim(result.back().marker, y);
+         f->trim(result.back().marker, false, y);
        }
        auto finish = sc::steady_clock::now();
        bench.entries = got;
index 23f106c49caef0e0a98af29d86f59c6060ca0540..484248c0c81ca3178df8ae4c5e234ab9c0f305ff 100644 (file)
@@ -297,7 +297,7 @@ TEST(FIFO, TestPushListTrim) {
 
 
                  /* trim one entry */
-                 f->trim(markers[min_entry], y);
+                 f->trim(markers[min_entry], false, y);
                  ++min_entry;
                }
 
@@ -444,7 +444,7 @@ TEST(FIFO, TestMultipleParts) {
 
                    marker = result.front().marker;
 
-                   f->trim(*marker, y);
+                   f->trim(*marker, false, y);
                  }
 
                  /* check tail */
@@ -606,7 +606,7 @@ TEST(FIFO, TestTwoPushersTrim) {
                  auto& entry = result[num - 1];
                  marker = entry.marker;
 
-                 f1->trim(marker, y);
+                 f1->trim(marker, false, y);
 
                  /* list what's left by fifo2 */
 
@@ -686,3 +686,54 @@ TEST(FIFO, TestPushBatch) {
              });
   c.run();
 }
+
+TEST(FIFO, TestTrimExclusive) {
+  ba::io_context c;
+  auto fifo_id = "fifo"sv;
+
+  s::spawn(c, [&](s::yield_context y) mutable {
+               auto r = R::RADOS::Builder{}.build(c, y);
+               auto pool = create_pool(r, get_temp_pool_name(), y);
+               auto sg = make_scope_guard(
+                 [&] {
+                   r.delete_pool(pool, y);
+                 });
+               R::IOContext ioc(pool);
+               auto f = RCf::FIFO::create(r, ioc, fifo_id, y);
+               static constexpr auto max_entries = 10u;
+               for (uint32_t i = 0; i < max_entries; ++i) {
+                 cb::list bl;
+                 encode(i, bl);
+                 f->push(bl, y);
+               }
+
+               {
+                 auto [result, more] = f->list(1, std::nullopt, y);
+                 auto [val, marker] =
+                   decode_entry<std::uint32_t>(result.front());
+                 ASSERT_EQ(0, val);
+                 f->trim(marker, true, y);
+               }
+               {
+                 auto [result, more] = f->list(max_entries, std::nullopt, y);
+                 auto [val, marker] = decode_entry<std::uint32_t>(result.front());
+                 ASSERT_EQ(0, val);
+                 f->trim(result[4].marker, true, y);
+               }
+               {
+                 auto [result, more] = f->list(max_entries, std::nullopt, y);
+                 auto [val, marker] =
+                   decode_entry<std::uint32_t>(result.front());
+                 ASSERT_EQ(4, val);
+                 f->trim(result.back().marker, true, y);
+               }
+               {
+                 auto [result, more] = f->list(max_entries, std::nullopt, y);
+                 auto [val, marker] =
+                   decode_entry<std::uint32_t>(result.front());
+                 ASSERT_EQ(result.size(), 1);
+                 ASSERT_EQ(max_entries - 1, val);
+               }
+             });
+  c.run();
+}
index f8808b87de8e4f493e4af5c10925b3491cbcf03c..17e78e357f6e3f21e1fbfaac8dc1d84398105dcd 100644 (file)
@@ -220,7 +220,7 @@ TEST_F(LegacyFIFO, TestPushListTrim)
   }
 
   /* trim one entry */
-  r = f->trim(markers[min_entry], null_yield);
+  r = f->trim(markers[min_entry], false, null_yield);
   ASSERT_EQ(0, r);
   ++min_entry;
 
@@ -333,7 +333,7 @@ TEST_F(LegacyFIFO, TestMultipleParts)
     ASSERT_EQ(expected_more, more);
 
     marker = result.front().marker;
-    r = f->trim(*marker, null_yield);
+    r = f->trim(*marker, false, null_yield);
     ASSERT_EQ(0, r);
 
     /* check tail */
@@ -459,7 +459,7 @@ TEST_F(LegacyFIFO, TestTwoPushersTrim)
 
   auto& entry = result[num - 1];
   marker = entry.marker;
-  r = f1->trim(marker, null_yield);
+  r = f1->trim(marker, false, null_yield);
   /* list what's left by fifo2 */
 
   const auto left = max_entries - num;
@@ -577,7 +577,7 @@ TEST_F(LegacyFIFO, TestAioTrim)
     marker = result.front().marker;
     std::unique_ptr<R::AioCompletion> c(rados.aio_create_completion(nullptr,
                                                                    nullptr));
-    r = f->trim(*marker, c.get());
+    r = f->trim(*marker, false, c.get());
     ASSERT_EQ(0, r);
     c->wait_for_complete();
     r = c->get_return_value();
@@ -607,3 +607,41 @@ TEST_F(LegacyFIFO, TestAioTrim)
   r = f->get_part_info(info.tail_part_num, &partinfo, null_yield);
   ASSERT_EQ(0, r);
 }
+
+TEST_F(LegacyFIFO, TestTrimExclusive) {
+  std::unique_ptr<RCf::FIFO> f;
+  auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
+  ASSERT_EQ(0, r);
+  std::vector<RCf::list_entry> result;
+  bool more = false;
+
+  static constexpr auto max_entries = 10u;
+  for (uint32_t i = 0; i < max_entries; ++i) {
+    cb::list bl;
+    encode(i, bl);
+    f->push(bl, null_yield);
+  }
+
+  f->list(1, std::nullopt, &result, &more, null_yield);
+  auto [val, marker] = decode_entry<std::uint32_t>(result.front());
+  ASSERT_EQ(0, val);
+  f->trim(marker, true, null_yield);
+
+  result.clear();
+  f->list(max_entries, std::nullopt, &result, &more, null_yield);
+  std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
+  ASSERT_EQ(0, val);
+  f->trim(result[4].marker, true, null_yield);
+
+  result.clear();
+  f->list(max_entries, std::nullopt, &result, &more, null_yield);
+  std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
+  ASSERT_EQ(4, val);
+  f->trim(result.back().marker, true, null_yield);
+
+  result.clear();
+  f->list(max_entries, std::nullopt, &result, &more, null_yield);
+  std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
+  ASSERT_EQ(result.size(), 1);
+  ASSERT_EQ(max_entries - 1, val);
+}