h->append(bl);
r = fs.fsync(h);
if (r < 0) {
- std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
fs.close_writer(h);
break;
}
fs.close_writer(h);
j++;
if ((rationed_bytes - written_bytes) <= g_conf->bluefs_alloc_size) {
- std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
break;
}
}
h->append(bl);
r = fs.fsync(h);
if (r < 0) {
- std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
fs.close_writer(h);
break;
}
fs.close_writer(h);
j++;
if ((rationed_bytes - written_bytes) <= g_conf->bluefs_alloc_size) {
- std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
break;
}
}
rm_temp_bdev(fn);
}
+TEST(BlueFS, test_simple_compaction_sync) {
+ g_ceph_context->_conf->set_val(
+ "bluefs_compact_log_sync",
+ "true");
+ uint64_t size = 1048476 * 128;
+ string fn = get_temp_bdev(size);
+
+ BlueFS fs;
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+ {
+ BlueFS::FileWriter *h;
+ for (int i=0; i<10; i++) {
+ string dir = "dir.";
+ dir.append(to_string(i));
+ ASSERT_EQ(0, fs.mkdir(dir));
+ for (int j=0; j<10; j++) {
+ string file = "file.";
+ file.append(to_string(j));
+ ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
+ bufferlist bl;
+ char *buf = gen_buffer(4096);
+ bufferptr bp = buffer::claim_char(4096, buf);
+ bl.push_back(bp);
+ h->append(bl);
+ fs.fsync(h);
+ fs.close_writer(h);
+ }
+ }
+ }
+ // Don't remove all
+ {
+ for (int i=0; i<10; i+=2) {
+ string dir = "dir.";
+ dir.append(to_string(i));
+ for (int j=0; j<10; j+=2) {
+ string file = "file.";
+ file.append(to_string(j));
+ fs.unlink(dir, file);
+ fs.flush_log();
+ }
+ fs.rmdir(dir);
+ fs.flush_log();
+ }
+ }
+ fs.compact_log();
+ fs.umount();
+ rm_temp_bdev(fn);
+}
+
+TEST(BlueFS, test_simple_compaction_async) {
+ g_ceph_context->_conf->set_val(
+ "bluefs_compact_log_sync",
+ "false");
+ uint64_t size = 1048476 * 128;
+ string fn = get_temp_bdev(size);
+
+ BlueFS fs;
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+ {
+ BlueFS::FileWriter *h;
+ for (int i=0; i<10; i++) {
+ string dir = "dir.";
+ dir.append(to_string(i));
+ ASSERT_EQ(0, fs.mkdir(dir));
+ for (int j=0; j<10; j++) {
+ string file = "file.";
+ file.append(to_string(j));
+ ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
+ bufferlist bl;
+ char *buf = gen_buffer(4096);
+ bufferptr bp = buffer::claim_char(4096, buf);
+ bl.push_back(bp);
+ h->append(bl);
+ fs.fsync(h);
+ fs.close_writer(h);
+ }
+ }
+ }
+ // Don't remove all
+ {
+ for (int i=0; i<10; i+=2) {
+ string dir = "dir.";
+ dir.append(to_string(i));
+ for (int j=0; j<10; j+=2) {
+ string file = "file.";
+ file.append(to_string(j));
+ fs.unlink(dir, file);
+ fs.flush_log();
+ }
+ fs.rmdir(dir);
+ fs.flush_log();
+ }
+ }
+ fs.compact_log();
+ fs.umount();
+ rm_temp_bdev(fn);
+}
+
+TEST(BlueFS, test_compaction_sync) {
+ uint64_t size = 1048476 * 128;
+ string fn = get_temp_bdev(size);
+ g_ceph_context->_conf->set_val(
+ "bluefs_alloc_size",
+ "65536");
+ g_ceph_context->_conf->set_val(
+ "bluefs_compact_log_sync",
+ "true");
+
+ BlueFS fs;
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+ {
+ std::vector<std::thread> write_threads;
+ uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
+ uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
+ for (int i=0; i<NUM_WRITERS; i++) {
+ write_threads.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+ }
+
+ std::vector<std::thread> sync_threads;
+ for (int i=0; i<NUM_SYNC_THREADS; i++) {
+ sync_threads.push_back(std::thread(sync_fs, std::ref(fs)));
+ }
+
+ join_all(write_threads);
+ writes_done = true;
+ join_all(sync_threads);
+ fs.compact_log();
+ }
+ fs.umount();
+ rm_temp_bdev(fn);
+}
+
+TEST(BlueFS, test_compaction_async) {
+ uint64_t size = 1048476 * 128;
+ string fn = get_temp_bdev(size);
+ g_ceph_context->_conf->set_val(
+ "bluefs_alloc_size",
+ "65536");
+ g_ceph_context->_conf->set_val(
+ "bluefs_compact_log_sync",
+ "false");
+
+ BlueFS fs;
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+ {
+ std::vector<std::thread> write_threads;
+ uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
+ uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
+ for (int i=0; i<NUM_WRITERS; i++) {
+ write_threads.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+ }
+
+ std::vector<std::thread> sync_threads;
+ for (int i=0; i<NUM_SYNC_THREADS; i++) {
+ sync_threads.push_back(std::thread(sync_fs, std::ref(fs)));
+ }
+
+ join_all(write_threads);
+ writes_done = true;
+ join_all(sync_threads);
+ fs.compact_log();
+ }
+ fs.umount();
+ rm_temp_bdev(fn);
+}
+
+TEST(BlueFS, test_replay) {
+ uint64_t size = 1048476 * 128;
+ string fn = get_temp_bdev(size);
+ g_ceph_context->_conf->set_val(
+ "bluefs_alloc_size",
+ "65536");
+ g_ceph_context->_conf->set_val(
+ "bluefs_compact_log_sync",
+ "false");
+
+ BlueFS fs;
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+ {
+ std::vector<std::thread> write_threads;
+ uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
+ uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
+ for (int i=0; i<NUM_WRITERS; i++) {
+ write_threads.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+ }
+
+ std::vector<std::thread> sync_threads;
+ for (int i=0; i<NUM_SYNC_THREADS; i++) {
+ sync_threads.push_back(std::thread(sync_fs, std::ref(fs)));
+ }
+
+ join_all(write_threads);
+ writes_done = true;
+ join_all(sync_threads);
+ fs.compact_log();
+ }
+ fs.umount();
+ // remount and check log can replay safe?
+ fs.mount();
+ fs.umount();
+ rm_temp_bdev(fn);
+}
int main(int argc, char **argv) {
vector<const char*> args;