#include <atomic>
#include <mutex>
+#include <limits>
#include "bluefs_types.h"
#include "common/RefCountedObj.h"
// to use buffer_appender exclusively here (e.g., it's notion of
// offset will remain accurate).
void append(const char *buf, size_t len) {
+ uint64_t l0 = buffer.length();
+ ceph_assert(l0 + len <= std::numeric_limits<unsigned>::max());
buffer_appender.append(buf, len);
}
// note: used internally only, for ino 1 or 0.
- void append(bufferlist& bl) {
+ void append(ceph::buffer::list& bl) {
+ uint64_t l0 = buffer.length();
+ ceph_assert(l0 + bl.length() <= std::numeric_limits<unsigned>::max());
buffer.claim_append(bl);
}
int r = _flush(h, force, l);
ceph_assert(r == 0);
}
- void try_flush(FileWriter *h) {
- h->buffer_appender.flush();
- if (h->buffer.length() >= cct->_conf->bluefs_min_flush_size) {
- flush(h, true);
+
+ void append_try_flush(FileWriter *h, const char* buf, size_t len) {
+ size_t max_size = 1ull << 30; // cap to 1GB
+ while (len > 0) {
+ bool need_flush = true;
+ auto l0 = h->buffer.length();
+ if (l0 < max_size) {
+ size_t l = std::min(len, max_size - l0);
+ h->append(buf, l);
+ buf += l;
+ len -= l;
+ need_flush = h->buffer.length() >= cct->_conf->bluefs_min_flush_size;
+ }
+ if (need_flush) {
+ flush(h, true);
+ // make sure we've made any progress with flush hence the
+ // loop doesn't iterate forever
+ ceph_assert(h->buffer.length() < max_size);
+ }
}
}
void flush_range(FileWriter *h, uint64_t offset, uint64_t length) {
}
TEST(BlueFS, very_large_write) {
- // we'll write a ~3G file, so allocate more than that for the whole fs
- uint64_t size = 1048576 * 1024 * 8ull;
+ // we'll write a ~5G file, so allocate more than that for the whole fs
+ uint64_t size = 1048576 * 1024 * 6ull;
TempBdev bdev{size};
BlueFS fs(g_ceph_context);
g_ceph_context->_conf.set_val("bluefs_buffered_io", stringify((int)old));
}
+TEST(BlueFS, very_large_write2) {
+ // we'll write a ~5G file, so allocate more than that for the whole fs
+ uint64_t size_full = 1048576 * 1024 * 6ull;
+ uint64_t size = 1048576 * 1024 * 5ull;
+ TempBdev bdev{ size_full };
+ BlueFS fs(g_ceph_context);
+
+ bool old = g_ceph_context->_conf.get_val<bool>("bluefs_buffered_io");
+ g_ceph_context->_conf.set_val("bluefs_buffered_io", "false");
+ uint64_t total_written = 0;
+
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, bdev.path, false, 1048576));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size_full - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+
+ char fill_arr[1 << 20]; // 1M
+ for (size_t i = 0; i < sizeof(fill_arr); ++i) {
+ fill_arr[i] = (char)i;
+ }
+ std::unique_ptr<char[]> buf;
+ buf.reset(new char[size]);
+ for (size_t i = 0; i < size; i += sizeof(fill_arr)) {
+ memcpy(buf.get() + i, fill_arr, sizeof(fill_arr));
+ }
+ {
+ BlueFS::FileWriter* h;
+ ASSERT_EQ(0, fs.mkdir("dir"));
+ ASSERT_EQ(0, fs.open_for_write("dir", "bigfile", &h, false));
+ fs.append_try_flush(h, buf.get(), size);
+ total_written = size;
+ fs.fsync(h);
+ fs.close_writer(h);
+ }
+ memset(buf.get(), 0, size);
+ {
+ BlueFS::FileReader* h;
+ ASSERT_EQ(0, fs.open_for_read("dir", "bigfile", &h));
+ ASSERT_EQ(h->file->fnode.size, total_written);
+ auto l = h->file->fnode.size;
+ BlueFS::FileReaderBuffer readbuf(10485760);
+ int64_t r = fs.read(h, &readbuf, 0, l, NULL, buf.get());
+ ASSERT_EQ(r, (int64_t)l);
+ for (size_t i = 0; i < size; i += sizeof(fill_arr)) {
+ ceph_assert(memcmp(buf.get() + i, fill_arr, sizeof(fill_arr)) == 0);
+ }
+ delete h;
+ }
+ fs.umount();
+
+ g_ceph_context->_conf.set_val("bluefs_buffered_io", stringify((int)old));
+}
+
#define ALLOC_SIZE 4096
void write_data(BlueFS &fs, uint64_t rationed_bytes)