]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
libcephfs: Add test for lazyio via libcephfs 28834/head
authorSidharth Anupkrishnan <sanupkri@redhat.com>
Wed, 26 Jun 2019 15:54:36 +0000 (21:24 +0530)
committerSidharth Anupkrishnan <sanupkri@redhat.com>
Thu, 19 Sep 2019 12:15:25 +0000 (17:45 +0530)
Signed-off-by: Sidharth Anupkrishnan <sanupkri@redhat.com>
qa/workunits/libcephfs/test.sh
src/client/Client.cc
src/client/Client.h
src/include/cephfs/libcephfs.h
src/libcephfs.cc
src/test/libcephfs/CMakeLists.txt
src/test/libcephfs/lazyio.cc [new file with mode: 0644]

index e2cbd6a529b7577dcb7f8a905e3f5df0e779896d..9d3656be735c2fb2890448897694601668e5d0fd 100755 (executable)
@@ -3,5 +3,6 @@
 ceph_test_libcephfs
 ceph_test_libcephfs_access
 ceph_test_libcephfs_reclaim
+ceph_test_libcephfs_lazyio
 
 exit 0
index 7210b745420e3bcb819ae251c9641234aa955963..50b7de4b2a0300baa28cb3e8472735c2cbd16152 100644 (file)
@@ -10484,10 +10484,10 @@ int Client::ll_lazyio(Fh *fh, int enable)
   return _lazyio(fh, enable);
 }
 
-int Client::lazyio_propogate(int fd, loff_t offset, size_t count)
+int Client::lazyio_propagate(int fd, loff_t offset, size_t count)
 {
   std::lock_guard l(client_lock);
-  ldout(cct, 3) << "op: client->lazyio_propogate(" << fd
+  ldout(cct, 3) << "op: client->lazyio_propagate(" << fd
           << ", " << offset << ", " << count << ")" << dendl;
   
   Fh *f = get_filehandle(fd);
index d95ffe3c6409ee8483b5051471ded940ecf0bdb9..612c1ec50170ba31743dcb9195bd94a23e38733c 100644 (file)
@@ -454,7 +454,7 @@ public:
 
   // hpc lazyio
   int lazyio(int fd, int enable);
-  int lazyio_propogate(int fd, loff_t offset, size_t count);
+  int lazyio_propagate(int fd, loff_t offset, size_t count);
   int lazyio_synchronize(int fd, loff_t offset, size_t count);
 
   // expose file layout
index 6cc5c111b3fccd9d369045ae66dc654e0afd611b..974b59907319ad163fcc7a557d43ccdbf189a7b9 100644 (file)
@@ -1119,11 +1119,11 @@ int ceph_lazyio(struct ceph_mount_info *cmount, int fd, int enable);
  * @param offset a boolean to enable lazyio or disable lazyio.
  * @returns 0 on success or a negative error code on failure.
  */
-int ceph_lazyio_propogate(struct ceph_mount_info *cmount, int fd, int64_t offset, size_t count);
+int ceph_lazyio_propagate(struct ceph_mount_info *cmount, int fd, int64_t offset, size_t count);
 
 
 /**
- * Flushes the write buffer for the file and invalidate the read cache. This allows a subsequent read operation to read and cache data directly from the file and hence everyone's propogated writes would be visible. 
+ * Flushes the write buffer for the file and invalidate the read cache. This allows a subsequent read operation to read and cache data directly from the file and hence everyone's propagated writes would be visible. 
  *
  * @param cmount the ceph mount handle to use for performing the fsync.
  * @param fd the file descriptor of the file to sync.
index 2283d00882756b8da1c9f976373f1db1f31cf8e7..8a6b14d1ed8dda06e322a32fce0348188eaeb1ec 100644 (file)
@@ -1078,12 +1078,12 @@ extern "C" int ceph_lazyio(class ceph_mount_info *cmount,
   return (cmount->get_client()->lazyio(fd, enable));
 }
 
-extern "C" int ceph_lazyio_propogate(class ceph_mount_info *cmount,
+extern "C" int ceph_lazyio_propagate(class ceph_mount_info *cmount,
                            int fd, int64_t offset, size_t count)
 {
   if (!cmount->is_mounted())
     return -ENOTCONN;
-  return (cmount->get_client()->lazyio_propogate(fd, offset, count));
+  return (cmount->get_client()->lazyio_propagate(fd, offset, count));
 }
 
 extern "C" int ceph_lazyio_synchronize(class ceph_mount_info *cmount,
index a342a8a470c9b5c8b038d61781687981af47e805..f093324e7941fed453f68688a8802ab147a4276d 100644 (file)
@@ -32,6 +32,19 @@ if(${WITH_CEPHFS})
   install(TARGETS ceph_test_libcephfs_reclaim
     DESTINATION ${CMAKE_INSTALL_BINDIR})
 
+  add_executable(ceph_test_libcephfs_lazyio
+    lazyio.cc
+  )
+  target_link_libraries(ceph_test_libcephfs_lazyio
+    cephfs
+    librados
+    ${UNITTEST_LIBS}
+    ${EXTRALIBS}
+    ${CMAKE_DL_LIBS}
+    )   
+  install(TARGETS ceph_test_libcephfs_lazyio
+    DESTINATION ${CMAKE_INSTALL_BINDIR})
+
   add_executable(ceph_test_libcephfs_access
     test.cc
     access.cc
diff --git a/src/test/libcephfs/lazyio.cc b/src/test/libcephfs/lazyio.cc
new file mode 100644 (file)
index 0000000..16e8e0a
--- /dev/null
@@ -0,0 +1,353 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2019 Red Hat Ltd
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "gtest/gtest.h"
+#include "include/cephfs/libcephfs.h"
+#include "include/rados/librados.h"
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <sys/xattr.h>
+
+rados_t cluster;
+
+TEST(LibCephFS, LazyIOOneWriterMulipleReaders) {
+  struct ceph_mount_info *ca, *cb;
+  ASSERT_EQ(ceph_create(&ca, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
+  ASSERT_EQ(ceph_mount(ca, NULL), 0);
+
+  ASSERT_EQ(ceph_create(&cb, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
+  ASSERT_EQ(ceph_mount(cb, NULL), 0);
+
+  char name[20];
+  snprintf(name, sizeof(name), "foo.%d", getpid());
+
+  int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
+  ASSERT_LE(0, fda);
+
+  int fdb = ceph_open(cb, name, O_RDONLY, 0644);
+  ASSERT_LE(0, fdb);
+
+  ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
+  ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
+  char out_buf[] = "fooooooooo";
+
+  /* Client a issues a write and propagates/flushes the buffer */
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
+  ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
+
+  /* Client a issues a write and propagates/flushes the buffer */
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 10));
+  ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
+
+  char in_buf[40];
+  /* Calling ceph_lazyio_synchronize here will invalidate client b's cache and hence enable client a to fetch the propagated write of client a in the subsequent read */
+  ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
+  ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
+
+  /* Client a does not need to call ceph_lazyio_synchronize here because it is the latest writer and fda holds the updated inode*/
+  ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
+
+  ceph_close(ca, fda);
+  ceph_close(cb, fdb);
+
+  ceph_shutdown(ca);
+  ceph_shutdown(cb);
+}
+
+TEST(LibCephFS, LazyIOMultipleWritersMulipleReaders) {
+  struct ceph_mount_info *ca, *cb;
+  ASSERT_EQ(ceph_create(&ca, NULL), 0); 
+  ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0); 
+  ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
+  ASSERT_EQ(ceph_mount(ca, NULL), 0); 
+
+  ASSERT_EQ(ceph_create(&cb, NULL), 0); 
+  ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0); 
+  ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
+  ASSERT_EQ(ceph_mount(cb, NULL), 0); 
+
+  char name[20];
+  snprintf(name, sizeof(name), "foo2.%d", getpid());
+
+  int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
+  ASSERT_LE(0, fda);
+
+  int fdb = ceph_open(cb, name, O_RDWR, 0644);
+  ASSERT_LE(0, fdb);
+
+  ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
+  ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
+
+  char out_buf[] = "fooooooooo";
+  /* Client a issues a write and propagates/flushes the buffer */
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
+  ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
+  
+  /* Client b issues a write and propagates/flushes the buffer*/
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
+  ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
+
+  char in_buf[40];
+  /* Calling ceph_lazyio_synchronize here will invalidate client a's cache and hence enable client a to fetch the propagated writes of client b in the subsequent read */
+  ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
+  ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
+  
+  /* Client b does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
+  ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
+
+  /* Client a issues a write */
+  char wait_out_buf[] = "foobarbars";
+  ASSERT_EQ((int)sizeof(wait_out_buf), ceph_write(ca, fda, wait_out_buf, sizeof(wait_out_buf), 20));
+  ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
+
+  /* Client a does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
+  ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+strlen(wait_out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooofoobarbars");
+
+  /* Calling ceph_lazyio_synchronize here will invalidate client b's cache and hence enable client a to fetch the propagated write of client a in the subsequent read */
+  ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
+  ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+strlen(wait_out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooofoobarbars");
+
+  ceph_close(ca, fda);
+  ceph_close(cb, fdb);
+
+  ceph_shutdown(ca);
+  ceph_shutdown(cb);
+}
+
+TEST(LibCephFS, LazyIOMultipleWritersOneReader) {
+  struct ceph_mount_info *ca, *cb;
+  ASSERT_EQ(ceph_create(&ca, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
+  ASSERT_EQ(ceph_mount(ca, NULL), 0);
+
+  ASSERT_EQ(ceph_create(&cb, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
+  ASSERT_EQ(ceph_mount(cb, NULL), 0);
+
+  char name[20];
+  snprintf(name, sizeof(name), "foo3.%d", getpid());
+
+  int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
+  ASSERT_LE(0, fda);
+
+  int fdb = ceph_open(cb, name, O_RDWR, 0644);
+  ASSERT_LE(0, fdb);
+  ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
+  ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
+
+  char out_buf[] = "fooooooooo";
+  /* Client a issues a write and propagates/flushes the buffer */
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
+  ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
+  
+  /* Client b issues a write and propagates/flushes the buffer*/
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
+  ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
+
+  char in_buf[40];
+  /* Client a reads the file and verifies that it only reads it's propagated writes and not Client b's*/
+  ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), strlen(out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooo");
+  
+  /* Client a reads the file again, this time with a lazyio_synchronize to check if the cache gets invalidated and data is refetched i.e all the propagated writes are being read*/
+  ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
+  ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 2*strlen(out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooo");
+
+  ceph_close(ca, fda);
+  ceph_close(cb, fdb);
+
+  ceph_shutdown(ca);
+  ceph_shutdown(cb);
+}
+
+TEST(LibCephFS, LazyIOSynchronizeFlush) {
+  /* Test to make sure lazyio_synchronize flushes dirty buffers */
+  struct ceph_mount_info *ca, *cb;
+  ASSERT_EQ(ceph_create(&ca, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
+  ASSERT_EQ(ceph_mount(ca, NULL), 0);
+
+  ASSERT_EQ(ceph_create(&cb, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
+  ASSERT_EQ(ceph_mount(cb, NULL), 0);
+
+  char name[20];
+  snprintf(name, sizeof(name), "foo4.%d", getpid());
+
+  int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
+  ASSERT_LE(0, fda);
+
+  int fdb = ceph_open(cb, name, O_RDWR, 0644);
+  ASSERT_LE(0, fdb);
+
+  ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
+  ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
+
+  char out_buf[] = "fooooooooo";
+
+  /* Client a issues a write and propagates it*/
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 0));
+  ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
+
+  /* Client b issues writes and without lazyio_propagate*/
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 10));
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 20));
+  
+  char in_buf[40];
+  /* Calling ceph_lazyio_synchronize here will first flush the possibly pending buffered write of client b and invalidate client b's cache and hence enable client b to fetch all the propagated writes */
+  ASSERT_EQ(0, ceph_lazyio_synchronize(cb, fdb, 0, 0));
+  ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), 3*strlen(out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooofooooooooo");
+
+  /* Required to call ceph_lazyio_synchronize here since client b is the latest writer and client a is out of sync with updated file*/ 
+  ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
+  ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), 3*strlen(out_buf)+1);
+  ASSERT_STREQ(in_buf, "fooooooooofooooooooofooooooooo");
+
+  ceph_close(ca, fda);
+  ceph_close(cb, fdb);
+
+  ceph_shutdown(ca);
+  ceph_shutdown(cb);
+}
+
+TEST(LibCephFS, WithoutandWithLazyIO) {
+  struct ceph_mount_info *ca, *cb;
+  ASSERT_EQ(ceph_create(&ca, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(ca, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(ca, NULL));
+  ASSERT_EQ(ceph_mount(ca, NULL), 0);
+
+  ASSERT_EQ(ceph_create(&cb, NULL), 0);
+  ASSERT_EQ(ceph_conf_read_file(cb, NULL), 0);
+  ASSERT_EQ(0, ceph_conf_parse_env(cb, NULL));
+  ASSERT_EQ(ceph_mount(cb, NULL), 0);
+
+  char name[20];
+  snprintf(name, sizeof(name), "foo5.%d", getpid());
+
+  int fda = ceph_open(ca, name, O_CREAT|O_RDWR, 0644);
+  ASSERT_LE(0, fda);
+
+  int fdb = ceph_open(cb, name, O_RDWR, 0644);
+  ASSERT_LE(0, fdb);
+
+  char out_buf_w[] = "1234567890";
+  /* Doing some non lazyio writes and read*/
+  ASSERT_EQ((int)sizeof(out_buf_w), ceph_write(ca, fda, out_buf_w, sizeof(out_buf_w), 0));
+
+  ASSERT_EQ((int)sizeof(out_buf_w), ceph_write(cb, fdb, out_buf_w, sizeof(out_buf_w), 10));
+
+  char in_buf_w[30];
+  ASSERT_EQ(ceph_read(ca, fda, in_buf_w, sizeof(in_buf_w), 0), 2*strlen(out_buf_w)+1);
+
+  /* Enable lazyio*/
+  ASSERT_EQ(0, ceph_lazyio(ca, fda, 1));
+  ASSERT_EQ(0, ceph_lazyio(cb, fdb, 1));
+
+  char out_buf[] = "fooooooooo";
+
+  /* Client a issues a write and propagates/flushes the buffer*/
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(ca, fda, out_buf, sizeof(out_buf), 20));
+  ASSERT_EQ(0, ceph_lazyio_propagate(ca, fda, 0, 0));
+  
+  /* Client b issues a write and propagates/flushes the buffer*/
+  ASSERT_EQ((int)sizeof(out_buf), ceph_write(cb, fdb, out_buf, sizeof(out_buf), 30));
+  ASSERT_EQ(0, ceph_lazyio_propagate(cb, fdb, 0, 0));
+
+  char in_buf[50];
+  /* Calling ceph_lazyio_synchronize here will invalidate client a's cache and hence enable client a to fetch the propagated writes of client b in the subsequent read */
+  ASSERT_EQ(0, ceph_lazyio_synchronize(ca, fda, 0, 0));
+  ASSERT_EQ(ceph_read(ca, fda, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+(2*(strlen(out_buf_w)))+1);
+  ASSERT_STREQ(in_buf, "12345678901234567890fooooooooofooooooooo");
+
+  /* Client b does not need to call ceph_lazyio_synchronize here because it is the latest writer and the writes before it have already been propagated*/
+  ASSERT_EQ(ceph_read(cb, fdb, in_buf, sizeof(in_buf), 0), (2*(strlen(out_buf)))+(2*(strlen(out_buf_w)))+1);
+  ASSERT_STREQ(in_buf, "12345678901234567890fooooooooofooooooooo");
+
+  ceph_close(ca, fda);
+  ceph_close(cb, fdb);
+
+  ceph_shutdown(ca);
+  ceph_shutdown(cb);
+}
+
+static int update_root_mode()
+{
+  struct ceph_mount_info *admin;
+  int r = ceph_create(&admin, NULL);
+  if (r < 0)
+    return r;
+  ceph_conf_read_file(admin, NULL);
+  ceph_conf_parse_env(admin, NULL);
+  ceph_conf_set(admin, "client_permissions", "false");
+  r = ceph_mount(admin, "/");
+  if (r < 0)
+    goto out;
+  r = ceph_chmod(admin, "/", 0777);
+out:
+  ceph_shutdown(admin);
+  return r;
+}
+
+int main(int argc, char **argv)
+{
+  int r = update_root_mode();
+  if (r < 0)
+    exit(1);
+
+  ::testing::InitGoogleTest(&argc, argv);
+
+  srand(getpid());
+
+  r = rados_create(&cluster, NULL);
+  if (r < 0)
+    exit(1);
+
+  r = rados_conf_read_file(cluster, NULL);
+  if (r < 0)
+    exit(1);
+
+  rados_conf_parse_env(cluster, NULL);
+  r = rados_connect(cluster);
+  if (r < 0)
+    exit(1);
+
+  r = RUN_ALL_TESTS();
+
+  rados_shutdown(cluster);
+
+  return r;
+}