]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
os/bluestore/Bluefs: Unit test cases for bluefs async flush
authorVarada Kari <varada.kari@sandisk.com>
Wed, 29 Jun 2016 13:13:55 +0000 (18:43 +0530)
committerSage Weil <sage@redhat.com>
Thu, 30 Jun 2016 17:07:36 +0000 (13:07 -0400)
Signed-off-by: Varada Kari <varada.kari@sandisk.com>
src/test/objectstore/test_bluefs.cc

index 8f31407b4a319efe55a668db09d207020a4cb903..7ffdfb8dee9489006de664853af6d2eff208c2d0 100644 (file)
@@ -7,6 +7,7 @@
 #include <time.h>
 #include <fcntl.h>
 #include <unistd.h>
+#include <thread>
 #include "global/global_init.h"
 #include "common/ceph_argparse.h"
 #include "include/stringify.h"
@@ -28,6 +29,15 @@ string get_temp_bdev(uint64_t size)
   return fn;
 }
 
+char* gen_buffer(uint64_t size)
+{
+    char *buffer = new char[size];
+    boost::random::random_device rand;
+    rand.generate(buffer, buffer + size);
+    return buffer;
+}
+
+
 void rm_temp_bdev(string f)
 {
   ::unlink(f.c_str());
@@ -131,6 +141,223 @@ TEST(BlueFS, small_appends) {
   rm_temp_bdev(fn);
 }
 
+#define ALLOC_SIZE 4096
+
+void write_data(BlueFS &fs, uint64_t rationed_bytes)
+{
+    BlueFS::FileWriter *h;
+    int j=0, r=0;
+    uint64_t written_bytes = 0;
+    rationed_bytes -= ALLOC_SIZE;
+    stringstream ss;
+    string dir = "dir.";
+    ss << std::this_thread::get_id();
+    dir.append(ss.str());
+    dir.append(".");
+    dir.append(to_string(j));
+    ASSERT_EQ(0, fs.mkdir(dir));
+    while (1) {
+      string file = "file.";
+      file.append(to_string(j));
+      ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
+      bufferlist bl;
+      char *buf = gen_buffer(ALLOC_SIZE);
+      bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
+      bl.push_back(bp);
+      h->append(bl);
+      r = fs.fsync(h);
+      if (r < 0) {
+         std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
+         fs.close_writer(h);
+         break;
+      }
+      written_bytes += g_conf->bluefs_alloc_size;
+      fs.close_writer(h);
+      j++;
+      if ((rationed_bytes - written_bytes) <= g_conf->bluefs_alloc_size) {
+        std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
+        break;
+      }
+    }
+}
+
+void create_single_file(BlueFS &fs)
+{
+    BlueFS::FileWriter *h;
+    stringstream ss;
+    string dir = "dir.test";
+    ASSERT_EQ(0, fs.mkdir(dir));
+    string file = "testfile";
+    ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
+    bufferlist bl;
+    char *buf = gen_buffer(ALLOC_SIZE);
+    bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
+    bl.push_back(bp);
+    h->append(bl);
+    fs.fsync(h);
+    fs.close_writer(h);
+}
+
+void write_single_file(BlueFS &fs, uint64_t rationed_bytes)
+{
+    BlueFS::FileWriter *h;
+    stringstream ss;
+    string dir = "dir.test";
+    string file = "testfile";
+    int r=0, j=0;
+    uint64_t written_bytes = 0;
+    rationed_bytes -= ALLOC_SIZE;
+    while (1) {
+      ASSERT_EQ(0, fs.open_for_write(dir, file, &h, false));
+      bufferlist bl;
+      char *buf = gen_buffer(ALLOC_SIZE);
+      bufferptr bp = buffer::claim_char(ALLOC_SIZE, buf);
+      bl.push_back(bp);
+      h->append(bl);
+      r = fs.fsync(h);
+      if (r < 0) {
+         std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
+         fs.close_writer(h);
+         break;
+      }
+      written_bytes += g_conf->bluefs_alloc_size;
+      fs.close_writer(h);
+      j++;
+      if ((rationed_bytes - written_bytes) <= g_conf->bluefs_alloc_size) {
+        std::cout << " rationed_bytes: " << rationed_bytes << " written_bytes: " << written_bytes << std::endl;
+        break;
+      }
+    }
+}
+
+bool writes_done = false;
+
+void sync_fs(BlueFS &fs)
+{
+    while (1) {
+      if (writes_done == true)
+        break;
+      fs.sync_metadata();
+      sleep(1);
+    }
+}
+
+
+void do_join(std::thread& t)
+{
+    t.join();
+}
+
+void join_all(std::vector<std::thread>& v)
+{
+    std::for_each(v.begin(),v.end(),do_join);
+}
+
+#define NUM_WRITERS 3
+#define NUM_SYNC_THREADS 1
+
+#define NUM_SINGLE_FILE_WRITERS 1
+#define NUM_MULTIPLE_FILE_WRITERS 2
+
+TEST(BlueFS, test_flush_1) {
+  uint64_t size = 1048476 * 128;
+  string fn = get_temp_bdev(size);
+  g_ceph_context->_conf->set_val(
+    "bluefs_alloc_size",
+    "65536");
+  g_ceph_context->_conf->apply_changes(NULL);
+
+  BlueFS fs;
+  ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+  fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+  uuid_d fsid;
+  ASSERT_EQ(0, fs.mkfs(fsid));
+  ASSERT_EQ(0, fs.mount());
+  {
+    std::vector<std::thread> write_thread_multiple;
+    uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
+    uint64_t per_thread_bytes = (effective_size/(NUM_MULTIPLE_FILE_WRITERS + NUM_SINGLE_FILE_WRITERS));
+    for (int i=0; i<NUM_MULTIPLE_FILE_WRITERS ; i++) {
+      write_thread_multiple.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+    }
+
+    create_single_file(fs);
+    std::vector<std::thread> write_thread_single;
+    for (int i=0; i<NUM_SINGLE_FILE_WRITERS; i++) {
+      write_thread_single.push_back(std::thread(write_single_file, std::ref(fs), per_thread_bytes));
+    }
+
+    join_all(write_thread_single);
+    join_all(write_thread_multiple);
+  }
+  fs.umount();
+  rm_temp_bdev(fn);
+}
+
+TEST(BlueFS, test_flush_2) {
+  uint64_t size = 1048476 * 128;
+  string fn = get_temp_bdev(size);
+  g_ceph_context->_conf->set_val(
+    "bluefs_alloc_size",
+    "65536");
+  g_ceph_context->_conf->apply_changes(NULL);
+
+  BlueFS fs;
+  ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+  fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+  uuid_d fsid;
+  ASSERT_EQ(0, fs.mkfs(fsid));
+  ASSERT_EQ(0, fs.mount());
+  {
+    uint64_t effective_size = size - (32 * 1048576); // leaving the last 32 MB for log compaction
+    uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
+    std::vector<std::thread> write_thread_multiple;
+    for (int i=0; i<NUM_WRITERS; i++) {
+      write_thread_multiple.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+    }
+
+    join_all(write_thread_multiple);
+  }
+  fs.umount();
+  rm_temp_bdev(fn);
+}
+
+TEST(BlueFS, test_flush_3) {
+  uint64_t size = 1048476 * 128;
+  string fn = get_temp_bdev(size);
+  g_ceph_context->_conf->set_val(
+    "bluefs_alloc_size",
+    "65536");
+  g_ceph_context->_conf->apply_changes(NULL);
+
+  BlueFS fs;
+  ASSERT_EQ(0, fs.add_block_device(BlueFS::BDEV_DB, fn));
+  fs.add_block_extent(BlueFS::BDEV_DB, 1048576, size - 1048576);
+  uuid_d fsid;
+  ASSERT_EQ(0, fs.mkfs(fsid));
+  ASSERT_EQ(0, fs.mount());
+  {
+    std::vector<std::thread> write_threads;
+    uint64_t effective_size = size - (11 * 1048576); // leaving the last 11 MB for log compaction
+    uint64_t per_thread_bytes = (effective_size/(NUM_WRITERS));
+    for (int i=0; i<NUM_WRITERS; i++) {
+      write_threads.push_back(std::thread(write_data, std::ref(fs), per_thread_bytes));
+    }
+
+    std::vector<std::thread> sync_threads;
+    for (int i=0; i<NUM_SYNC_THREADS; i++) {
+      sync_threads.push_back(std::thread(sync_fs, std::ref(fs)));
+    }
+
+    join_all(write_threads);
+    writes_done = true;
+    join_all(sync_threads);
+  }
+  fs.umount();
+  rm_temp_bdev(fn);
+}
+
+
 int main(int argc, char **argv) {
   vector<const char*> args;
   argv_to_vec(argc, (const char **)argv, args);