return print_invalid(r_header);
}
} else if (r_header.committed_to.segment_seq != cursor.seq.segment_seq) {
+ /*
+ * Assuing that seastore issues several records using submit_recods()
+ * as shown in the following example.
+ *
+ * Example )
+ * a. submit_record(a);
+ * b. submit_record(b);
+ * c. submit_record(c);
+ * d. roll to begin
+ * e. submit_record(d);
+ * f. submit_record(e);
+ * g. submit_record(f);
+ *
+ * In this example, we need to consider the two cases.
+ * case 1)
+ * records a - e were issued in a batch manner
+ * case 2)
+ * When starts to submit_record(e) at step 6, submit(b) has completed its finalize phase,
+ * so the header of e's committed_to points to the end of b.
+ *
+ * To handle these cases correctly, the following condition is added.
+ */
+ if ((r_header.committed_to.offset >= cursor.last_committed.offset &&
+ r_header.committed_to.segment_seq == cursor.last_committed.segment_seq) &&
+ r_header.committed_to.segment_seq == cursor.seq.segment_seq - 1) {
+ return false;
+ }
return print_invalid(r_header);
}
return false;
get_records_used_size());
});
}
+
+TEST_F(cbjournal_test_t, multiple_submit_at_end)
+{
+ run_async([this] {
+ record_t rec {
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(20), generate_delta(21) }
+ };
+ auto r_size = record_group_size_t(rec.size, block_size);
+ auto record_total_size = r_size.get_encoded_length();
+ submit_record(std::move(rec));
+ while (is_available_size(record_total_size)) {
+ submit_record(
+ record_t {
+ { generate_extent(1), generate_extent(2) },
+ { generate_delta(20), generate_delta(21) }
+ });
+ }
+ update_journal_tail(entries.front().addr, record_total_size * 8);
+ for (int i = 0; i < 8; i++) {
+ entries.erase(entries.begin());
+ }
+ seastar::parallel_for_each(
+ boost::make_counting_iterator(0u),
+ boost::make_counting_iterator(4u),
+ [&](auto) {
+ return seastar::async([&] {
+ auto writes = 0;
+ while (writes < 2) {
+ record_t rec {
+ { generate_extent(1) },
+ { generate_delta(20) } };
+ submit_record(std::move(rec));
+ writes++;
+ }
+ });
+ }).get0();
+ auto old_written_to = get_written_to();
+ cbj->close().unsafe_get0();
+ cbj->replay(
+ [](const auto &offsets,
+ const auto &e,
+ auto &dirty_seq,
+ auto &alloc_seq,
+ auto last_modified) {
+ return Journal::replay_ertr::make_ready_future<bool>(true);
+ }).unsafe_get0();
+ assert(get_written_to() == old_written_to);
+ });
+}