if (op.ofs < part_header.min_ofs) {
return 0;
}
+ if (op.exclusive && op.ofs == part_header.min_ofs) {
+ return 0;
+ }
if (op.ofs >= part_header.next_ofs) {
if (full_part(part_header)) {
return r;
}
- r = reader.get_next_entry(nullptr, nullptr, nullptr);
- if (r < 0) {
- CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d",
- __func__, r);
- return r;
+ if (op.exclusive) {
+ part_header.min_index = pre_header.index;
+ } else {
+ r = reader.get_next_entry(nullptr, nullptr, nullptr);
+ if (r < 0) {
+ CLS_ERR("ERROR: %s(): unexpected failure at get_next_entry(): r=%d",
+ __func__, r);
+ return r;
+ }
+ part_header.min_index = pre_header.index + 1;
}
part_header.min_ofs = reader.get_ofs();
- part_header.min_index = pre_header.index + 1;
}
r = write_part_header(hctx, part_header);
{
std::optional<std::string> tag;
std::uint64_t ofs{0};
+ bool exclusive = false;
void encode(ceph::buffer::list& bl) const {
ENCODE_START(1, 1, bl);
encode(tag, bl);
encode(ofs, bl);
+ encode(exclusive, bl);
ENCODE_FINISH(bl);
}
void decode(ceph::buffer::list::const_iterator& bl) {
DECODE_START(1, bl);
decode(tag, bl);
decode(ofs, bl);
+ decode(exclusive, bl);
DECODE_FINISH(bl);
}
};
void trim_part(WriteOp& op,
std::optional<std::string_view> tag,
- std::uint64_t ofs)
+ std::uint64_t ofs, bool exclusive)
{
fifo::op::trim_part tp;
tp.tag = tag;
tp.ofs = ofs;
+ tp.exclusive = exclusive;
bufferlist in;
encode(tp, in);
std::deque<cb::list> data_bufs,
fu2::unique_function<void(bs::error_code, int)>);
void trim_part(WriteOp& op, std::optional<std::string_view> tag,
- std::uint64_t ofs);
+ std::uint64_t ofs,
+ bool exclusive);
void list_part(ReadOp& op,
std::optional<std::string_view> tag,
std::uint64_t ofs,
/// Signature: (bs::error_code)
template<typename CT>
auto trim(std::string_view markstr, //< Position to which to trim, inclusive
+ bool exclusive, //< If true, trim markers up to but NOT INCLUDING
+ //< markstr, otherwise trim markstr as well.
CT&& ct //< CompletionToken
) {
auto m = to_marker(markstr);
} else {
using handler_type = decltype(init.completion_handler);
auto t = ceph::allocate_unique<Trimmer<handler_type>>(
- a, this, m->num, m->ofs, std::move(init.completion_handler));
+ a, this, m->num, m->ofs, exclusive, std::move(init.completion_handler));
t.release()->trim();
}
return init.result.get();
auto trim_part(int64_t part_num,
uint64_t ofs,
std::optional<std::string_view> tag,
+ bool exclusive,
CT&& ct) {
WriteOp op;
- cls::fifo::trim_part(op, tag, ofs);
+ cls::fifo::trim_part(op, tag, ofs, exclusive);
return r->execute(info.part_oid(part_num), ioc, std::move(op),
std::forward<CT>(ct));
}
FIFO* f;
std::int64_t part_num;
std::uint64_t ofs;
+ bool exclusive;
Handler handler;
std::int64_t pn;
int i = 0;
public:
Trimmer(FIFO* f, std::int64_t part_num, std::uint64_t ofs,
- Handler&& handler)
- : f(f), part_num(part_num), ofs(ofs), handler(std::move(handler)) {
+ bool exclusive, Handler&& handler)
+ : f(f), part_num(part_num), ofs(ofs), exclusive(exclusive),
+ handler(std::move(handler)) {
std::unique_lock l(f->m);
pn = f->info.tail_part_num;
}
l.unlock();
f->trim_part(
pn, max_part_size, std::nullopt,
+ false,
ca::bind_ea(
e, a,
[t = std::unique_ptr<Trimmer>(this),
return;
}
f->trim_part(
- part_num, ofs, std::nullopt,
+ part_num, ofs, std::nullopt, exclusive,
ca::bind_ea(
e, a,
[t = std::unique_ptr<Trimmer>(this),
void trim_part(lr::ObjectWriteOperation* op,
std::optional<std::string_view> tag,
- std::uint64_t ofs)
+ std::uint64_t ofs, bool exclusive)
{
fifo::op::trim_part tp;
tp.tag = tag;
tp.ofs = ofs;
+ tp.exclusive = exclusive;
cb::list in;
encode(tp, in);
int FIFO::trim_part(int64_t part_num, uint64_t ofs,
std::optional<std::string_view> tag,
+ bool exclusive,
optional_yield y)
{
lr::ObjectWriteOperation op;
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
l.unlock();
- rgw::cls::fifo::trim_part(&op, tag, ofs);
+ rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
return rgw_rados_operate(ioctx, part_oid, &op, y);
}
int FIFO::trim_part(int64_t part_num, uint64_t ofs,
std::optional<std::string_view> tag,
+ bool exclusive,
lr::AioCompletion* c)
{
lr::ObjectWriteOperation op;
std::unique_lock l(m);
const auto part_oid = info.part_oid(part_num);
l.unlock();
- rgw::cls::fifo::trim_part(&op, tag, ofs);
+ rgw::cls::fifo::trim_part(&op, tag, ofs, exclusive);
return ioctx.aio_operate(part_oid, c, &op);
}
return r;
}
-int FIFO::trim(std::string_view markstr, optional_yield y)
+int FIFO::trim(std::string_view markstr, bool exclusive, optional_yield y)
{
auto marker = to_marker(markstr);
if (!marker) {
std::unique_lock l(m);
auto max_part_size = info.params.max_part_size;
l.unlock();
- r = trim_part(pn, max_part_size, std::nullopt, y);
+ r = trim_part(pn, max_part_size, std::nullopt, false, y);
if (r < 0 && r == -ENOENT) {
return r;
}
++pn;
}
- r = trim_part(part_num, ofs, std::nullopt, y);
+ r = trim_part(part_num, ofs, std::nullopt, exclusive, y);
if (r < 0 && r != -ENOENT) {
return r;
}
std::int64_t part_num;
std::uint64_t ofs;
std::int64_t pn;
+ bool exclusive;
lr::AioCompletion* super;
lr::AioCompletion* cur = lr::Rados::aio_create_completion(
static_cast<void*>(this), &FIFO::trim_callback);
int retries = 0;
Trimmer(FIFO* fifo, std::int64_t part_num, std::uint64_t ofs, std::int64_t pn,
- lr::AioCompletion* super)
- : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), super(super) {
+ bool exclusive, lr::AioCompletion* super)
+ : fifo(fifo), part_num(part_num), ofs(ofs), pn(pn), exclusive(exclusive),
+ super(super) {
super->pc->get();
}
~Trimmer() {
trimmer->cur->release();
trimmer->cur = lr::Rados::aio_create_completion(arg, &FIFO::trim_callback);
r = trimmer->fifo->trim_part(trimmer->pn++, max_part_size, std::nullopt,
- trimmer->cur);
+ false, trimmer->cur);
if (r < 0) {
complete(trimmer->super, r);
delete trimmer;
trimmer->update = true;
trimmer->canceled = tail_part_num < trimmer->part_num;
r = trimmer->fifo->trim_part(trimmer->part_num, trimmer->ofs,
- std::nullopt, trimmer->cur);
+ std::nullopt, trimmer->exclusive, trimmer->cur);
if (r < 0) {
complete(trimmer->super, r);
delete trimmer;
}
}
-int FIFO::trim(std::string_view markstr, lr::AioCompletion* c) {
+int FIFO::trim(std::string_view markstr, bool exclusive, lr::AioCompletion* c) {
auto marker = to_marker(markstr);
if (!marker) {
return -EINVAL;
const auto pn = info.tail_part_num;
const auto part_oid = info.part_oid(pn);
l.unlock();
- auto trimmer = new Trimmer(this, marker->num, marker->ofs, pn, c);
+ auto trimmer = new Trimmer(this, marker->num, marker->ofs, pn, exclusive, c);
++trimmer->pn;
auto ofs = marker->ofs;
if (pn < marker->num) {
} else {
trimmer->update = true;
}
- auto r = trimmer->fifo->trim_part(pn, ofs, std::nullopt, trimmer->cur);
+ auto r = trimmer->fifo->trim_part(pn, ofs, std::nullopt, exclusive,
+ trimmer->cur);
if (r < 0) {
complete(trimmer->super, r);
delete trimmer;
int push_part(lr::IoCtx& ioctx, const std::string& oid, std::string_view tag,
std::deque<cb::list> data_bufs, optional_yield y);
void trim_part(lr::ObjectWriteOperation* op,
- std::optional<std::string_view> tag, std::uint64_t ofs);
+ std::optional<std::string_view> tag, std::uint64_t ofs,
+ bool exclusive);
int list_part(lr::IoCtx& ioctx, const std::string& oid,
std::optional<std::string_view> tag, std::uint64_t ofs,
std::uint64_t max_entries,
int push_entries(const std::deque<cb::list>& data_bufs,
optional_yield y);
int trim_part(int64_t part_num, uint64_t ofs,
- std::optional<std::string_view> tag, optional_yield y);
+ std::optional<std::string_view> tag, bool exclusive,
+ optional_yield y);
int trim_part(int64_t part_num, uint64_t ofs,
- std::optional<std::string_view> tag, lr::AioCompletion* c);
+ std::optional<std::string_view> tag, bool exclusive,
+ lr::AioCompletion* c);
static void trim_callback(lr::completion_t, void* arg);
static void update_callback(lr::completion_t, void* arg);
);
/// Trim entries, coroutine/block style
int trim(std::string_view markstr, //< Position to which to trim, inclusive
+ bool exclusive, //< If true, do not trim the target entry
+ //< itself, just all those before it.
optional_yield y //< Optional yield
);
/// Trim entries, librados AioCompletion style
int trim(std::string_view markstr, //< Position to which to trim, inclusive
+ bool exclusive, //< If true, do not trim the target entry
+ //< itself, just all those before it.
lr::AioCompletion* c //< librados AIO Completion
);
/// Get part info
return 0;
}
int trim(int index, std::string_view marker) override {
- auto r = fifos[index]->trim(marker, null_yield);
+ auto r = fifos[index]->trim(marker, false, null_yield);
if (r < 0) {
lderr(cct) << __PRETTY_FUNCTION__
<< ": unable to trim FIFO: " << get_oid(index)
pc->cond.notify_all();
pc->put_unlock();
} else {
- r = fifos[index]->trim(marker, c);
+ r = fifos[index]->trim(marker, false, c);
if (r < 0) {
lderr(cct) << __PRETTY_FUNCTION__
<< ": unable to trim FIFO: " << get_oid(index)
break;
got += result.size();
remaining -= result.size();
- f.trim(result.back().marker, y);
+ f.trim(result.back().marker, false, y);
}
auto finish = sc::steady_clock::now();
return benchmark(got, (finish - start));
got += result.size();
remaining -= result.size();
if (*exit_early) break;
- f->trim(result.back().marker, y);
+ f->trim(result.back().marker, false, y);
}
auto finish = sc::steady_clock::now();
bench.entries = got;
/* trim one entry */
- f->trim(markers[min_entry], y);
+ f->trim(markers[min_entry], false, y);
++min_entry;
}
marker = result.front().marker;
- f->trim(*marker, y);
+ f->trim(*marker, false, y);
}
/* check tail */
auto& entry = result[num - 1];
marker = entry.marker;
- f1->trim(marker, y);
+ f1->trim(marker, false, y);
/* list what's left by fifo2 */
});
c.run();
}
+
+TEST(FIFO, TestTrimExclusive) {
+ ba::io_context c;
+ auto fifo_id = "fifo"sv;
+
+ s::spawn(c, [&](s::yield_context y) mutable {
+ auto r = R::RADOS::Builder{}.build(c, y);
+ auto pool = create_pool(r, get_temp_pool_name(), y);
+ auto sg = make_scope_guard(
+ [&] {
+ r.delete_pool(pool, y);
+ });
+ R::IOContext ioc(pool);
+ auto f = RCf::FIFO::create(r, ioc, fifo_id, y);
+ static constexpr auto max_entries = 10u;
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ cb::list bl;
+ encode(i, bl);
+ f->push(bl, y);
+ }
+
+ {
+ auto [result, more] = f->list(1, std::nullopt, y);
+ auto [val, marker] =
+ decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(0, val);
+ f->trim(marker, true, y);
+ }
+ {
+ auto [result, more] = f->list(max_entries, std::nullopt, y);
+ auto [val, marker] = decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(0, val);
+ f->trim(result[4].marker, true, y);
+ }
+ {
+ auto [result, more] = f->list(max_entries, std::nullopt, y);
+ auto [val, marker] =
+ decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(4, val);
+ f->trim(result.back().marker, true, y);
+ }
+ {
+ auto [result, more] = f->list(max_entries, std::nullopt, y);
+ auto [val, marker] =
+ decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(result.size(), 1);
+ ASSERT_EQ(max_entries - 1, val);
+ }
+ });
+ c.run();
+}
}
/* trim one entry */
- r = f->trim(markers[min_entry], null_yield);
+ r = f->trim(markers[min_entry], false, null_yield);
ASSERT_EQ(0, r);
++min_entry;
ASSERT_EQ(expected_more, more);
marker = result.front().marker;
- r = f->trim(*marker, null_yield);
+ r = f->trim(*marker, false, null_yield);
ASSERT_EQ(0, r);
/* check tail */
auto& entry = result[num - 1];
marker = entry.marker;
- r = f1->trim(marker, null_yield);
+ r = f1->trim(marker, false, null_yield);
/* list what's left by fifo2 */
const auto left = max_entries - num;
marker = result.front().marker;
std::unique_ptr<R::AioCompletion> c(rados.aio_create_completion(nullptr,
nullptr));
- r = f->trim(*marker, c.get());
+ r = f->trim(*marker, false, c.get());
ASSERT_EQ(0, r);
c->wait_for_complete();
r = c->get_return_value();
r = f->get_part_info(info.tail_part_num, &partinfo, null_yield);
ASSERT_EQ(0, r);
}
+
+TEST_F(LegacyFIFO, TestTrimExclusive) {
+ std::unique_ptr<RCf::FIFO> f;
+ auto r = RCf::FIFO::create(ioctx, fifo_id, &f, null_yield);
+ ASSERT_EQ(0, r);
+ std::vector<RCf::list_entry> result;
+ bool more = false;
+
+ static constexpr auto max_entries = 10u;
+ for (uint32_t i = 0; i < max_entries; ++i) {
+ cb::list bl;
+ encode(i, bl);
+ f->push(bl, null_yield);
+ }
+
+ f->list(1, std::nullopt, &result, &more, null_yield);
+ auto [val, marker] = decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(0, val);
+ f->trim(marker, true, null_yield);
+
+ result.clear();
+ f->list(max_entries, std::nullopt, &result, &more, null_yield);
+ std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(0, val);
+ f->trim(result[4].marker, true, null_yield);
+
+ result.clear();
+ f->list(max_entries, std::nullopt, &result, &more, null_yield);
+ std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(4, val);
+ f->trim(result.back().marker, true, null_yield);
+
+ result.clear();
+ f->list(max_entries, std::nullopt, &result, &more, null_yield);
+ std::tie(val, marker) = decode_entry<std::uint32_t>(result.front());
+ ASSERT_EQ(result.size(), 1);
+ ASSERT_EQ(max_entries - 1, val);
+}