]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
test_librbd_fsx: add krbd mode support
authorIlya Dryomov <ilya.dryomov@inktank.com>
Fri, 2 May 2014 11:13:11 +0000 (15:13 +0400)
committerIlya Dryomov <ilya.dryomov@inktank.com>
Wed, 7 May 2014 13:30:05 +0000 (17:30 +0400)
Add krbd mode support (-K) to test krbd in the same way librbd is
tested.  This introduces a dependency on libkrbd and, because it's
a C++ static library, requires C++ linking.  The rbd_operations
framework can be extended in the future to also test rbd_fuse.

Signed-off-by: Ilya Dryomov <ilya.dryomov@inktank.com>
src/test/Makefile.am
src/test/librbd/fsx.c

index 71f2a7feb9f36258bb1f43102226d09b51320640..e2fbf48e7eccb6e8b262f6250868600e213c9cb9 100644 (file)
@@ -640,7 +640,8 @@ bin_DEBUGPROGRAMS += ceph_test_librbd
 
 if LINUX
 ceph_test_librbd_fsx_SOURCES = test/librbd/fsx.c
-ceph_test_librbd_fsx_LDADD = $(LIBRBD) $(LIBRADOS) -lm
+nodist_EXTRA_ceph_test_librbd_fsx_SOURCES = dummy.cc # force c++ linking
+ceph_test_librbd_fsx_LDADD = $(LIBKRBD) $(LIBRBD) $(LIBRADOS)
 ceph_test_librbd_fsx_CFLAGS = ${AM_CFLAGS} -Wno-format
 bin_DEBUGPROGRAMS += ceph_test_librbd_fsx
 endif
index c34e8ce95a001920c4a84cc2c4bb3ea7437c29de..f3291c1a7bcdaa8a281a1fbbf683a6e3d135330f 100644 (file)
@@ -22,6 +22,8 @@
 #include <sys/file.h>
 #include <sys/stat.h>
 #include <sys/mman.h>
+#include <linux/fs.h>
+#include <sys/ioctl.h>
 #ifdef HAVE_ERR_H
 #include <err.h>
 #endif
 #include <stdlib.h>
 #include <string.h>
 #include <stdarg.h>
+#include <assert.h>
 #include <errno.h>
 #include <math.h>
 
 #include "include/intarith.h"
+#include "include/krbd.h"
 #include "include/rados/librados.h"
 #include "include/rbd/librbd.h"
 
@@ -109,11 +113,6 @@ int                        logcount = 0;   /* total ops */
 char   *original_buf;                  /* a pointer to the original data */
 char   *good_buf;                      /* a pointer to the correct data */
 char   *temp_buf;                      /* a pointer to the current data */
-char   *pool;                          /* name of the pool our test image is in */
-char   *iname;                         /* name of our test image */
-rados_t cluster;                        /* handle for our test cluster */
-rados_ioctx_t  ioctx;                  /* handle for our test pool */
-rbd_image_t    image;                  /* handle for our test image */
 
 char   dirpath[1024];
 
@@ -227,6 +226,479 @@ simple_err(const char *msg, int err)
     fprintf(stderr, "%s: %s\n", msg, strerror(-err));
 }
 
+/*
+ * rbd
+ */
+struct rbd_ctx {
+       const char *name;       /* image name */
+       rbd_image_t image;      /* image handle */
+       const char *krbd_name;  /* image /dev/rbd<id> name */
+       int krbd_fd;            /* image /dev/rbd<id> fd */
+};
+
+#define RBD_CTX_INIT   (struct rbd_ctx) { NULL, NULL, NULL, -1 }
+
+struct rbd_operations {
+       int (*open)(const char *name, struct rbd_ctx *ctx);
+       int (*close)(struct rbd_ctx *ctx);
+       ssize_t (*read)(struct rbd_ctx *ctx, uint64_t off, size_t len, void *buf);
+       ssize_t (*write)(struct rbd_ctx *ctx, uint64_t off, size_t len, void *buf);
+       int (*flush)(struct rbd_ctx *ctx);
+       int (*discard)(struct rbd_ctx *ctx, uint64_t off, uint64_t len);
+       int (*get_size)(struct rbd_ctx *ctx, uint64_t *size);
+       int (*resize)(struct rbd_ctx *ctx, uint64_t size);
+       int (*clone)(struct rbd_ctx *ctx, const char *src_snapname,
+                    const char *dst_imagename, int *order, int stripe_unit,
+                    int stripe_count);
+       int (*flatten)(struct rbd_ctx *ctx);
+};
+
+char *pool;                    /* name of the pool our test image is in */
+char *iname;                   /* name of our test image */
+rados_t cluster;               /* handle for our test cluster */
+rados_ioctx_t ioctx;           /* handle for our test pool */
+struct krbd_ctx *krbd;         /* handle for libkrbd */
+
+/*
+ * librbd/krbd rbd_operations handlers.  Given the rest of fsx.c, no
+ * attempt to do error handling is made in these handlers.
+ */
+
+int
+__librbd_open(const char *name, struct rbd_ctx *ctx)
+{
+       rbd_image_t image;
+       int ret;
+
+       assert(!ctx->name && !ctx->image &&
+              !ctx->krbd_name && ctx->krbd_fd < 0);
+
+       ret = rbd_open(ioctx, name, &image, NULL);
+       if (ret < 0) {
+               prt("rbd_open(%s) failed\n", name);
+               return ret;
+       }
+
+       ctx->name = strdup(name);
+       ctx->image = image;
+       ctx->krbd_name = NULL;
+       ctx->krbd_fd = -1;
+
+       return 0;
+}
+
+int
+librbd_open(const char *name, struct rbd_ctx *ctx)
+{
+       return __librbd_open(name, ctx);
+}
+
+int
+__librbd_close(struct rbd_ctx *ctx)
+{
+       int ret;
+
+       assert(ctx->name && ctx->image);
+
+       ret = rbd_close(ctx->image);
+       if (ret < 0) {
+               prt("rbd_close(%s) failed\n", ctx->name);
+               return ret;
+       }
+
+       free((void *)ctx->name);
+
+       ctx->name = NULL;
+       ctx->image = NULL;
+
+       return 0;
+}
+
+int
+librbd_close(struct rbd_ctx *ctx)
+{
+       return __librbd_close(ctx);
+}
+
+ssize_t
+librbd_read(struct rbd_ctx *ctx, uint64_t off, size_t len, void *buf)
+{
+       ssize_t n;
+
+       n = rbd_read(ctx->image, off, len, buf);
+       if (n < 0)
+               prt("rbd_read(%llu, %zu) failed\n", off, len);
+
+       return n;
+}
+
+ssize_t
+librbd_write(struct rbd_ctx *ctx, uint64_t off, size_t len, void *buf)
+{
+       ssize_t n;
+
+       n = rbd_write(ctx->image, off, len, buf);
+       if (n < 0)
+               prt("rbd_write(%llu, %zu) failed\n", off, len);
+
+       return n;
+}
+
+int
+librbd_flush(struct rbd_ctx *ctx)
+{
+       int ret;
+
+       ret = rbd_flush(ctx->image);
+       if (ret < 0) {
+               prt("rbd_flush failed\n");
+               return ret;
+       }
+
+       return 0;
+}
+
+int
+librbd_discard(struct rbd_ctx *ctx, uint64_t off, uint64_t len)
+{
+       int ret;
+
+       ret = rbd_discard(ctx->image, off, len);
+       if (ret < 0) {
+               prt("rbd_discard(%llu, %llu) failed\n", off, len);
+               return ret;
+       }
+
+       return 0;
+}
+
+int
+librbd_get_size(struct rbd_ctx *ctx, uint64_t *size)
+{
+       rbd_image_info_t info;
+       int ret;
+
+       ret = rbd_stat(ctx->image, &info, sizeof(info));
+       if (ret < 0) {
+               prt("rbd_stat failed\n");
+               return ret;
+       }
+
+       *size = info.size;
+
+       return 0;
+}
+
+int
+__librbd_resize(struct rbd_ctx *ctx, uint64_t size)
+{
+       int ret;
+
+       ret = rbd_resize(ctx->image, size);
+       if (ret < 0) {
+               prt("rbd_resize(%llu) failed\n", size);
+               return ret;
+       }
+
+       return 0;
+}
+
+int
+librbd_resize(struct rbd_ctx *ctx, uint64_t size)
+{
+       return __librbd_resize(ctx, size);
+}
+
+int
+__librbd_clone(struct rbd_ctx *ctx, const char *src_snapname,
+              const char *dst_imagename, int *order, int stripe_unit,
+              int stripe_count)
+{
+       int ret;
+
+       ret = rbd_snap_create(ctx->image, src_snapname);
+       if (ret < 0) {
+               prt("rbd_snap_create(%s@%s) failed\n", ctx->name,
+                   src_snapname);
+               return ret;
+       }
+
+       ret = rbd_snap_protect(ctx->image, src_snapname);
+       if (ret < 0) {
+               prt("rbd_snap_protect(%s@%s) failed\n", ctx->name,
+                   src_snapname);
+               return ret;
+       }
+
+       ret = rbd_clone2(ioctx, ctx->name, src_snapname, ioctx,
+                        dst_imagename, RBD_FEATURES_ALL, order,
+                        stripe_unit, stripe_count);
+       if (ret < 0) {
+               prt("rbd_clone2(%s@%s -> %s) failed\n", ctx->name,
+                   src_snapname, dst_imagename);
+               return ret;
+       }
+
+       return 0;
+}
+
+int
+librbd_clone(struct rbd_ctx *ctx, const char *src_snapname,
+            const char *dst_imagename, int *order, int stripe_unit,
+            int stripe_count)
+{
+       return __librbd_clone(ctx, src_snapname, dst_imagename, order,
+                             stripe_unit, stripe_count);
+}
+
+int
+__librbd_flatten(struct rbd_ctx *ctx)
+{
+       int ret;
+
+       ret = rbd_flatten(ctx->image);
+       if (ret < 0) {
+               prt("rbd_flatten failed\n");
+               return ret;
+       }
+
+       return 0;
+}
+
+int
+librbd_flatten(struct rbd_ctx *ctx)
+{
+       return __librbd_flatten(ctx);
+}
+
+const struct rbd_operations librbd_operations = {
+       .open   = librbd_open,
+       .close  = librbd_close,
+       .read   = librbd_read,
+       .write  = librbd_write,
+       .flush  = librbd_flush,
+       .discard = librbd_discard,
+       .get_size = librbd_get_size,
+       .resize = librbd_resize,
+       .clone  = librbd_clone,
+       .flatten = librbd_flatten,
+};
+
+int
+krbd_open(const char *name, struct rbd_ctx *ctx)
+{
+       char *devnode;
+       int fd;
+       int ret;
+
+       ret = __librbd_open(name, ctx);
+       if (ret < 0)
+               return ret;
+
+       ret = krbd_map(krbd, pool, name, NULL, NULL, &devnode);
+       if (ret < 0) {
+               prt("krbd_map(%s) failed\n", name);
+               return ret;
+       }
+
+       fd = open(devnode, O_RDWR);
+       if (fd < 0) {
+               ret = -errno;
+               prt("open(%s) failed\n", devnode);
+               return ret;
+       }
+
+       ctx->krbd_name = devnode;
+       ctx->krbd_fd = fd;
+
+       return 0;
+}
+
+int
+krbd_close(struct rbd_ctx *ctx)
+{
+       int ret;
+
+       assert(ctx->krbd_name && ctx->krbd_fd >= 0);
+
+       if (close(ctx->krbd_fd) < 0) {
+               ret = -errno;
+               prt("close(%s) failed\n", ctx->krbd_name);
+               return ret;
+       }
+
+       ret = krbd_unmap(krbd, ctx->krbd_name);
+       if (ret < 0) {
+               prt("krbd_unmap(%s) failed\n", ctx->krbd_name);
+               return ret;
+       }
+
+       free((void *)ctx->krbd_name);
+
+       ctx->krbd_name = NULL;
+       ctx->krbd_fd = -1;
+
+       return __librbd_close(ctx);
+}
+
+ssize_t
+krbd_read(struct rbd_ctx *ctx, uint64_t off, size_t len, void *buf)
+{
+       ssize_t n;
+
+       n = pread(ctx->krbd_fd, buf, len, off);
+       if (n < 0) {
+               n = -errno;
+               prt("pread(%llu, %zu) failed\n", off, len);
+               return n;
+       }
+
+       return n;
+}
+
+ssize_t
+krbd_write(struct rbd_ctx *ctx, uint64_t off, size_t len, void *buf)
+{
+       ssize_t n;
+
+       n = pwrite(ctx->krbd_fd, buf, len, off);
+       if (n < 0) {
+               n = -errno;
+               prt("pwrite(%llu, %zu) failed\n", off, len);
+               return n;
+       }
+
+       return n;
+}
+
+int
+__krbd_flush(struct rbd_ctx *ctx)
+{
+       int ret;
+
+       /*
+        * fsync(2) on the block device does not sync the filesystem
+        * mounted on top of it, but that's OK - we control the entire
+        * lifetime of the block device and write directly to it.
+        */
+       if (fsync(ctx->krbd_fd) < 0) {
+               ret = -errno;
+               prt("fsync failed\n");
+               return ret;
+       }
+
+       return 0;
+}
+
+int
+krbd_flush(struct rbd_ctx *ctx)
+{
+       return __krbd_flush(ctx);
+}
+
+int
+krbd_discard(struct rbd_ctx *ctx, uint64_t off, uint64_t len)
+{
+       uint64_t range[2] = { off, len };
+       int ret;
+
+       /*
+        * off and len must be 512-byte aligned, otherwise BLKDISCARD
+        * will fail with -EINVAL.  This means that -K (enable krbd
+        * mode) requires -h 512 or similar.
+        */
+       if (ioctl(ctx->krbd_fd, BLKDISCARD, &range) < 0) {
+               ret = -errno;
+               prt("BLKDISCARD(%llu, %llu) failed\n", off, len);
+               return ret;
+       }
+
+       return 0;
+}
+
+int
+krbd_get_size(struct rbd_ctx *ctx, uint64_t *size)
+{
+       uint64_t bytes;
+       int ret;
+
+       if (ioctl(ctx->krbd_fd, BLKGETSIZE64, &bytes) < 0) {
+               ret = -errno;
+               prt("BLKGETSIZE64 failed\n");
+               return ret;
+       }
+
+       *size = bytes;
+
+       return 0;
+}
+
+int
+krbd_resize(struct rbd_ctx *ctx, uint64_t size)
+{
+       int ret;
+
+       /*
+        * This is essential: when krbd detects a size change, it calls
+        * revalidate_disk(), which ends up calling invalidate_bdev(),
+        * which invalidates only clean buffers.  The cache flush makes
+        * it invalidate everything, which is what we need if we are
+        * shrinking.
+        */
+       ret = __krbd_flush(ctx);
+       if (ret < 0)
+               return ret;
+
+       return __librbd_resize(ctx, size);
+}
+
+int
+krbd_clone(struct rbd_ctx *ctx, const char *src_snapname,
+          const char *dst_imagename, int *order, int stripe_unit,
+          int stripe_count)
+{
+       int ret;
+
+       ret = __krbd_flush(ctx);
+       if (ret < 0)
+               return ret;
+
+       return __librbd_clone(ctx, src_snapname, dst_imagename, order,
+                             stripe_unit, stripe_count);
+}
+
+int
+krbd_flatten(struct rbd_ctx *ctx)
+{
+       int ret;
+
+       ret = __krbd_flush(ctx);
+       if (ret < 0)
+               return ret;
+
+       return __librbd_flatten(ctx);
+}
+
+const struct rbd_operations krbd_operations = {
+       .open   = krbd_open,
+       .close  = krbd_close,
+       .read   = krbd_read,
+       .write  = krbd_write,
+       .flush  = krbd_flush,
+       .discard = krbd_discard,
+       .get_size = krbd_get_size,
+       .resize = krbd_resize,
+       .clone  = krbd_clone,
+       .flatten = krbd_flatten,
+};
+
+struct rbd_ctx ctx = RBD_CTX_INIT;
+const struct rbd_operations *ops = &librbd_operations;
+
+/*
+ * fsx
+ */
+
 void
 log4(int operation, int arg0, int arg1, int arg2)
 {
@@ -451,16 +923,17 @@ check_buffers(char *good_buf, char *temp_buf, unsigned offset, unsigned size)
 void
 check_size(void)
 {
-       rbd_image_info_t statbuf;
+       uint64_t size;
        int ret;
 
-       if ((ret = rbd_stat(image, &statbuf, sizeof(statbuf))) < 0) {
-               prterrcode("check_size: fstat", ret);
-       }
-       if ((uint64_t)file_size != statbuf.size) {
+       ret = ops->get_size(&ctx, &size);
+       if (ret < 0)
+               prterrcode("check_size: ops->get_size", ret);
+
+       if ((uint64_t)file_size != size) {
                prt("Size error: expected 0x%llx stat 0x%llx\n",
                    (unsigned long long)file_size,
-                   (unsigned long long)statbuf.size);
+                   (unsigned long long)size);
                report_failure(120);
        }
 }
@@ -470,16 +943,29 @@ check_size(void)
 void
 check_trunc_hack(void)
 {
-       rbd_image_info_t statbuf;
-
-       rbd_resize(image, 0ULL);
-       rbd_resize(image, TRUNC_HACK_SIZE);
-       rbd_stat(image, &statbuf, sizeof(statbuf));
-       if (statbuf.size != TRUNC_HACK_SIZE) {
-               prt("no extend on truncate! not posix!\n");
-               exit(130);
-       }
-       rbd_resize(image, 0ULL);
+       uint64_t size;
+       int ret;
+
+       ret = ops->resize(&ctx, 0ULL);
+       if (ret < 0)
+               prterrcode("check_trunc_hack: ops->resize pre", ret);
+
+       ret = ops->resize(&ctx, TRUNC_HACK_SIZE);
+       if (ret < 0)
+               prterrcode("check_trunc_hack: ops->resize actual", ret);
+
+       ret = ops->get_size(&ctx, &size);
+       if (ret < 0)
+               prterrcode("check_trunc_hack: ops->get_size", ret);
+
+       if (size != TRUNC_HACK_SIZE) {
+               prt("no extend on truncate! not posix!\n");
+               exit(130);
+       }
+
+       ret = ops->resize(&ctx, 0ULL);
+       if (ret < 0)
+               prterrcode("check_trunc_hack: ops->resize post", ret);
 }
 
 int
@@ -487,6 +973,7 @@ create_image()
 {
        int r;
        int order = 0;
+
        r = rados_create(&cluster, NULL);
        if (r < 0) {
                simple_err("Could not create cluster handle", r);
@@ -503,15 +990,21 @@ create_image()
                simple_err("Error connecting to cluster", r);
                goto failed_shutdown;
        }
+       r = krbd_create_from_context(rados_cct(cluster), &krbd);
+       if (r < 0) {
+               simple_err("Could not create libkrbd handle", r);
+               goto failed_shutdown;
+       }
+
        r = rados_pool_create(cluster, pool);
        if (r < 0 && r != -EEXIST) {
                simple_err("Error creating pool", r);
-               goto failed_shutdown;
+               goto failed_krbd;
        }
        r = rados_ioctx_create(cluster, pool, &ioctx);
        if (r < 0) {
                simple_err("Error creating ioctx", r);
-               goto failed_shutdown;
+               goto failed_krbd;
        }
        if (clone_calls) {
                r = rbd_create2(ioctx, iname, 0, RBD_FEATURE_LAYERING, &order);
@@ -527,6 +1020,8 @@ create_image()
 
  failed_open:
        rados_ioctx_destroy(ioctx);
+ failed_krbd:
+       krbd_destroy(krbd);
  failed_shutdown:
        rados_shutdown(cluster);
        return r;
@@ -535,10 +1030,14 @@ create_image()
 void
 doflush(unsigned offset, unsigned size)
 {
+       int ret;
+
        if (o_direct == O_DIRECT)
                return;
 
-       rbd_flush(image);
+       ret = ops->flush(&ctx);
+       if (ret < 0)
+               prterrcode("doflush: ops->flush", ret);
 }
 
 void
@@ -575,15 +1074,17 @@ doread(unsigned offset, unsigned size)
                        (monitorend == -1 || offset <= monitorend))))))
                prt("%lu read\t0x%x thru\t0x%x\t(0x%x bytes)\n", testcalls,
                    offset, offset + size - 1, size);
-       ret = rbd_read(image, offset, size, temp_buf);
+
+       ret = ops->read(&ctx, offset, size, temp_buf);
        if (ret != (int)size) {
                if (ret < 0)
-                       prterrcode("doread: read", ret);
+                       prterrcode("doread: ops->read", ret);
                else
                        prt("short read: 0x%x bytes instead of 0x%x\n",
                            ret, size);
                report_failure(141);
        }
+
        check_buffers(good_buf, temp_buf, offset, size);
 }
 
@@ -655,9 +1156,9 @@ dowrite(unsigned offset, unsigned size)
                        warn("Lite file size bug in fsx!");
                        report_failure(149);
                }
-               ret = rbd_resize(image, newsize);
+               ret = ops->resize(&ctx, newsize);
                if (ret < 0) {
-                       prterrcode("dowrite: resize", ret);
+                       prterrcode("dowrite: ops->resize", ret);
                        report_failure(150);
                }
        }
@@ -674,18 +1175,18 @@ dowrite(unsigned offset, unsigned size)
                prt("%lu write\t0x%x thru\t0x%x\t(0x%x bytes)\n", testcalls,
                    offset, offset + size - 1, size);
 
-       ret = rbd_write(image, offset, size, good_buf + offset);
+       ret = ops->write(&ctx, offset, size, good_buf + offset);
        if (ret != size) {
                if (ret < 0)
-                       prterrcode("dowrite: rbd_write", ret);
+                       prterrcode("dowrite: ops->write", ret);
                else
                        prt("short write: 0x%x bytes instead of 0x%x\n",
                            ret, size);
                report_failure(151);
        }
-       if (flush) {
+
+       if (flush)
                doflush(offset, size);
-       }
 }
 
 
@@ -712,14 +1213,15 @@ dotruncate(unsigned size)
 
        if (testcalls <= simulatedopcount)
                return;
-       
+
        if ((progressinterval && testcalls % progressinterval == 0) ||
            (debug && (monitorstart == -1 || monitorend == -1 ||
                      size <= monitorend)))
                prt("%lu trunc\tfrom 0x%x to 0x%x\n", testcalls, oldsize, size);
-       if ((ret = rbd_resize(image, size)) < 0) {
-               prt("rbd_resize: %x\n", size);
-               prterrcode("dotruncate: ftruncate", ret);
+
+       ret = ops->resize(&ctx, size);
+       if (ret < 0) {
+               prterrcode("dotruncate: ops->resize", ret);
                report_failure(160);
        }
 }
@@ -761,14 +1263,14 @@ do_punch_hole(unsigned offset, unsigned length)
                prt("%lu punch\tfrom 0x%x to 0x%x, (0x%x bytes)\n", testcalls,
                        offset, offset+length, length);
        }
-       if ((ret = rbd_discard(image, (unsigned long long) offset,
-                              (unsigned long long) length)) < 0) {
-               prt("%punch hole: %x to %x\n", offset, length);
-               prterrcode("do_punch_hole: discard", ret);
+
+       ret = ops->discard(&ctx, (unsigned long long)offset,
+                          (unsigned long long)length);
+       if (ret < 0) {
+               prterrcode("do_punch_hole: ops->discard", ret);
                report_failure(161);
        }
 
-
        max_offset = offset < file_size ? offset : file_size;
        max_len = max_offset + length <= file_size ? length :
                        file_size - max_offset;
@@ -824,31 +1326,22 @@ do_clone()
        clone_imagename(imagename, sizeof(imagename), num_clones);
        clone_imagename(lastimagename, sizeof(lastimagename),
                        num_clones - 1);
+       assert(strcmp(lastimagename, ctx.name) == 0);
 
-       if ((ret = rbd_snap_create(image, "snap")) < 0) {
-               simple_err("do_clone: rbd create snap", ret);
-               exit(164);
-       }
-
-       if ((ret = rbd_snap_protect(image, "snap")) < 0) {
-               simple_err("do_clone: rbd protect snap", ret);
-               exit(164);
-       }
-
-       ret = rbd_clone2(ioctx, lastimagename, "snap", ioctx, imagename,
-                        RBD_FEATURES_ALL, &order, stripe_unit, stripe_count);
+       ret = ops->clone(&ctx, "snap", imagename, &order, stripe_unit,
+                        stripe_count);
        if (ret < 0) {
-               simple_err("do_clone: rbd clone", ret);
+               prterrcode("do_clone: ops->clone", ret);
                exit(165);
        }
 
-       if ((ret = rbd_close(image)) < 0) {
-               simple_err("do_clone: rbd close", ret);
+       if ((ret = ops->close(&ctx)) < 0) {
+               prterrcode("do_clone: ops->close", ret);
                exit(174);
        }
 
-       if ((ret = rbd_open(ioctx, imagename, &image, NULL)) < 0) {
-               simple_err("do_clone: rbd open", ret);
+       if ((ret = ops->open(imagename, &ctx)) < 0) {
+               prterrcode("do_clone: ops->open", ret);
                exit(166);
        }
 
@@ -862,13 +1355,13 @@ check_clone(int clonenum)
        char filename[128];
        char imagename[128];
        int ret, fd;
-       rbd_image_t cur_image;
+       struct rbd_ctx cur_ctx = RBD_CTX_INIT;
        struct stat file_info;
        char *good_buf, *temp_buf;
 
        clone_imagename(imagename, sizeof(imagename), clonenum);
-       if ((ret = rbd_open(ioctx, imagename, &cur_image, NULL)) < 0) {
-               simple_err("check_clone: rbd open", ret);
+       if ((ret = ops->open(imagename, &cur_ctx)) < 0) {
+               prterrcode("check_clone: ops->open", ret);
                exit(167);
        }
 
@@ -892,13 +1385,13 @@ check_clone(int clonenum)
                simple_err("check_clone: pread", -errno);
                exit(170);
        }
-       if ((ret = rbd_read(cur_image, 0, file_info.st_size, temp_buf)) < 0) {
-               simple_err("check_clone: rbd_read", ret);
+       if ((ret = ops->read(&cur_ctx, 0, file_info.st_size, temp_buf)) < 0) {
+               prterrcode("check_clone: ops->read", ret);
                exit(171);
        }
        close(fd);
-       if ((ret = rbd_close(cur_image)) < 0) {
-               simple_err("check_clone: rbd close", ret);
+       if ((ret = ops->close(&cur_ctx)) < 0) {
+               prterrcode("check_clone: ops->close", ret);
                exit(174);
        }
        check_buffers(good_buf, temp_buf, 0, file_info.st_size);
@@ -914,19 +1407,22 @@ writefileimage()
 {
        ssize_t ret;
 
-       ret = rbd_write(image, 0, file_size, good_buf);
+       ret = ops->write(&ctx, 0, file_size, good_buf);
        if (ret != file_size) {
                if (ret < 0)
-                       prterrcode("writefileimage: write", ret);
+                       prterrcode("writefileimage: ops->write", ret);
                else
                        prt("short write: 0x%x bytes instead of 0x%llx\n",
                            ret, (unsigned long long)file_size);
                report_failure(172);
        }
-       if (lite ? 0 : (ret = rbd_resize(image, file_size)) < 0) {
-               prt("rbd_resize: %llx\n", (unsigned long long)file_size);
-               prterrcode("writefileimage: rbd_resize", ret);
-               report_failure(173);
+
+       if (!lite) {
+               ret = ops->resize(&ctx, file_size);
+               if (ret < 0) {
+                       prterrcode("writefileimage: ops->resize", ret);
+                       report_failure(173);
+               }
        }
 }
 
@@ -936,16 +1432,16 @@ do_flatten()
        int ret;
 
        if (num_clones == 0 ||
-           (rbd_get_parent_info(image, NULL, 0, NULL, 0, NULL, 0)
-           == -ENOENT)) {
+           (rbd_get_parent_info(ctx.image, NULL, 0, NULL, 0, NULL, 0) == -ENOENT)) {
                log4(OP_SKIPPED, OP_FLATTEN, 0, 0);
                return;
        }
        log4(OP_FLATTEN, 0, 0, 0);
        prt("%lu flatten\n", testcalls);
 
-       if ((ret = rbd_flatten(image)) < 0) {
-               simple_err("do_flatten: rbd flatten", ret);
+       ret = ops->flatten(&ctx);
+       if (ret < 0) {
+               prterrcode("writefileimage: ops->flatten", ret);
                exit(177);
        }
 }
@@ -960,13 +1456,16 @@ docloseopen(void)
 
        if (debug)
                prt("%lu close/open\n", testcalls);
-       if ((ret = rbd_close(image)) < 0) {
-               prterrcode("docloseopen: close", ret);
+
+       ret = ops->close(&ctx);
+       if (ret < 0) {
+               prterrcode("docloseopen: ops->close", ret);
                report_failure(180);
        }
-       ret = rbd_open(ioctx, iname, &image, NULL);
+
+       ret = ops->open(iname, &ctx);
        if (ret < 0) {
-               prterrcode("docloseopen: open", ret);
+               prterrcode("docloseopen: ops->open", ret);
                report_failure(181);
        }
 }
@@ -1111,7 +1610,7 @@ void
 usage(void)
 {
        fprintf(stdout, "usage: %s",
-               "fsx [-dfnqxyACFHLORUWZ] [-b opnum] [-c Prob] [-h holebdy] [-l flen] [-m start:end] [-o oplen] [-p progressinterval] [-r readbdy] [-s style] [-t truncbdy] [-w writebdy] [-D startingop] [-N numops] [-P dirpath] [-S seed] pname iname\n\
+               "fsx [-dfnqxyACFHKLORUWZ] [-b opnum] [-c Prob] [-h holebdy] [-l flen] [-m start:end] [-o oplen] [-p progressinterval] [-r readbdy] [-s style] [-t truncbdy] [-w writebdy] [-D startingop] [-N numops] [-P dirpath] [-S seed] pname iname\n\
        -b opnum: beginning operation number (default 1)\n\
        -c P: 1 in P chance of file close+open at each op (default infinity)\n\
        -d: debug output for all operations\n\
@@ -1139,6 +1638,7 @@ usage(void)
 "      -F: Do not use fallocate (preallocation) calls\n"
 #endif
 "      -H: do not use punch hole calls\n\
+       -K: enable krbd mode (use -t and -h too)\n\
        -L: fsxLite - no file creations & no file size changes\n\
        -N numops: total # operations to do (default infinity)\n\
        -O: use oplen (see -o flag) for every op (default random)\n\
@@ -1325,7 +1825,7 @@ main(int argc, char **argv)
 
        setvbuf(stdout, (char *)0, _IOLBF, 0); /* line buffered stdout */
 
-       while ((ch = getopt(argc, argv, "b:c:dfh:l:m:no:p:qr:s:t:w:xyACD:FHLN:OP:RS:UWZ"))
+       while ((ch = getopt(argc, argv, "b:c:dfh:l:m:no:p:qr:s:t:w:xyACD:FHKLN:OP:RS:UWZ"))
               != EOF)
                switch (ch) {
                case 'b':
@@ -1434,6 +1934,10 @@ main(int argc, char **argv)
                case 'H':
                        punch_hole_calls = 0;
                        break;
+               case 'K':
+                       prt("krbd mode enabled\n");
+                       ops = &krbd_operations;
+                       break;
                case 'L':
                        prt("lite mode not supported for rbd\n");
                        exit(1);
@@ -1506,7 +2010,7 @@ main(int argc, char **argv)
                prterrcode(iname, ret);
                exit(90);
        }
-       ret = rbd_open(ioctx, iname, &image, NULL);
+       ret = ops->open(iname, &ctx);
        if (ret < 0) {
                simple_err("Error opening image", ret);
                exit(91);
@@ -1562,7 +2066,7 @@ main(int argc, char **argv)
        if (lite) {     /* zero entire existing file */
                ssize_t written;
 
-               written = rbd_write(image, 0, (size_t)maxfilelen, good_buf);
+               written = ops->write(&ctx, 0, (size_t)maxfilelen, good_buf);
                if (written != (ssize_t)maxfilelen) {
                        if (written < 0) {
                                prterrcode(iname, written);
@@ -1582,8 +2086,9 @@ main(int argc, char **argv)
        while (numops == -1 || numops--)
                test();
 
-       if ((ret = rbd_close(image)) < 0) {
-               prterrcode("rbd_close", ret);
+       ret = ops->close(&ctx);
+       if (ret < 0) {
+               prterrcode("ops->close", ret);
                report_failure(99);
        }
 
@@ -1592,8 +2097,10 @@ main(int argc, char **argv)
 
        while (num_clones >= 0) {
                static int first = 1;
+               rbd_image_t image;
                char clonename[128];
                char errmsg[128];
+
                clone_imagename(clonename, 128, num_clones);
                if ((ret = rbd_open(ioctx, clonename, &image, NULL)) < 0) {
                        sprintf(errmsg, "rbd_open %s", clonename);
@@ -1605,7 +2112,7 @@ main(int argc, char **argv)
                                sprintf(errmsg, "rbd_snap_unprotect %s@snap",
                                        clonename);
                                prterrcode(errmsg, ret);
-                       report_failure(102);
+                               report_failure(102);
                        }
                        if ((ret = rbd_snap_remove(image, "snap")) < 0) {
                                sprintf(errmsg, "rbd_snap_remove %s@snap",
@@ -1630,16 +2137,17 @@ main(int argc, char **argv)
                num_clones--;
        }
 
+       prt("All operations completed A-OK!\n");
+       fclose(fsxlogf);
+
        rados_ioctx_destroy(ioctx);
+       krbd_destroy(krbd);
        rados_shutdown(cluster);
 
        free(original_buf);
        free(good_buf);
        free(temp_buf);
 
-       prt("All operations completed A-OK!\n");
-       fclose(fsxlogf);
-
        exit(0);
        return 0;
 }