#include <time.h>
#include <fcntl.h>
#include <unistd.h>
+#include <thread>
#include "global/global_init.h"
#include "common/ceph_argparse.h"
#include "include/stringify.h"
return fn;
}
+char* gen_buffer(uint64_t size)
+{
+ char *buffer = new char[size];
+ boost::random::random_device rand;
+ rand.generate(buffer, buffer + size);
+ return buffer;
+}
+
+
void rm_temp_bdev(string f)
{
::unlink(f.c_str());
rm_temp_bdev(fn);
}
+#define ALLOC_SIZE 4096
+
+void write_data(BlueFS &fs, uint64_t rationed_bytes)
+{
+ BlueFS::FileWriter *h;
+ int j=0, r=0;
+ uint64_t written_bytes = 0;
+ rationed_bytes -= ALLOC_SIZE;
+ stringstream ss;
+ string dir = "dir.";
+ ss << std::this_thread::get_id();
+ dir.append(ss.str());
+ dir.append(".");
+ dir.append(to_string(j));
+ ASSERT_EQ(0, fs.mkdir(dir));
+ while (1) {
+ string file = "file.";
+ file.append(to_string(j));
+ ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
+ bufferlist bl;
+ char *buf = gen_buffer(ALLOC_SIZE);
+ bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
+ bl.push_back(bp);
+ h->append(bl);
+ r = fs.fsync(h);
+ if (r < 0) {
+ std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
+ fs.close_writer(h);
+ break;
+ }
+ written_bytes += g_conf->bluefs_alloc_size;
+ fs.close_writer(h);
+ j++;
+ if ((rationed_bytes - written_bytes) <= g_conf->bluefs_alloc_size) {
+ std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
+ break;
+ }
+ }
+}
+
+void create_single_file(BlueFS &fs)
+{
+ BlueFS::FileWriter *h;
+ stringstream ss;
+ string dir = "dir.test";
+ ASSERT_EQ(0, fs.mkdir(dir));
+ string file = "testfile";
+ ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
+ bufferlist bl;
+ char *buf = gen_buffer(ALLOC_SIZE);
+ bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
+ bl.push_back(bp);
+ h->append(bl);
+ fs.fsync(h);
+ fs.close_writer(h);
+}
+
+void write_single_file(BlueFS &fs, uint64_t rationed_bytes)
+{
+ BlueFS::FileWriter *h;
+ stringstream ss;
+ string dir = "dir.test";
+ string file = "testfile";
+ int r=0, j=0;
+ uint64_t written_bytes = 0;
+ rationed_bytes -= ALLOC_SIZE;
+ while (1) {
+ ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
+ bufferlist bl;
+ char *buf = gen_buffer(ALLOC_SIZE);
+ bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
+ bl.push_back(bp);
+ h->append(bl);
+ r = fs.fsync(h);
+ if (r < 0) {
+ std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
+ fs.close_writer(h);
+ break;
+ }
+ written_bytes += g_conf->bluefs_alloc_size;
+ fs.close_writer(h);
+ j++;
+ if ((rationed_bytes - written_bytes) <= g_conf->bluefs_alloc_size) {
+ std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
+ break;
+ }
+ }
+}
+
+bool writes_done = false;
+
+void sync_fs(BlueFS &fs)
+{
+ while (1) {
+ if (writes_done == true)
+ break;
+ fs.sync_metadata();
+ sleep(1);
+ }
+}
+
+
+void do_join(std::thread& t)
+{
+ t.join();
+}
+
+void join_all(std::vector<std::thread>& v)
+{
+ std::for_each(v.begin(),v.end(),do_join);
+}
+
+#define NUM_WRITERS 3
+#define NUM_SYNC_THREADS 1
+
+#define NUM_SINGLE_FILE_WRITERS 1
+#define NUM_MULTIPLE_FILE_WRITERS 2
+
+TEST(BlueFS, test_flush_1) {
+ uint64_t size = 1048476 * 128;
+ string fn = get_temp_bdev(size);
+ g_ceph_context->_conf->set_val(
+ "bluefs_alloc_size",
+ "65536");
+ g_ceph_context->_conf->apply_changes(NULL);
+
+ BlueFS fs;
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+ {
+ std::vector<std::thread> write_thread_multiple;
+ uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
+ uint64_t per_thread_bytes = (effective_size/(NUM_MULTIPLE_FILE_WRITERS + NUM_SINGLE_FILE_WRITERS));
+ for (int i=0; i<NUM_MULTIPLE_FILE_WRITERS ; i++) {
+ write_thread_multiple.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+ }
+
+ create_single_file(fs);
+ std::vector<std::thread> write_thread_single;
+ for (int i=0; i<NUM_SINGLE_FILE_WRITERS; i++) {
+ write_thread_single.push_back(std::thread(write_single_file, std::ref(fs), per_thread_bytes));
+ }
+
+ join_all(write_thread_single);
+ join_all(write_thread_multiple);
+ }
+ fs.umount();
+ rm_temp_bdev(fn);
+}
+
+TEST(BlueFS, test_flush_2) {
+ uint64_t size = 1048476 * 128;
+ string fn = get_temp_bdev(size);
+ g_ceph_context->_conf->set_val(
+ "bluefs_alloc_size",
+ "65536");
+ g_ceph_context->_conf->apply_changes(NULL);
+
+ BlueFS fs;
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+ {
+ uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
+ uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
+ std::vector<std::thread> write_thread_multiple;
+ for (int i=0; i<NUM_WRITERS; i++) {
+ write_thread_multiple.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+ }
+
+ join_all(write_thread_multiple);
+ }
+ fs.umount();
+ rm_temp_bdev(fn);
+}
+
+TEST(BlueFS, test_flush_3) {
+ uint64_t size = 1048476 * 128;
+ string fn = get_temp_bdev(size);
+ g_ceph_context->_conf->set_val(
+ "bluefs_alloc_size",
+ "65536");
+ g_ceph_context->_conf->apply_changes(NULL);
+
+ BlueFS fs;
+ ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+ fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+ uuid_d fsid;
+ ASSERT_EQ(0, fs.mkfs(fsid));
+ ASSERT_EQ(0, fs.mount());
+ {
+ std::vector<std::thread> write_threads;
+ uint64_t effective_size = size - (11 * 1048576); // leaving the last 11 MB for log compaction
+ uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
+ for (int i=0; i<NUM_WRITERS; i++) {
+ write_threads.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+ }
+
+ std::vector<std::thread> sync_threads;
+ for (int i=0; i<NUM_SYNC_THREADS; i++) {
+ sync_threads.push_back(std::thread(sync_fs, std::ref(fs)));
+ }
+
+ join_all(write_threads);
+ writes_done = true;
+ join_all(sync_threads);
+ }
+ fs.umount();
+ rm_temp_bdev(fn);
+}
+
+
int main(int argc, char **argv) {
vector<const char*> args;
argv_to_vec(argc, (const char **)argv, args);