int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
{
+ bool overshoot = false;
auto marker = to_marker(markstr);
if (!marker) {
return -EINVAL;
auto ofs = marker->ofs;
std::unique_lock l(m);
auto tid = ++next_tid;
+ auto hn = info.head_part_num;
+ const auto max_part_size = info.params.max_part_size;
+ if (part_num > hn) {
+ l.unlock();
+ auto r = read_meta(tid, y);
+ if (r < 0) {
+ return r;
+ }
+ l.lock();
+ auto hn = info.head_part_num;
+ if (part_num > hn) {
+ overshoot = true;
+ part_num = hn;
+ ofs = max_part_size;
+ }
+ }
+ if (part_num < info.tail_part_num) {
+ return -ENODATA;
+ }
auto pn = info.tail_part_num;
l.unlock();
ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " pn=" << pn << " tid=" << tid << dendl;
std::unique_lock l(m);
- auto max_part_size = info.params.max_part_size;
l.unlock();
r = trim_part(pn, max_part_size, std::nullopt, false, tid, y);
if (r < 0 && r == -ENOENT) {
<< " canceled too many times, giving up: tid=" << tid << dendl;
return -EIO;
}
- return 0;
+ return overshoot ? -ENODATA : 0;
}
struct Trimmer : public Completion<Trimmer> {
bool exclusive;
std::uint64_t tid;
bool update = false;
+ bool reread = false;
bool canceled = false;
+ bool overshoot = false;
int retries = 0;
Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
auto cct = fifo->cct;
ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " entering: tid=" << tid << dendl;
+
+ if (reread) {
+ reread = false;
+ if (r < 0) {
+ lderr(cct) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " read_meta failed: r="
+ << r << " tid=" << tid << dendl;
+ complete(std::move(p), r);
+ return;
+ }
+ std::unique_lock l(fifo->m);
+ auto hn = fifo->info.head_part_num;
+ const auto max_part_size = fifo->info.params.max_part_size;
+ const auto tail_part_num = fifo->info.tail_part_num;
+ l.unlock();
+ if (part_num > hn) {
+ part_num = hn;
+ ofs = max_part_size;
+ overshoot = true;
+ }
+ if (part_num < tail_part_num) {
+ complete(std::move(p), -ENODATA);
+ return;
+ }
+ pn = tail_part_num;
+ if (pn < part_num) {
+ ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
+ << " pn=" << pn << " tid=" << tid << dendl;
+ fifo->trim_part(pn++, max_part_size, std::nullopt,
+ false, tid, call(std::move(p)));
+ } else {
+ update = true;
+ canceled = tail_part_num < part_num;
+ fifo->trim_part(part_num, ofs, std::nullopt, exclusive, tid,
+ call(std::move(p)));
+ }
+ return;
+ }
+
if (r == -ENOENT) {
r = 0;
}
.tail_part_num(part_num), objv, &canceled,
tid, call(std::move(p)));
} else {
- complete(std::move(p), 0);
+ complete(std::move(p), overshoot ? -ENODATA : 0);
}
}
};
auto marker = to_marker(markstr);
auto realmark = marker.value_or(::rgw::cls::fifo::marker{});
std::unique_lock l(m);
+ const auto hn = info.head_part_num;
const auto max_part_size = info.params.max_part_size;
const auto pn = info.tail_part_num;
const auto part_oid = info.part_oid(pn);
}
++trimmer->pn;
auto ofs = marker->ofs;
+ if (marker->num > hn) {
+ trimmer->reread = true;
+ read_meta(tid, Trimmer::call(std::move(trimmer)));
+ return;
+ }
if (pn < marker->num) {
ldout(cct, 20) << __PRETTY_FUNCTION__ << ":" << __LINE__
<< " pn=" << pn << " tid=" << tid << dendl;
auto& info = f->meta();
ASSERT_EQ(info.head_part_num, 4);
}
+
+TEST_F(LegacyFIFO, TrimAll)
+{
+ std::unique_ptr<RCf::FIFO> f;
+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
+ ASSERT_EQ(0, r);
+ static constexpr auto max_entries = 10u;
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ cb::list bl;
+ encode(i, bl);
+ r = f->push(bl, null_yield);
+ ASSERT_EQ(0, r);
+ }
+
+ /* trim one entry */
+ r = f->trim(RCf::marker::max().to_string(), false, null_yield);
+ ASSERT_EQ(-ENODATA, r);
+
+ std::vector<RCf::list_entry> result;
+ bool more;
+ r = f->list(1, std::nullopt, &result, &more, null_yield);
+ ASSERT_EQ(0, r);
+ ASSERT_TRUE(result.empty());
+}
+
+TEST_F(LegacyFIFO, AioTrimAll)
+{
+ std::unique_ptr<RCf::FIFO> f;
+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
+ ASSERT_EQ(0, r);
+ static constexpr auto max_entries = 10u;
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ cb::list bl;
+ encode(i, bl);
+ r = f->push(bl, null_yield);
+ ASSERT_EQ(0, r);
+ }
+
+ auto c = R::Rados::aio_create_completion();
+ f->trim(RCf::marker::max().to_string(), false, c);
+ c->wait_for_complete();
+ r = c->get_return_value();
+ c->release();
+ ASSERT_EQ(-ENODATA, r);
+
+ std::vector<RCf::list_entry> result;
+ bool more;
+ r = f->list(1, std::nullopt, &result, &more, null_yield);
+ ASSERT_EQ(0, r);
+ ASSERT_TRUE(result.empty());
+}