]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
rbd-fuse: add simple RBD FUSE client
authorDan Mick <dan.mick@inktank.com>
Tue, 30 Oct 2012 21:02:53 +0000 (14:02 -0700)
committerDan Mick <dan.mick@inktank.com>
Sat, 26 Jan 2013 05:27:53 +0000 (21:27 -0800)
Currently written in C on FUSE hi-level interfaces, so error reporting
could be better.  No serious work done for performance.  But it's
usable as it stands.

Specify -c <conf> and a mountpoint, and images show up as files in
that mountpoint.  You can create new images; they'll be created
with attributes stored in xattrs:

user.rbdfuse.imagesize: default 1GB
user.rbdfuse.imageorder: default 22
user.rbdfuse.imagefeatures: default 1 (layering)

Images may be truncated or extended by rewriting.  Currently
once an image is opened, it's not closed, so it can't be deleted
or changed outside of the fuse path.

Signed-off-by: Dan Mick <dan.mick@inktank.com>
src/.gitignore
src/Makefile.am
src/ps-ceph.pl
src/rbd_fuse/rbd-fuse.c

index 7548b5e47ae1ea1c2d5282c1a7271e7b21639b5a..d3cab1a4d1f132854a0dcc5f2687ca3f29d38bc7 100644 (file)
@@ -54,6 +54,7 @@
 /fsconverter
 /xattr_bench
 /rest-bench
+/rbd-fuse
 dev
 mondata
 mnt
index 82d585c450773ff2dd928833a1e8795ce5cf1380..c30c0c1a70562d29d48c76b6dd37d02b3fe22967 100644 (file)
@@ -161,6 +161,11 @@ ceph_fuse_LDADD = -lfuse libclient.la $(LIBGLOBAL_LDA)
 ceph_fuse_CXXFLAGS = ${AM_CXXFLAGS}
 bin_PROGRAMS += ceph-fuse
 
+rbd_fuse_SOURCES = rbd_fuse/rbd-fuse.c
+rbd_fuse_LDADD = -lfuse librados.la librbd.la $(LIBGLOBAL_LDA)
+rbd_fuse_CXXFLAGS = ${AM_CXXFLAGS}
+bin_PROGRAMS += rbd-fuse
+
 endif
 
 # tcmalloc?
index dc23629649485c56c8fc652d62f1e55116119a50..03fc6061373161321635c9b06ccc63bcd33c1bd4 100755 (executable)
@@ -18,6 +18,7 @@ sub is_ceph_proc {
 
         return 1 if $cmdline =~ /\bceph\b/;
         return 1 if $cmdline =~ /\bceph-fuse\b/;
+        return 1 if $cmdline =~ /\brbd-fuse\b/;
         return 1 if $cmdline =~ /\bceph-mds\b/;
         return 1 if $cmdline =~ /\bceph-mon\b/;
         return 1 if $cmdline =~ /\bceph-osd\b/;
index 878e8c8957b8acb5c9e907f7b5cf980708948e8c..b3e318f0781e4afedd7bfb1e2eaec21d463c5243 100644 (file)
@@ -1,12 +1,6 @@
 /*
  * rbd-fuse
- * 
- * compile with
- * cc -D_FILE_OFFSET_BITS=64 rbd-fuse.c -o rbd-fuse -lfuse -lrbd
  */
-#define PACKAGE_VERSION "0.1"
-
-#define _GNU_SOURCE
 #define FUSE_USE_VERSION 26
 
 #include <stdio.h>
 #include <unistd.h>
 #include <getopt.h>
 
-#include <rbd/librbd.h>
+#include "include/rbd/librbd.h"
 
-static pthread_mutex_t readdir_lock;
+static int gotrados = 0;
+char *pool_name;
+rados_t cluster;
+rados_ioctx_t ioctx;
 
-void dump_stat(char *label, struct stat *st);
+static pthread_mutex_t readdir_lock;
 
-/* access to rbd */
-struct radosStat {
-       /* fill with    int rados_stat(rados_ioctx_t io, const char *o, uint64_t *psize, time_t *pmtime); */
-       u_char          valid;
-       char objname[RBD_MAX_BLOCK_NAME_SIZE];   /* object name */
-       uint64_t        size;       /* object size */
-       time_t      mtime;      /* modification time */
+struct rbd_stat {
+       u_char valid;
+       rbd_image_info_t rbd_info;
 };
 
-struct rbdStat {
-       u_char          valid;
-       /* filll with int rbd_stat(rbd_image_t image, rbd_image_info_t *info, size_t infosize); */
-       rbd_image_info_t        rbd_info;
+struct rbd_options {
+       char *ceph_config;
+       char *pool_name;
 };
 
-struct rbdOptions {
-       char    *ceph_config;           /* ceph configuration file */
-       char    *pool_name;                     /* name of the pool our test image is in */
-       char    *image_name;            /* name of rbd image */
+struct rbd_image {
+       char *image_name;
+       struct rbd_image *next;
 };
+struct rbd_image *rbd_images;
 
-rados_t cluster;
-
-struct radosPool {
-       char    *pool_name;                             /* name of the pool */
-       rados_t                 cluster;
-       rados_ioctx_t   ioctx;                  /* handle for our test pool */
-       struct radosStat rados_stat;
-       struct radosPool        *next;
+struct rbd_openimage {
+       char *image_name;
+       rbd_image_t image;
+       struct rbd_stat rbd_stat;
 };
+#define MAX_RBD_IMAGES         128
+struct rbd_openimage opentbl[MAX_RBD_IMAGES];
 
-struct rbdImage {
-       int             rbd_in_use;
-       struct radosPool *pool;
-       char    *image_name;
-       rbd_image_t     image;                  /* handle for our test image */
-       struct rbdStat   rbd_stat;
-       struct rbdImage *next;
-};
-
-struct rbdOptions rbdOptions;
-
-#define RADOS_MAX_STRING       1024
-#define MAX_RADOS_POOLS                4
-struct radosPool radosPools[MAX_RADOS_POOLS];
+struct rbd_options rbd_options = {"/etc/ceph/ceph.conf", "rbd"};
 
-#define MAX_RBD_IMAGES         16
+#define rbdsize(fd)    opentbl[fd].rbd_stat.rbd_info.size
+#define rbdblksize(fd) opentbl[fd].rbd_stat.rbd_info.obj_size
+#define rbdblkcnt(fd)  opentbl[fd].rbd_stat.rbd_info.num_objs
 
-struct rbdFd {
-       int fd;
-       struct rbdImage *rbd;
-};
+uint64_t imagesize = 1024ULL * 1024 * 1024;
+uint64_t imageorder = 22ULL;
+uint64_t imagefeatures = 1ULL;
 
-struct rbdFd rbdFds[MAX_RBD_IMAGES];
-struct rbdImage rbdImages[MAX_RBD_IMAGES];
+// Minimize calls to rbd_list: marks bracketing of opendir/<ops>/releasedir
+int in_opendir;
 
 /* prototypes */
-void radosPools_init(void);
-void rbdImages_init(void);
-void simple_err(const char *msg, int err);
-int connect_to_cluster();
-int count_rbd_devices() { return 1; }
+int connect_to_cluster(rados_t *pcluster);
+void enumerate_images(struct rbd_image **head);
+int open_rbd_image(const char *image_name);
+int find_openrbd(const char *path);
 
-int radosPool_start(rados_t cluster, char *pool_name);
-int lookup_radosPool(char *pool_name);
-int allocate_radosPool(rados_t cluster, char *pool_name);
-void deallocate_radosPool(int poolid) { return; }   // stub
-void radosPool_initImages(struct radosPool *pool);
+void simple_err(const char *msg, int err);
 
-int lookup_rbdImage(char *image_name);
-int allocate_rbdImage(struct radosPool *pool);
-void deallocate_rbdImage(int imageId);
+void
+enumerate_images(struct rbd_image **head)
+{
+       char *ibuf;
+       size_t ibuf_len;
+       struct rbd_image *im, *next;
+       char *ip;
+       int actual_len;
+
+       if (*head != NULL) {
+               for (im = *head; im != NULL;) {
+                       next = im->next;
+                       free(im);
+                       im = next;
+               }
+               *head = NULL;
+       }
 
-static int open_volume_dir(const char *volume)
-{
-       int dirfd;
-       dirfd = lookup_rbdImage((char *)volume);
+       ibuf_len = 1024;
+       ibuf = malloc(ibuf_len);
+       actual_len = rbd_list(ioctx, ibuf, &ibuf_len);
+       if (actual_len < 0) {
+               simple_err("rbd_list: error %d\n", actual_len);
+               return;
+       }
 
-       return dirfd;
+       fprintf(stderr, "pool %s: ", pool_name);
+       for (ip = ibuf; *ip != '\0' && ip < &ibuf[actual_len];
+            ip += strlen(ip) + 1)  {
+               fprintf(stderr, "%s, ", ip);
+               im = malloc(sizeof(*im));
+               im->image_name = ip;
+               im->next = *head;
+               *head = im;
+       }
+       fprintf(stderr, "\n");
+       return;
 }
 
-static void
-iter_volumes(void *cookie, void (*iter)(void *cookie, char *volume, int dirfd))
+int
+find_openrbd(const char *path)
 {
-       int fd;
-       DIR *dirp;
-       struct dirent entry;
-       struct dirent *result;
-       struct rbdImage *rbdList;
-
-       pthread_mutex_lock(&readdir_lock);
+       int i;
 
-       for (rbdList = &(rbdImages[0]); rbdList != NULL; rbdList = rbdList->next){
-               if (rbdList->image_name == NULL) {
-                       continue;
-               }
-               if (rbdList->rbd_stat.valid == 0) {
-                       continue;
+       /* find in opentbl[] entry if already open */
+       for (i = 0; i < MAX_RBD_IMAGES; i++) {
+               if ((opentbl[i].image_name != NULL) &&
+                   (strcmp(opentbl[i].image_name, path) == 0)) {
+                       return i;
+                       break;
                }
-
-               iter(cookie, rbdList->image_name, -1);
        }
-
-out:
-       pthread_mutex_unlock(&readdir_lock);
+       return -1;
 }
 
-static int read_property(int dirfd, char *name, unsigned int *_value)
+int
+open_rbd_image(const char *image_name)
 {
-       int fd, errors;
-       /* FILE *f; */
-       unsigned int value;
-       struct rbdImage *rbd;
-
+       struct rbd_image *im;
+       struct rbd_openimage *rbd;
+       int fd, i;
+       int ret;
 
-       errors = 0;
-       rbd = rbdFds[dirfd].rbd;
+       if (image_name == (char *)NULL) 
+               return -1;
 
-       if (rbd == (struct rbdImage *)NULL) {
-               errors++;
-       }
-       if (!errors) {
-               if (rbd->rbd_in_use == 0) {
-                       errors++;
-               }
-       }
-       if (!errors) {
-               if (strncmp(name, "obj_siz", 7) == 0) {
-                       value = rbd->rbd_stat.rbd_info.obj_size;
-               } else if (strncmp(name, "num_obj", 7) == 0) {
-                       value = rbd->rbd_stat.rbd_info.num_objs;
-               } else {
-                       errors++;
+       // relies on caller to keep rbd_images up to date
+       for (im = rbd_images; im != NULL; i++, im = im->next) {
+               if (strcmp(im->image_name, image_name) == 0) {
+                       break;
                }
        }
+       if (im == NULL)
+               return -1;
 
-       if (!errors) {
-               *_value = value;
-               return 0;
+       /* find in opentbl[] entry if already open */
+       if ((fd = find_openrbd(image_name)) != -1) {
+               rbd = &opentbl[fd];
        } else {
-               return -1;
+               // allocate an opentbl[] and open the image
+               for (i = 0; i < MAX_RBD_IMAGES; i++) {
+                       if (opentbl[i].image == NULL) {
+                               fd = i;
+                               rbd = &opentbl[fd];
+                               rbd->image_name = strdup(image_name);
+                               break;
+                       }
+               }
+               if (i == MAX_RBD_IMAGES)
+                       return -1;
+               ret = rbd_open(ioctx, rbd->image_name, &(rbd->image), NULL);
+               if (ret < 0) {
+                       simple_err("open_rbd_image: can't open: ", ret);
+                       return ret;
+               }
        }
+       rbd_stat(rbd->image, &(rbd->rbd_stat.rbd_info),
+                sizeof(rbd_image_info_t));
+       rbd->rbd_stat.valid = 1;
+       return fd;
 }
 
-static void count_volumes_cb(void *cookie, char *volume, int dirfd)
+static void
+iter_images(void *cookie,
+           void (*iter)(void *cookie, const char *image))
 {
-       (*((unsigned int *)cookie))++;
-}
+       struct rbd_image *im;
 
-static int count_volumes(void)
-{
-       unsigned int count = 0;
+       pthread_mutex_lock(&readdir_lock);
 
-       iter_volumes(&count, count_volumes_cb);
-       return count;
+       for (im = rbd_images; im != NULL; im = im->next)
+               iter(cookie, im->image_name);
+       pthread_mutex_unlock(&readdir_lock);
 }
 
-struct blockfs_getattr_info {
-       unsigned int part_size;
-       int part_size_mismatches;
-       struct timespec st_atim;
-       struct timespec st_mtim;
-       struct timespec st_ctim;
-};
-
-static int timespec_cmp(struct timespec *a, struct timespec *b)
+static void count_images_cb(void *cookie, const char *image)
 {
-       if (a->tv_sec > b->tv_sec)
-               return 1;
-
-       if (a->tv_sec < b->tv_sec)
-               return -1;
+       (*((unsigned int *)cookie))++;
+}
 
-       if (a->tv_nsec > b->tv_nsec)
-               return 1;
+static int count_images(void)
+{
+       unsigned int count = 0;
 
-       if (a->tv_nsec < b->tv_nsec)
-               return -1;
+       pthread_mutex_lock(&readdir_lock);
+       enumerate_images(&rbd_images);
+       pthread_mutex_unlock(&readdir_lock);
 
-       return 0;
+       iter_images(&count, count_images_cb);
+       return count;
 }
 
-static int blockfs_getattr(const char *path, struct stat *stbuf)
+static int rbdfs_getattr(const char *path, struct stat *stbuf)
 {
-       struct stat buf;
-       int dirfd;
+       int fd;
        time_t now;
-       unsigned int num_parts;
-       unsigned int part_size;
-       struct blockfs_getattr_info info;
+
+       if (!gotrados)
+               return -ENXIO;
 
        if (path[0] == 0)
                return -ENOENT;
@@ -225,11 +216,10 @@ static int blockfs_getattr(const char *path, struct stat *stbuf)
        memset(stbuf, 0, sizeof(struct stat));
 
        if (strcmp(path, "/") == 0) {
-               int ret;
 
                now = time(NULL);
                stbuf->st_mode = S_IFDIR + 0755;
-               stbuf->st_nlink = 2+count_rbd_devices();
+               stbuf->st_nlink = 2+count_images();
                stbuf->st_uid = getuid();
                stbuf->st_gid = getgid();
                stbuf->st_size = 1024;
@@ -239,164 +229,113 @@ static int blockfs_getattr(const char *path, struct stat *stbuf)
                stbuf->st_mtime = now;
                stbuf->st_ctime = now;
 
-               dump_stat("derived stat", stbuf);
-
                return 0;
        }
 
-       dirfd = open_volume_dir(path + 1);
-       if (dirfd < 0)
-               return -ENOENT;
-
-       if (read_property(dirfd, "num_objs", &num_parts) < 0) {
-               return -EINVAL;
-       }
-
-       if (read_property(dirfd, "obj_size", &part_size) < 0) {
-               return -EINVAL;
+       if (!in_opendir) {
+               pthread_mutex_lock(&readdir_lock);
+               enumerate_images(&rbd_images);
+               pthread_mutex_unlock(&readdir_lock);
        }
+       fd = open_rbd_image(path + 1);
+       if (fd < 0)
+               return -ENOENT;
 
        now = time(NULL);
        stbuf->st_mode = S_IFREG | 0666;
        stbuf->st_nlink = 1;
        stbuf->st_uid = getuid();
        stbuf->st_gid = getgid();
-       stbuf->st_size = (uint64_t)num_parts * part_size;
-       stbuf->st_blksize = part_size;
-       stbuf->st_blocks = ((uint64_t)num_parts * part_size + 511) / 512;
+       stbuf->st_size = rbdsize(fd);
+       stbuf->st_blksize = rbdblksize(fd);
+       stbuf->st_blocks = rbdblkcnt(fd);
        stbuf->st_atime = now;
        stbuf->st_mtime = now;
        stbuf->st_ctime = now;
 
-       dump_stat("derived stat", stbuf);
-
-       /* close(dirfd); */
-
        return 0;
 }
 
-static int blockfs_truncate(const char *path, off_t length)
-{
-       /* Silently fail.  */
-       return 0;
-}
 
-static int blockfs_open(const char *path, struct fuse_file_info *fi)
+static int rbdfs_open(const char *path, struct fuse_file_info *fi)
 {
-       int dirfd;
-       unsigned int num_parts;
-       unsigned int part_size;
-       int mode;
-       int i;
+       int fd;
+
+       if (!gotrados)
+               return -ENXIO;
 
        if (path[0] == 0)
                return -ENOENT;
 
-       dirfd = open_volume_dir(path + 1);
-       if (dirfd < 0)
+       pthread_mutex_lock(&readdir_lock);
+       enumerate_images(&rbd_images);
+       pthread_mutex_unlock(&readdir_lock);
+       fd = open_rbd_image(path + 1);
+       if (fd < 0)
                return -ENOENT;
 
-       if (read_property(dirfd, "num_objs", &num_parts) < 0) {
-               return -EINVAL;
-       }
-
-       if (read_property(dirfd, "obj_size", &part_size) < 0) {
-               return -EINVAL;
-       }
-
+       fi->fh = fd;
        return 0;
 }
 
-static int blockfs_read(const char *path, char *buf, size_t size,
+static int rbdfs_read(const char *path, char *buf, size_t size,
                        off_t offset, struct fuse_file_info *fi)
 {
-       int dirfd;
        size_t numread;
-       unsigned int num_parts;
-       unsigned int part_size;
-       struct rbdImage *rbd;
-
-       if (path[0] == 0)
-               return -ENOENT;
-
-       dirfd = open_volume_dir(path + 1);
-       if (dirfd < 0)
-               return -ENOENT;
+       struct rbd_openimage *rbd;
 
-       if (read_property(dirfd, "num_objs", &num_parts) < 0) {
-               return -EINVAL;
-       }
+       if (!gotrados)
+               return -ENXIO;
 
-       if (read_property(dirfd, "obj_size", &part_size) < 0) {
-               return -EINVAL;
-       }
-
-       /*
-       if (read_property(dirfd, "num_parts", &num_parts) < 0) {
-               close(dirfd);
-               return -EINVAL;
-       }
-
-       if (read_property(dirfd, "part_size", &part_size) < 0) {
-               close(dirfd);
-               return -EINVAL;
-       }
-       */
-       rbd = rbdFds[dirfd].rbd;
+       rbd = &opentbl[fi->fh];
        numread = 0;
        while (size > 0) {
                ssize_t ret;
 
-               /* ssize_t rbd_read(rbd_image_t image, uint64_t ofs, size_t len, char *buf); */
                ret = rbd_read(rbd->image, offset, size, buf);
 
+               if (ret <= 0)
+                       break;
                buf += ret;
                size -= ret;
                offset += ret;
                numread += ret;
        }
 
-       /* close(dirfd); */
-
        return numread;
 }
 
-static int blockfs_write(const char *path, const char *buf, size_t size,
+static int rbdfs_write(const char *path, const char *buf, size_t size,
                         off_t offset, struct fuse_file_info *fi)
 {
-       int dirfd;
        size_t numwritten;
-       unsigned int num_parts;
-       unsigned int part_size;
-       struct rbdImage *rbd;
+       struct rbd_openimage *rbd;
 
-       if (path[0] == 0)
-               return -ENOENT;
-
-       dirfd = open_volume_dir(path + 1);
-       if (dirfd < 0)
-               return -ENOENT;
+       if (!gotrados)
+               return -ENXIO;
 
-       if (read_property(dirfd, "num_objs", &num_parts) < 0) {
-               return -EINVAL;
-       }
-
-       if (read_property(dirfd, "obj_size", &part_size) < 0) {
-               return -EINVAL;
-       }
-
-       rbd = rbdFds[dirfd].rbd;
+       rbd = &opentbl[fi->fh];
        numwritten = 0;
        while (size > 0) {
                ssize_t ret;
 
-               /* ssize_t rbd_write(rbd_image_t image, uint64_t ofs, size_t len, const char *buf); */
+               if (offset + size > rbdsize(fi->fh)) {
+                       int r;
+                       fprintf(stderr, "rbdfs_write resizing %s to 0x%lx\n",
+                               path, offset+size);
+                       r = rbd_resize(rbd->image, offset+size);
+                       if (r < 0)
+                               return r;
+
+                       r = rbd_stat(rbd->image, &(rbd->rbd_stat.rbd_info),
+                                sizeof(rbd_image_info_t));
+                       if (r < 0)
+                               return r;
+               }
                ret = rbd_write(rbd->image, offset, size, buf);
-               if (ret < 0) {
-                       if (numwritten == 0)
-                               numwritten = ret;
+
+               if (ret < 0)
                        break;
-               }
                buf += ret;
                size -= ret;
                offset += ret;
@@ -406,34 +345,35 @@ static int blockfs_write(const char *path, const char *buf, size_t size,
        return numwritten;
 }
 
-static void blockfs_statfs_volume_cb(void *num, char *volume, int dirfd)
+static void rbdfs_statfs_image_cb(void *num, const char *image)
 {
-       unsigned int num_parts, part_size;
-       int     rbdfd;
+       int     fd;
 
        ((uint64_t *)num)[0]++;
 
-       rbdfd = lookup_rbdImage(volume);
-       if (rbdfd >= 0) {
-               if (read_property(dirfd, "num_objs", &num_parts) >= 0) {
-                       if (read_property(dirfd, "obj_size", &part_size) >= 0) {
-                               ((uint64_t *)num)[1] += (uint64_t)num_parts * (uint64_t)part_size;
-                       }
-               }
-       }
+       fd = open_rbd_image(image);
+       if (fd >= 0)
+               ((uint64_t *)num)[1] += rbdsize(fd);
 }
 
-static int blockfs_statfs(const char *path, struct statvfs *buf)
+static int rbdfs_statfs(const char *path, struct statvfs *buf)
 {
        uint64_t num[2];
 
+       if (!gotrados)
+               return -ENXIO;
+
        num[0] = 1;
        num[1] = 0;
-       iter_volumes(num, blockfs_statfs_volume_cb);
+       pthread_mutex_lock(&readdir_lock);
+       enumerate_images(&rbd_images);
+       pthread_mutex_unlock(&readdir_lock);
+       iter_images(num, rbdfs_statfs_image_cb);
 
-       buf->f_bsize = 4096;
-       buf->f_frsize = 4096;
-       buf->f_blocks = num[1] / 4096;
+#define        RBDFS_BSIZE     4096
+       buf->f_bsize = RBDFS_BSIZE;
+       buf->f_frsize = RBDFS_BSIZE;
+       buf->f_blocks = num[1] / RBDFS_BSIZE;
        buf->f_bfree = 0;
        buf->f_bavail = 0;
        buf->f_files = num[0];
@@ -446,51 +386,245 @@ static int blockfs_statfs(const char *path, struct statvfs *buf)
        return 0;
 }
 
-static int blockfs_fsync(const char *path, int datasync,
+static int rbdfs_fsync(const char *path, int datasync,
                         struct fuse_file_info *fi)
 {
-       /* FIXME.  */
-       sync();
+       if (!gotrados)
+               return -ENXIO;
+       rbd_flush(opentbl[fi->fh].image);
+       return 0;
+}
 
+static int rbdfs_opendir(const char *path, struct fuse_file_info *fi)
+{
+       // only one directory, so global "in_opendir" flag should be fine
+       pthread_mutex_lock(&readdir_lock);
+       in_opendir++;
+       enumerate_images(&rbd_images);
+       pthread_mutex_unlock(&readdir_lock);
        return 0;
 }
 
-struct blockfs_readdir_info {
+struct rbdfs_readdir_info {
        void *buf;
        fuse_fill_dir_t filler;
 };
 
-static void blockfs_readdir_cb(void *_info, char *name, int dirfd)
+static void rbdfs_readdir_cb(void *_info, const char *name)
 {
-       struct blockfs_readdir_info *info = _info;
+       struct rbdfs_readdir_info *info = _info;
 
        info->filler(info->buf, name, NULL, 0);
 }
 
-static int blockfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
+static int rbdfs_readdir(const char *path, void *buf, fuse_fill_dir_t filler,
                           off_t offset, struct fuse_file_info *fi)
 {
-       struct blockfs_readdir_info info = { buf, filler };
+       struct rbdfs_readdir_info info = { buf, filler };
+
+       if (!gotrados)
+               return -ENXIO;
+       if (!in_opendir)
+               fprintf(stderr, "in readdir, but not inside opendir?\n");
 
        if (strcmp(path, "/") != 0)
                return -ENOENT;
 
        filler(buf, ".", NULL, 0);
        filler(buf, "..", NULL, 0);
-       iter_volumes(&info, blockfs_readdir_cb);
+       iter_images(&info, rbdfs_readdir_cb);
+
+       return 0;
+}
+static int rbdfs_releasedir(const char *path, struct fuse_file_info *fi)
+{
+       // see opendir comments
+       pthread_mutex_lock(&readdir_lock);
+       in_opendir--;
+       pthread_mutex_unlock(&readdir_lock);
+       return 0;
+}
+
+void *
+rbdfs_init(struct fuse_conn_info *conn)
+{
+       int ret;
+
+       // init cannot fail, so if we fail here, gotrados remains at 0,
+       // causing other operations to fail immediately with ENXIO
 
+       ret = connect_to_cluster(&cluster);
+       if (ret < 0)
+               exit(90);
+
+       pool_name = rbd_options.pool_name;
+       ret = rados_ioctx_create(cluster, pool_name, &ioctx);
+       if (ret < 0)
+               exit(91);
+
+       conn->want |= FUSE_CAP_BIG_WRITES;
+       gotrados = 1;
+
+       // init's return value shows up in fuse_context.private_data,
+       // also to void (*destroy)(void *); useful?
+       return NULL;
+}
+
+// return -errno on error.  fi->fh is not set until open time
+
+int
+rbdfs_create(const char *path, mode_t mode, struct fuse_file_info *fi)
+{
+       int r;
+       int order = imageorder;
+
+       r = rbd_create2(ioctx, path+1, imagesize, imagefeatures, &order);
+       return r;
+}
+
+int
+rbdfs_utime(const char *path, struct utimbuf *utime)
+{
+       // called on create; not relevant
        return 0;
 }
 
-static struct fuse_operations blockfs_oper = {
-       .getattr        = blockfs_getattr,
-       .truncate       = blockfs_truncate,
-       .open           = blockfs_open,
-       .read           = blockfs_read,
-       .write          = blockfs_write,
-       .statfs         = blockfs_statfs,
-       .fsync          = blockfs_fsync,
-       .readdir        = blockfs_readdir,
+int
+rbdfs_unlink(const char *path)
+{
+       int fd = find_openrbd(path);
+       if (fd != -1) {
+               struct rbd_openimage *rbd = &opentbl[fd];
+               rbd_close(rbd->image);
+               rbd->image = 0;
+               free(rbd->image_name);
+               rbd->rbd_stat.valid = 0;
+       }
+       return rbd_remove(ioctx, path+1);
+}
+
+
+int
+rbdfs_truncate(const char *path, off_t size)
+{
+       int fd;
+       int r;
+       struct rbd_openimage *rbd;
+
+       if ((fd = open_rbd_image(path+1)) < 0)
+               return -ENOENT;
+
+       rbd = &opentbl[fd];
+       fprintf(stderr, "truncate %s to %ld (0x%lx)\n", path, size, size);
+       r = rbd_resize(rbd->image, size);
+       if (r < 0)
+               return r;
+
+       r = rbd_stat(rbd->image, &(rbd->rbd_stat.rbd_info),
+                sizeof(rbd_image_info_t));
+       if (r < 0)
+               return r;
+       return 0;
+}
+
+/**
+ * set an xattr on path, with name/value, length size.
+ * Presumably flags are from Linux, as in XATTR_CREATE or 
+ * XATTR_REPLACE (both "set", but fail if exist vs fail if not exist.
+ *
+ * We accept xattrs only on the root node.
+ *
+ * All values converted with strtoull, so can be expressed in any base
+ */
+
+struct rbdfuse_attr {
+       char *attrname;
+       uint64_t *attrvalp;
+} attrs[] = {
+       { "user.rbdfuse.imagesize", &imagesize },
+       { "user.rbdfuse.imageorder", &imageorder },
+       { "user.rbdfuse.imagefeatures", &imagefeatures },
+       { NULL }
+};
+
+int
+rbdfs_setxattr(const char *path, const char *name, const char *value,
+                size_t size, int flags)
+{
+       struct rbdfuse_attr *ap;
+       if (strcmp(path, "/") != 0)
+               return -EINVAL;
+
+       for (ap = attrs; ap->attrname != NULL; ap++) {
+               if (strcmp(name, ap->attrname) == 0) {
+                       *ap->attrvalp = strtoull(value, NULL, 0);
+                       fprintf(stderr, "rbd-fuse: %s set to 0x%lx\n",
+                               ap->attrname, *ap->attrvalp);
+                       return 0;
+               }
+       }
+       return -EINVAL;
+}
+
+int
+rbdfs_getxattr(const char *path, const char *name, char *value,
+                size_t size)
+{
+       struct rbdfuse_attr *ap;
+       char buf[128];
+       // allow gets on other files; ls likes to ask for things like
+       // security.*
+
+       for (ap = attrs; ap->attrname != NULL; ap++) {
+               if (strcmp(name, ap->attrname) == 0) {
+                       sprintf(buf, "%lu", *ap->attrvalp);
+                       if (value != NULL && size >= strlen(buf))
+                               strcpy(value, buf);
+                       fprintf(stderr, "rbd-fuse: get %s\n", ap->attrname);
+                       return (strlen(buf));
+               }
+       }
+       return 0;
+}
+
+int
+rbdfs_listxattr(const char *path, char *list, size_t len)
+{
+       struct rbdfuse_attr *ap;
+       int required_len = 0;
+
+       if (strcmp(path, "/") != 0)
+               return -EINVAL;
+
+       for (ap = attrs; ap->attrname != NULL; ap++)
+               required_len += strlen(ap->attrname) + 1;
+       if (len >= required_len) {
+               for (ap = attrs; ap->attrname != NULL; ap++) {
+                       sprintf(list, "%s", ap->attrname);
+                       list += strlen(ap->attrname) + 1;
+               }
+       }
+       return required_len;
+}
+
+static struct fuse_operations rbdfs_oper = {
+       .create         = rbdfs_create,
+       .fsync          = rbdfs_fsync,
+       .getattr        = rbdfs_getattr,
+       .getxattr       = rbdfs_getxattr,
+       .init           = rbdfs_init,
+       .listxattr      = rbdfs_listxattr,
+       .open           = rbdfs_open,
+       .opendir        = rbdfs_opendir,
+       .read           = rbdfs_read,
+       .readdir        = rbdfs_readdir,
+       .releasedir     = rbdfs_releasedir,
+       .setxattr       = rbdfs_setxattr,
+       .statfs         = rbdfs_statfs,
+       .truncate       = rbdfs_truncate,
+       .unlink         = rbdfs_unlink,
+       .utime          = rbdfs_utime,
+       .write          = rbdfs_write,
 };
 
 enum {
@@ -502,21 +636,23 @@ enum {
        KEY_RADOS_POOLNAME_LONG
 };
 
-static struct fuse_opt blockfs_opts[] = {
-               FUSE_OPT_KEY("-h",                KEY_HELP),
-               FUSE_OPT_KEY("--help",    KEY_HELP),
-               FUSE_OPT_KEY("-V",                KEY_VERSION),
-               FUSE_OPT_KEY("--version", KEY_VERSION),
-               {"-c %s",           offsetof(struct rbdOptions, ceph_config), KEY_CEPH_CONFIG},
-               {"--configfile=%s", offsetof(struct rbdOptions, ceph_config), KEY_CEPH_CONFIG_LONG},
-               {"-p %s",               offsetof(struct rbdOptions, pool_name),   KEY_RADOS_POOLNAME},
-               {"--poolname=%s",   offsetof(struct rbdOptions, pool_name),   KEY_RADOS_POOLNAME_LONG},
+static struct fuse_opt rbdfs_opts[] = {
+       FUSE_OPT_KEY("-h", KEY_HELP),
+       FUSE_OPT_KEY("--help", KEY_HELP),
+       FUSE_OPT_KEY("-V", KEY_VERSION),
+       FUSE_OPT_KEY("--version", KEY_VERSION),
+       {"-c %s", offsetof(struct rbd_options, ceph_config), KEY_CEPH_CONFIG},
+       {"--configfile=%s", offsetof(struct rbd_options, ceph_config),
+        KEY_CEPH_CONFIG_LONG},
+       {"-p %s", offsetof(struct rbd_options, pool_name), KEY_RADOS_POOLNAME},
+       {"--poolname=%s", offsetof(struct rbd_options, pool_name),
+        KEY_RADOS_POOLNAME_LONG},
 };
 
 static void usage(const char *progname)
 {
        fprintf(stderr,
-"Usage: %s backingdir mountpoint [options]\n"
+"Usage: %s mountpoint [options]\n"
 "\n"
 "General options:\n"
 "    -h   --help            print help\n"
@@ -526,45 +662,37 @@ static void usage(const char *progname)
 "\n", progname);
 }
 
-static int blockfs_opt_proc(void *data, const char *arg, int key,
+static int rbdfs_opt_proc(void *data, const char *arg, int key,
                            struct fuse_args *outargs)
 {
-       if (key == FUSE_OPT_KEY_NONOPT) {
-               if (rbdOptions.image_name == NULL) {
-                       rbdOptions.image_name = strdup(arg);
-                       return 0;
-               }
-               return 1;
-       }
-
        if (key == KEY_HELP) {
                usage(outargs->argv[0]);
                fuse_opt_add_arg(outargs, "-ho");
-               fuse_main(outargs->argc, outargs->argv, &blockfs_oper, NULL);
+               fuse_main(outargs->argc, outargs->argv, &rbdfs_oper, NULL);
                exit(1);
        }
 
        if (key == KEY_VERSION) {
                fuse_opt_add_arg(outargs, "--version");
-               fuse_main(outargs->argc, outargs->argv, &blockfs_oper, NULL);
+               fuse_main(outargs->argc, outargs->argv, &rbdfs_oper, NULL);
                exit(0);
        }
 
        if (key == KEY_CEPH_CONFIG) {
-               if (rbdOptions.ceph_config != NULL) {
-                       free(rbdOptions.ceph_config);
-                       rbdOptions.ceph_config = NULL;
+               if (rbd_options.ceph_config != NULL) {
+                       free(rbd_options.ceph_config);
+                       rbd_options.ceph_config = NULL;
                }
-               rbdOptions.ceph_config = strdup(arg+2);
+               rbd_options.ceph_config = strdup(arg+2);
                return 0;
        }
 
        if (key == KEY_RADOS_POOLNAME) {
-               if (rbdOptions.pool_name != NULL) {
-                       free(rbdOptions.pool_name);
-                       rbdOptions.pool_name = NULL;
+               if (rbd_options.pool_name != NULL) {
+                       free(rbd_options.pool_name);
+                       rbd_options.pool_name = NULL;
                }
-               rbdOptions.pool_name = strdup(arg+2);
+               rbd_options.pool_name = strdup(arg+2);
                return 0;
        }
 
@@ -574,29 +702,27 @@ static int blockfs_opt_proc(void *data, const char *arg, int key,
 void
 simple_err(const char *msg, int err)
 {
-    fprintf(stderr, "%s: %s", msg, strerror(-err));
+    fprintf(stderr, "%s: %s\n", msg, strerror(-err));
     return;
 }
 
 int
-connect_to_cluster()
+connect_to_cluster(rados_t *pcluster)
 {
        int r;
-       int order = 0;
-
 
-       r = rados_create(&(cluster), NULL);
+       r = rados_create(pcluster, NULL);
        if (r < 0) {
                simple_err("Could not create cluster handle", r);
                return r;
        }
-       rados_conf_parse_env(cluster, NULL);
-       r = rados_conf_read_file(cluster, rbdOptions.ceph_config);
+       rados_conf_parse_env(*pcluster, NULL);
+       r = rados_conf_read_file(*pcluster, rbd_options.ceph_config);
        if (r < 0) {
-               simple_err("Error reading ceph config file", r);
+               simple_err("Error reading Ceph config file", r);
                goto failed_shutdown;
        }
-       r = rados_connect(cluster);
+       r = rados_connect(*pcluster);
        if (r < 0) {
                simple_err("Error connecting to cluster", r);
                goto failed_shutdown;
@@ -605,304 +731,20 @@ connect_to_cluster()
        return 0;
 
 failed_shutdown:
-       rados_shutdown(cluster);
+       rados_shutdown(*pcluster);
        return r;
 }
 
-int
-radosPool_start(rados_t cluster, char *pool_name)
-{
-       int r = 0;
-       int poolfd;
-       struct radosPool *pool;
-       struct radosStat *sbuf;
-
-       poolfd = lookup_radosPool(pool_name);
-       pool = NULL;
-
-       if (poolfd >= 0) {
-               simple_err("Pool already started", poolfd);
-               poolfd = -EEXIST;
-       } else {
-               poolfd = allocate_radosPool(cluster, pool_name);
-               if (poolfd >= 0) {
-                       pool = &(radosPools[poolfd]);
-               } else {
-                       poolfd = -ENOSPC;
-               }
-       }
-
-       if (pool != (struct radosPool *)NULL) {
-               r = rados_ioctx_create(pool->cluster, pool_name, &(pool->ioctx));
-               if (r < 0) {
-                       simple_err("Error creating ioctx", r);
-                       poolfd = r;
-               } else {
-                       sbuf = &(pool->rados_stat);
-                       rados_stat(pool->ioctx, &(sbuf->objname[0]), &(sbuf->size), &(sbuf->mtime));
-                       pool->rados_stat.valid = 1;
-               }
-
-       }
-       return poolfd;
-}
-
-void
-rbdOptions_init(void)
-{
-       rbdOptions.ceph_config = strdup("/etc/ceph/ceph.conf");
-       rbdOptions.pool_name = strdup("rbd");
-       return;
-}
-
-void
-radosPools_init(void)
-{
-       int i;
-
-       for (i = 0; i < MAX_RADOS_POOLS; i++) {
-               radosPools[i].pool_name = NULL;
-               radosPools[i].rados_stat.valid = 0;
-               if ((i+1) < MAX_RADOS_POOLS) {
-                       radosPools[i].next = &(radosPools[i+1]);
-               } else {
-                       radosPools[i].next = (struct radosPool *)NULL;
-               }
-       }
-       return;
-}
-
-void
-radosPool_initImages(struct radosPool *pool)
-{
-       char *imagenames;
-       char *iname, *p, *fullname;
-       int  name_len, ret;
-       size_t  expected_size;
-       int  rbd_fd;
-       struct rbdImage *rbd_image;
-       struct rbdStat  *sbuf;
-
-
-       expected_size = 0;
-       rbd_list(pool->ioctx, imagenames, &expected_size);
-       imagenames = (char *)malloc(expected_size);
-       if (imagenames == (char *) NULL) {
-               return;
-       }
-       rbd_list(pool->ioctx, imagenames, &expected_size);
-       p = imagenames;
-       while (p < &(imagenames[expected_size])) {
-               iname = p;
-               if (strlen(iname) == 0) break;
-               rbd_fd = allocate_rbdImage(pool);
-               if (rbd_fd >= 0) {
-                       rbd_image = rbdFds[rbd_fd].rbd;
-                       name_len = strlen(iname) + 1;
-                       fullname = malloc(name_len);
-                       snprintf(fullname, name_len, "%s", iname);
-                       rbd_image->image_name = fullname;
-                       ret = rbd_open(pool->ioctx, rbd_image->image_name, &(rbd_image->image), NULL);
-                       if (ret < 0) {
-                               simple_err("radosPool_initImages: error opening image", ret);
-                               deallocate_rbdImage(rbd_fd);
-                       } else {
-                               sbuf = &(rbd_image->rbd_stat);
-                               rbd_stat(rbd_image->image, &(sbuf->rbd_info), sizeof(rbd_image_info_t));
-                               sbuf->valid = 1;
-
-                               fprintf(stderr, "DEBUG(andy): rbd_stat (prefix %s, parent_poolid %d, parent_name %s)\n", sbuf->rbd_info.block_name_prefix, sbuf->rbd_info.parent_pool, sbuf->rbd_info.parent_name ? sbuf->rbd_info.parent_name : "NULL");
-                       }
-               } else {
-                       simple_err("radosPool_initImages: failed to allocate rbd Image", rbd_fd);
-                       break;
-               }
-               p += strlen(iname) + 1;
-       }
-       free(imagenames);
-       return;
-}
-
-void
-rbdImages_init(void)
-{
-       int i;
-
-       for (i = 0; i < MAX_RBD_IMAGES; i++) {
-               rbdFds[i].fd = -1;
-               rbdFds[i].rbd = (struct rbdImage *)NULL;
-               rbdImages[i].rbd_stat.valid = 0;
-               if (i+1 < MAX_RBD_IMAGES) {
-                       rbdImages[i].next = &(rbdImages[i+1]);
-               } else {
-                       rbdImages[i].next = (struct rbdImage *)NULL;
-               }
-               rbdImages[i].rbd_in_use = 0;
-       }
-       return;
-}
-
-int
-lookup_radosPool(char *pool_name)
-{
-       int i, poolfd;
-
-       poolfd = -1;
-
-       for (i = 0; i < MAX_RADOS_POOLS; i++) {
-               if (radosPools[i].pool_name == NULL) {
-                       continue;
-               }
-               if (strcmp(pool_name, radosPools[i].pool_name) == 0) {
-                       poolfd = i;
-                       break;
-               }
-       }
-
-       return poolfd;
-}
-
-int
-allocate_radosPool(rados_t cluster, char *pool_name)
-{
-       int i, poolfd, errors;
-
-       poolfd = -1;
-       errors = 0;
-
-       if (lookup_radosPool(pool_name) < 0) {
-               for (i = 0; i < MAX_RADOS_POOLS; i++) {
-                       if (radosPools[i].pool_name == NULL) {
-                               radosPools[i].pool_name = strdup(pool_name);
-                               radosPools[i].cluster = cluster;
-                               poolfd = i;
-                               break;
-                       }
-               }
-       }
-
-       return poolfd;
-}
-
-int lookup_rbdImage(char *image_name)
-{
-       int fd, errors;
-       struct rbdImage *rbd;
-
-       errors = 0;
-       fd = -1;
-       if (image_name == (char *)NULL) {
-               errors++;
-       }
-
-       if (!errors) {
-               for (fd = 0; fd < MAX_RBD_IMAGES; fd++) {
-                       rbd = rbdFds[fd].rbd;
-                       if (rbd == NULL) {
-                               continue;
-                       }
-                       if (rbd->rbd_in_use == 0) {
-                               continue;
-                       }
-                       if (rbd->image_name == (char *)NULL)
-                       {
-                               continue;
-                       }
-                       if (strcmp(rbd->image_name, image_name) == 0) {
-                               break;
-                       }
-               }
-               if (fd >= MAX_RBD_IMAGES) {
-                       fd = -1;
-               }
-       }
-       return fd;
-}
-
-int
-allocate_rbdImage(struct radosPool *pool)
-{
-       int fd;
-
-       for (fd = 0; fd < MAX_RBD_IMAGES; fd++) {
-               if (rbdFds[fd].fd == -1) {
-                       rbdFds[fd].fd = fd;
-                       rbdFds[fd].rbd = &(rbdImages[fd]);
-                       rbdImages[fd].rbd_in_use = 1;
-                       rbdImages[fd].pool = pool;
-                       break;
-               }
-       }
-
-       if (fd >= MAX_RBD_IMAGES) {
-               return -1;
-       } else {
-               return fd;
-       }
-}
-
-void
-deallocate_rbdImage(int imageId)
-{
-       struct rbdImage *rbd;
-       if (imageId < 0 || imageId >= MAX_RBD_IMAGES) {
-               return;
-       }
-       rbdFds[imageId].fd = -1;
-       rbd = rbdFds[imageId].rbd;
-       rbdFds[imageId].rbd= (struct rbdImage *)NULL;
-
-       if (rbd->image_name != NULL) {
-               free(rbd->image_name);
-               rbd->image_name = NULL;
-       }
-       rbd->rbd_in_use = 0;
-
-       return;
-}
-
-void
-dump_stat(char *label, struct stat *st)
-{
-       if (st == NULL) return;
-       // fprintf(stderr, "dumpstat for '%s'\n", label ? label : "unknown");
-       // fprintf(stderr, "\tdev %d ino %d mode %d\n", st->st_dev, st->st_ino, st->st_mode);
-       // fprintf(stderr, "\tlinks %d uid %d gid %d\n", st->st_nlink, st->st_uid, st->st_gid);
-       // fprintf(stderr, "\tspecial dev %ld size %d blksize %d blkcnt %d\n", (uint64_t)(st->st_rdev), st->st_size, st->st_blksize, st->st_blocks);
-       // fprintf(stderr, "\tatime %ld mtime %ld ctime %ld\n", (uint64_t)(st->st_atime), (uint64_t)(st->st_mtime), (uint64_t)(st->st_ctime));
-   
-       return;
-}
-
 int main(int argc, char *argv[])
 {
        struct fuse_args args = FUSE_ARGS_INIT(argc, argv);
-       int ret, poolfd;
-       struct radosPool *rados_pool;
 
-       rbdOptions_init();
-       radosPools_init();
-       rbdImages_init();
-
-       if (fuse_opt_parse(&args, &rbdOptions, blockfs_opts, blockfs_opt_proc) == -1)
+       if (fuse_opt_parse(&args, &rbd_options, rbdfs_opts, rbdfs_opt_proc)
+           == -1) {
                exit(1);
-
-       ret = connect_to_cluster();
-       if (ret < 0) {
-               simple_err("Error accessing ceph cluster", ret);
-               exit(90);
        }
 
-       poolfd = radosPool_start(cluster, rbdOptions.pool_name);
-       if (poolfd < 0) {
-               simple_err("error starting pool", poolfd);
-               exit(91);
-       }
-
-       rados_pool = &(radosPools[poolfd]);
-       radosPool_initImages(rados_pool);
-
        pthread_mutex_init(&readdir_lock, NULL);
 
-       return fuse_main(args.argc, args.argv, &blockfs_oper, NULL);
+       return fuse_main(args.argc, args.argv, &rbdfs_oper, NULL);
 }