git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
kclient: distinguish file vs dir xattrs; move the getvxattr_cb under i_lock
author Sage Weil <sage@newdream.net>
Mon, 28 Sep 2009 16:37:28 +0000 (09:37 -0700)
committer Sage Weil <sage@newdream.net>
Mon, 28 Sep 2009 17:09:25 +0000 (10:09 -0700)
src/Makefile.am
src/kernel/Makefile
src/kernel/inode.c
src/kernel/super.h
src/kernel/xattr.c [new file with mode: 0644]

index 0a14b89469bdc7e46d6b413e2ff6649497bbcb32..bdf10bbd96b3570039d6b8c0ada6249d6ae8744b 100644 (file)
@@ -505,6 +505,7 @@ noinst_HEADERS = \
        kernel/super.c\
        kernel/super.h\
        kernel/types.h\
+       kernel/xattr.c\
        mds/locks.c\
        mds/locks.h\
         mds/Anchor.h\
index 80c16b58ed68dd25f68428361a39c2bcb552553d..a15e8a95566fd80a8bfdef9df153f2d1a952da63 100644 (file)
@@ -7,7 +7,7 @@ ifneq ($(KERNELRELEASE),)
 obj-$(CONFIG_CEPH_FS) += ceph.o
 
 ceph-objs := super.o inode.o dir.o file.o addr.o ioctl.o \
-       export.o caps.o snap.o \
+       export.o caps.o snap.o xattr.o \
        messenger.o msgpool.o \
        mds_client.o mdsmap.o \
        mon_client.o \
index ee7ee57947352a351ae1327e0f97c0f8daad7e6c..91c5039a5ff203a6e9f4138232b3359cd8fb068d 100644 (file)
@@ -29,7 +29,6 @@
 static const struct inode_operations ceph_symlink_iops;
 
 static void ceph_inode_invalidate_pages(struct work_struct *work);
-static void __destroy_xattrs(struct ceph_inode_info *ci);
 
 /*
  * find or create an inode, given the ceph ino number
@@ -360,7 +359,7 @@ void ceph_destroy_inode(struct inode *inode)
                kfree(frag);
        }
 
-       __destroy_xattrs(ci);
+       __ceph_destroy_xattrs(ci);
        ceph_buffer_put(ci->i_xattrs.blob);
        ceph_buffer_put(ci->i_xattrs.prealloc_blob);
 
@@ -1631,780 +1630,3 @@ int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
        }
        return err;
 }
-
-/*
- * (virtual) xattrs
- *
- * These define virtual xattrs exposing the recursive directory statistics.
- */
-struct _ceph_vir_xattr_cb {
-       char *name;
-       size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
-                             size_t size);
-};
-
-static size_t _ceph_vir_xattrcb_entries(struct ceph_inode_info *ci, char *val,
-                                       size_t size)
-{
-       return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs);
-}
-
-static size_t _ceph_vir_xattrcb_files(struct ceph_inode_info *ci, char *val,
-                                     size_t size)
-{
-       return snprintf(val, size, "%lld", ci->i_files);
-}
-
-static size_t _ceph_vir_xattrcb_subdirs(struct ceph_inode_info *ci, char *val,
-                                       size_t size)
-{
-       return snprintf(val, size, "%lld", ci->i_subdirs);
-}
-
-static size_t _ceph_vir_xattrcb_rentries(struct ceph_inode_info *ci, char *val,
-                                        size_t size)
-{
-       return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs);
-}
-
-static size_t _ceph_vir_xattrcb_rfiles(struct ceph_inode_info *ci, char *val,
-                                      size_t size)
-{
-       return snprintf(val, size, "%lld", ci->i_rfiles);
-}
-
-static size_t _ceph_vir_xattrcb_rsubdirs(struct ceph_inode_info *ci, char *val,
-                                        size_t size)
-{
-       return snprintf(val, size, "%lld", ci->i_rsubdirs);
-}
-
-static size_t _ceph_vir_xattrcb_rbytes(struct ceph_inode_info *ci, char *val,
-                                      size_t size)
-{
-       return snprintf(val, size, "%lld", ci->i_rbytes);
-}
-
-static size_t _ceph_vir_xattrcb_rctime(struct ceph_inode_info *ci, char *val,
-                                      size_t size)
-{
-       return snprintf(val, size, "%ld.%ld", (long)ci->i_rctime.tv_sec,
-                       (long)ci->i_rctime.tv_nsec);
-}
-
-static struct _ceph_vir_xattr_cb _ceph_vir_xattr_recs[] = {
-       { "user.ceph.dir.entries", _ceph_vir_xattrcb_entries},
-       { "user.ceph.dir.files", _ceph_vir_xattrcb_files},
-       { "user.ceph.dir.subdirs", _ceph_vir_xattrcb_subdirs},
-       { "user.ceph.dir.rentries", _ceph_vir_xattrcb_rentries},
-       { "user.ceph.dir.rfiles", _ceph_vir_xattrcb_rfiles},
-       { "user.ceph.dir.rsubdirs", _ceph_vir_xattrcb_rsubdirs},
-       { "user.ceph.dir.rbytes", _ceph_vir_xattrcb_rbytes},
-       { "user.ceph.dir.rctime", _ceph_vir_xattrcb_rctime},
-       { NULL, NULL }
-};
-
-static struct _ceph_vir_xattr_cb *_ceph_match_vir_xattr(const char *name)
-{
-       struct _ceph_vir_xattr_cb *xattr_rec = _ceph_vir_xattr_recs;
-
-       do {
-               if (strcmp(xattr_rec->name, name) == 0)
-                       return xattr_rec;
-               xattr_rec++;
-       } while (xattr_rec->name);
-
-       return NULL;
-}
-
-static int __set_xattr(struct ceph_inode_info *ci,
-                          const char *name, int name_len,
-                          const char *val, int val_len,
-                          int dirty,
-                          int should_free_name, int should_free_val,
-                          struct ceph_inode_xattr **newxattr)
-{
-       struct rb_node **p;
-       struct rb_node *parent = NULL;
-       struct ceph_inode_xattr *xattr = NULL;
-       int c;
-       int new = 0;
-
-       p = &ci->i_xattrs.index.rb_node;
-       while (*p) {
-               parent = *p;
-               xattr = rb_entry(parent, struct ceph_inode_xattr, node);
-               c = strncmp(name, xattr->name, min(name_len, xattr->name_len));
-               if (c < 0)
-                       p = &(*p)->rb_left;
-               else if (c > 0)
-                       p = &(*p)->rb_right;
-               else {
-                       if (name_len == xattr->name_len)
-                               break;
-                       else if (name_len < xattr->name_len)
-                               p = &(*p)->rb_left;
-                       else
-                               p = &(*p)->rb_right;
-               }
-               xattr = NULL;
-       }
-
-       if (!xattr) {
-               new = 1;
-               xattr = *newxattr;
-               xattr->name = name;
-               xattr->name_len = name_len;
-               xattr->should_free_name = should_free_name;
-
-               ci->i_xattrs.count++;
-               dout("__set_xattr count=%d\n", ci->i_xattrs.count);
-       } else {
-               kfree(*newxattr);
-               *newxattr = NULL;
-               if (xattr->should_free_val)
-                       kfree((void *)xattr->val);
-
-               if (should_free_name) {
-                       kfree((void *)name);
-                       name = xattr->name;
-               }
-               ci->i_xattrs.names_size -= xattr->name_len;
-               ci->i_xattrs.vals_size -= xattr->val_len;
-       }
-       if (!xattr) {
-               pr_err("__set_xattr ENOMEM on %p %llx.%llx xattr %s=%s\n",
-                      &ci->vfs_inode, ceph_vinop(&ci->vfs_inode), name,
-                      xattr->val);
-               return -ENOMEM;
-       }
-       ci->i_xattrs.names_size += name_len;
-       ci->i_xattrs.vals_size += val_len;
-       if (val)
-               xattr->val = val;
-       else
-               xattr->val = "";
-
-       xattr->val_len = val_len;
-       xattr->dirty = dirty;
-       xattr->should_free_val = (val && should_free_val);
-
-       if (new) {
-               rb_link_node(&xattr->node, parent, p);
-               rb_insert_color(&xattr->node, &ci->i_xattrs.index);
-               dout("__set_xattr_val p=%p\n", p);
-       }
-
-       dout("__set_xattr_val added %llx.%llx xattr %p %s=%.*s\n",
-            ceph_vinop(&ci->vfs_inode), xattr, name, val_len, val);
-
-       return 0;
-}
-
-static struct ceph_inode_xattr *__get_xattr(struct ceph_inode_info *ci,
-                          const char *name)
-{
-       struct rb_node **p;
-       struct rb_node *parent = NULL;
-       struct ceph_inode_xattr *xattr = NULL;
-       int c;
-
-       p = &ci->i_xattrs.index.rb_node;
-       while (*p) {
-               parent = *p;
-               xattr = rb_entry(parent, struct ceph_inode_xattr, node);
-               c = strncmp(name, xattr->name, xattr->name_len);
-               if (c < 0)
-                       p = &(*p)->rb_left;
-               else if (c > 0)
-                       p = &(*p)->rb_right;
-               else {
-                       dout("__get_xattr %s: found %.*s\n", name,
-                            xattr->val_len, xattr->val);
-                       return xattr;
-               }
-       }
-
-       dout("__get_xattr %s: not found\n", name);
-
-       return NULL;
-}
-
-static void __free_xattr(struct ceph_inode_xattr *xattr)
-{
-       BUG_ON(!xattr);
-
-       if (xattr->should_free_name)
-               kfree((void *)xattr->name);
-       if (xattr->should_free_val)
-               kfree((void *)xattr->val);
-
-       kfree(xattr);
-}
-
-static int __remove_xattr(struct ceph_inode_info *ci,
-                         struct ceph_inode_xattr *xattr)
-{
-       if (!xattr)
-               return -EOPNOTSUPP;
-
-       rb_erase(&xattr->node, &ci->i_xattrs.index);
-
-       if (xattr->should_free_name)
-               kfree((void *)xattr->name);
-       if (xattr->should_free_val)
-               kfree((void *)xattr->val);
-
-       ci->i_xattrs.names_size -= xattr->name_len;
-       ci->i_xattrs.vals_size -= xattr->val_len;
-       ci->i_xattrs.count--;
-       kfree(xattr);
-
-       return 0;
-}
-
-static int __remove_xattr_by_name(struct ceph_inode_info *ci,
-                          const char *name)
-{
-       struct rb_node **p;
-       struct ceph_inode_xattr *xattr;
-       int err;
-
-       p = &ci->i_xattrs.index.rb_node;
-       xattr = __get_xattr(ci, name);
-       err = __remove_xattr(ci, xattr);
-       return err;
-}
-
-static char *__copy_xattr_names(struct ceph_inode_info *ci,
-                               char *dest)
-{
-       struct rb_node *p;
-       struct ceph_inode_xattr *xattr = NULL;
-
-       p = rb_first(&ci->i_xattrs.index);
-       dout("__copy_xattr_names count=%d\n", ci->i_xattrs.count);
-
-       while (p) {
-               xattr = rb_entry(p, struct ceph_inode_xattr, node);
-               memcpy(dest, xattr->name, xattr->name_len);
-               dest[xattr->name_len] = '\0';
-
-               dout("dest=%s %p (%s) (%d/%d)\n", dest, xattr, xattr->name,
-                    xattr->name_len, ci->i_xattrs.names_size);
-
-               dest += xattr->name_len + 1;
-               p = rb_next(p);
-       }
-
-       return dest;
-}
-
-static void __destroy_xattrs(struct ceph_inode_info *ci)
-{
-       struct rb_node *p, *tmp;
-       struct ceph_inode_xattr *xattr = NULL;
-
-       p = rb_first(&ci->i_xattrs.index);
-
-       dout("__destroy_xattrs p=%p\n", p);
-
-       while (p) {
-               xattr = rb_entry(p, struct ceph_inode_xattr, node);
-               tmp = p;
-               p = rb_next(tmp);
-               dout("__destroy_xattrs next p=%p (%.*s)\n", p,
-                    xattr->name_len, xattr->name);
-               rb_erase(tmp, &ci->i_xattrs.index);
-
-               __free_xattr(xattr);
-       }
-
-       ci->i_xattrs.names_size = 0;
-       ci->i_xattrs.vals_size = 0;
-       ci->i_xattrs.index_version = 0;
-       ci->i_xattrs.count = 0;
-       ci->i_xattrs.index = RB_ROOT;
-}
-
-static int __build_xattrs(struct inode *inode)
-{
-       u32 namelen;
-       u32 numattr = 0;
-       void *p, *end;
-       u32 len;
-       const char *name, *val;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int xattr_version;
-       struct ceph_inode_xattr **xattrs = NULL;
-       int err;
-       int i;
-
-       dout("__build_xattrs() len=%d\n",
-            ci->i_xattrs.blob ? (int)ci->i_xattrs.blob->vec.iov_len : 0);
-
-       if (ci->i_xattrs.index_version >= ci->i_xattrs.version)
-               return 0; /* already built */
-
-       __destroy_xattrs(ci);
-
-start:
-       /* updated internal xattr rb tree */
-       if (ci->i_xattrs.blob && ci->i_xattrs.blob->vec.iov_len > 4) {
-               p = ci->i_xattrs.blob->vec.iov_base;
-               end = p + ci->i_xattrs.blob->vec.iov_len;
-               ceph_decode_32_safe(&p, end, numattr, bad);
-               xattr_version = ci->i_xattrs.version;
-               spin_unlock(&inode->i_lock);
-
-               xattrs = kcalloc(numattr, sizeof(struct ceph_xattr *),
-                                GFP_NOFS);
-               err = -ENOMEM;
-               if (!xattrs)
-                       goto bad_lock;
-               memset(xattrs, 0, numattr*sizeof(struct ceph_xattr *));
-               for (i = 0; i < numattr; i++) {
-                       xattrs[i] = kmalloc(sizeof(struct ceph_inode_xattr),
-                                           GFP_NOFS);
-                       if (!xattrs[i])
-                               goto bad_lock;
-               }
-
-               spin_lock(&inode->i_lock);
-               if (ci->i_xattrs.version != xattr_version) {
-                       /* lost a race, retry */
-                       for (i = 0; i < numattr; i++)
-                               kfree(xattrs[i]);
-                       kfree(xattrs);
-                       goto start;
-               }
-               err = -EIO;
-               while (numattr--) {
-                       ceph_decode_32_safe(&p, end, len, bad);
-                       namelen = len;
-                       name = p;
-                       p += len;
-                       ceph_decode_32_safe(&p, end, len, bad);
-                       val = p;
-                       p += len;
-
-                       err = __set_xattr(ci, name, namelen, val, len,
-                                         0, 0, 0, &xattrs[numattr]);
-
-                       if (err < 0)
-                               goto bad;
-               }
-               kfree(xattrs);
-       }
-       ci->i_xattrs.index_version = ci->i_xattrs.version;
-       ci->i_xattrs.dirty = false;
-
-       return err;
-bad_lock:
-       spin_lock(&inode->i_lock);
-bad:
-       if (xattrs) {
-               for (i = 0; i < numattr; i++)
-                       kfree(xattrs[i]);
-               kfree(xattrs);
-       }
-       ci->i_xattrs.names_size = 0;
-       return err;
-}
-
-static int __get_required_blob_size(struct ceph_inode_info *ci, int name_size,
-                                   int val_size)
-{
-       /*
-        * 4 bytes for the length, and additional 4 bytes per each xattr name,
-        * 4 bytes per each value
-        */
-       int size = 4 + ci->i_xattrs.count*(4 + 4) +
-                            ci->i_xattrs.names_size +
-                            ci->i_xattrs.vals_size;
-       dout("__get_required_blob_size c=%d names.size=%d vals.size=%d\n",
-            ci->i_xattrs.count, ci->i_xattrs.names_size,
-            ci->i_xattrs.vals_size);
-
-       if (name_size)
-               size += 4 + 4 + name_size + val_size;
-
-       return size;
-}
-
-/*
- * If there are dirty xattrs, reencode xattrs into the prealloc_blob
- * and swap into place.
- */
-void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
-{
-       struct rb_node *p;
-       struct ceph_inode_xattr *xattr = NULL;
-       void *dest;
-
-       dout("__build_xattrs_blob %p\n", &ci->vfs_inode);
-       if (ci->i_xattrs.dirty) {
-               int need = __get_required_blob_size(ci, 0, 0);
-
-               BUG_ON(need > ci->i_xattrs.prealloc_blob->alloc_len);
-
-               p = rb_first(&ci->i_xattrs.index);
-               dest = ci->i_xattrs.prealloc_blob->vec.iov_base;
-
-               ceph_encode_32(&dest, ci->i_xattrs.count);
-               while (p) {
-                       xattr = rb_entry(p, struct ceph_inode_xattr, node);
-
-                       ceph_encode_32(&dest, xattr->name_len);
-                       memcpy(dest, xattr->name, xattr->name_len);
-                       dest += xattr->name_len;
-                       ceph_encode_32(&dest, xattr->val_len);
-                       memcpy(dest, xattr->val, xattr->val_len);
-                       dest += xattr->val_len;
-
-                       p = rb_next(p);
-               }
-
-               /* adjust buffer len; it may be larger than we need */
-               ci->i_xattrs.prealloc_blob->vec.iov_len =
-                       dest - ci->i_xattrs.prealloc_blob->vec.iov_base;
-
-               ceph_buffer_put(ci->i_xattrs.blob);
-               ci->i_xattrs.blob = ci->i_xattrs.prealloc_blob;
-               ci->i_xattrs.prealloc_blob = NULL;
-               ci->i_xattrs.dirty = false;
-       }
-}
-
-ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
-                     size_t size)
-{
-       struct inode *inode = dentry->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int err;
-       struct _ceph_vir_xattr_cb *vir_xattr;
-       struct ceph_inode_xattr *xattr;
-
-       /* let's see if a virtual xattr was requested */
-       vir_xattr = _ceph_match_vir_xattr(name);
-       if (vir_xattr)
-               return (vir_xattr->getxattr_cb)(ci, value, size);
-
-       spin_lock(&inode->i_lock);
-       dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
-            ci->i_xattrs.version, ci->i_xattrs.index_version);
-
-       if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
-           (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
-               goto get_xattr;
-       } else {
-               spin_unlock(&inode->i_lock);
-               /* get xattrs from mds (if we don't already have them) */
-               err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
-               if (err)
-                       return err;
-       }
-
-       spin_lock(&inode->i_lock);
-
-       err = -ENODATA;  /* == ENOATTR */
-
-       err = __build_xattrs(inode);
-       if (err < 0)
-               goto out;
-
-get_xattr:
-       err = -ENODATA;
-       xattr = __get_xattr(ci, name);
-       if (!xattr)
-               goto out;
-
-       err = -ERANGE;
-       if (size && size < xattr->val_len)
-               goto out;
-
-       err = xattr->val_len;
-       if (size == 0)
-               goto out;
-
-       memcpy(value, xattr->val, xattr->val_len);
-
-out:
-       spin_unlock(&inode->i_lock);
-       return err;
-}
-
-ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
-{
-       struct inode *inode = dentry->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       u32 vir_namelen = 0;
-       u32 namelen;
-       int err;
-       u32 len;
-       int i;
-
-       spin_lock(&inode->i_lock);
-       dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
-            ci->i_xattrs.version, ci->i_xattrs.index_version);
-
-       if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
-           (ci->i_xattrs.index_version > ci->i_xattrs.version)) {
-               goto list_xattr;
-       } else {
-               spin_unlock(&inode->i_lock);
-               err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
-               if (err)
-                       return err;
-       }
-
-       spin_lock(&inode->i_lock);
-
-       err = __build_xattrs(inode);
-       if (err < 0)
-               goto out;
-
-list_xattr:
-       vir_namelen = 0;
-       /* include virtual dir xattrs */
-       if ((inode->i_mode & S_IFMT) == S_IFDIR)
-               for (i = 0; _ceph_vir_xattr_recs[i].name; i++)
-                       vir_namelen += strlen(_ceph_vir_xattr_recs[i].name) + 1;
-       /* adding 1 byte per each variable due to the null termination */
-       namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count;
-       err = -ERANGE;
-       if (size && namelen > size)
-               goto out;
-
-       err = namelen;
-       if (size == 0)
-               goto out;
-
-       names = __copy_xattr_names(ci, names);
-
-       /* virtual xattr names, too */
-       if ((inode->i_mode & S_IFMT) == S_IFDIR)
-               for (i = 0; _ceph_vir_xattr_recs[i].name; i++) {
-                       len = sprintf(names, "%s",
-                                     _ceph_vir_xattr_recs[i].name);
-                       names += len + 1;
-               }
-
-out:
-       spin_unlock(&inode->i_lock);
-       return err;
-}
-
-static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
-                             const char *value, size_t size, int flags)
-{
-       struct ceph_client *client = ceph_client(dentry->d_sb);
-       struct inode *inode = dentry->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       struct inode *parent_inode = dentry->d_parent->d_inode;
-       struct ceph_mds_request *req;
-       struct ceph_mds_client *mdsc = &client->mdsc;
-       int err;
-       int i, nr_pages;
-       struct page **pages = NULL;
-       void *kaddr;
-
-       /* copy value into some pages */
-       nr_pages = calc_pages_for(0, size);
-       if (nr_pages) {
-               pages = kmalloc(sizeof(pages[0])*nr_pages, GFP_NOFS);
-               if (!pages)
-                       return -ENOMEM;
-               err = -ENOMEM;
-               for (i = 0; i < nr_pages; i++) {
-                       pages[i] = alloc_page(GFP_NOFS);
-                       if (!pages[i]) {
-                               nr_pages = i;
-                               goto out;
-                       }
-                       kaddr = kmap(pages[i]);
-                       memcpy(kaddr, value + i*PAGE_CACHE_SIZE,
-                              min(PAGE_CACHE_SIZE, size-i*PAGE_CACHE_SIZE));
-               }
-       }
-
-       dout("setxattr value=%.*s\n", (int)size, value);
-
-       /* do request */
-       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR,
-                                      USE_AUTH_MDS);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
-       req->r_inode = igrab(inode);
-       req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
-       req->r_num_caps = 1;
-       req->r_args.setxattr.flags = cpu_to_le32(flags);
-       req->r_path2 = kstrdup(name, GFP_NOFS);
-
-       req->r_pages = pages;
-       req->r_num_pages = nr_pages;
-       req->r_data_len = size;
-
-       dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
-       err = ceph_mdsc_do_request(mdsc, parent_inode, req);
-       ceph_mdsc_put_request(req);
-       dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
-
-out:
-       if (pages) {
-               for (i = 0; i < nr_pages; i++)
-                       __free_page(pages[i]);
-               kfree(pages);
-       }
-       return err;
-}
-
-int ceph_setxattr(struct dentry *dentry, const char *name,
-                 const void *value, size_t size, int flags)
-{
-       struct inode *inode = dentry->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int err;
-       int name_len = strlen(name);
-       int val_len = size;
-       char *newname = NULL;
-       char *newval = NULL;
-       struct ceph_inode_xattr *xattr = NULL;
-       int issued;
-       int required_blob_size;
-
-       if (ceph_snap(inode) != CEPH_NOSNAP)
-               return -EROFS;
-
-       /* only support user.* xattrs, for now */
-       if (strncmp(name, "user.", 5) != 0)
-               return -EOPNOTSUPP;
-
-       if (_ceph_match_vir_xattr(name) != NULL)
-               return -EOPNOTSUPP;
-
-       /* preallocate memory for xattr name, value, index node */
-       err = -ENOMEM;
-       newname = kmalloc(name_len + 1, GFP_NOFS);
-       if (!newname)
-               goto out;
-       memcpy(newname, name, name_len + 1);
-
-       if (val_len) {
-               newval = kmalloc(val_len + 1, GFP_NOFS);
-               if (!newval)
-                       goto out;
-               memcpy(newval, value, val_len);
-               newval[val_len] = '\0';
-       }
-
-       xattr = kmalloc(sizeof(struct ceph_inode_xattr), GFP_NOFS);
-       if (!xattr)
-               goto out;
-
-       spin_lock(&inode->i_lock);
-retry:
-       issued = __ceph_caps_issued(ci, NULL);
-       if (!(issued & CEPH_CAP_XATTR_EXCL))
-               goto do_sync;
-       __build_xattrs(inode);
-
-       required_blob_size = __get_required_blob_size(ci, name_len, val_len);
-
-       if (!ci->i_xattrs.prealloc_blob ||
-           required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
-               struct ceph_buffer *blob = NULL;
-
-               spin_unlock(&inode->i_lock);
-               dout(" preaallocating new blob size=%d\n", required_blob_size);
-               blob = ceph_buffer_new_alloc(required_blob_size, GFP_NOFS);
-               if (!blob)
-                       goto out;
-               spin_lock(&inode->i_lock);
-               ceph_buffer_put(ci->i_xattrs.prealloc_blob);
-               ci->i_xattrs.prealloc_blob = blob;
-               goto retry;
-       }
-
-       dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
-       err = __set_xattr(ci, newname, name_len, newval,
-                         val_len, 1, 1, 1, &xattr);
-       __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
-       ci->i_xattrs.dirty = true;
-       inode->i_ctime = CURRENT_TIME;
-       spin_unlock(&inode->i_lock);
-
-       return err;
-
-do_sync:
-       spin_unlock(&inode->i_lock);
-       err = ceph_sync_setxattr(dentry, name, value, size, flags);
-out:
-       kfree(newname);
-       kfree(newval);
-       kfree(xattr);
-       return err;
-}
-
-static int ceph_send_removexattr(struct dentry *dentry, const char *name)
-{
-       struct ceph_client *client = ceph_client(dentry->d_sb);
-       struct ceph_mds_client *mdsc = &client->mdsc;
-       struct inode *inode = dentry->d_inode;
-       struct inode *parent_inode = dentry->d_parent->d_inode;
-       struct ceph_mds_request *req;
-       int err;
-
-       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RMXATTR,
-                                      USE_AUTH_MDS);
-       if (IS_ERR(req))
-               return PTR_ERR(req);
-       req->r_inode = igrab(inode);
-       req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
-       req->r_num_caps = 1;
-       req->r_path2 = kstrdup(name, GFP_NOFS);
-
-       err = ceph_mdsc_do_request(mdsc, parent_inode, req);
-       ceph_mdsc_put_request(req);
-       return err;
-}
-
-int ceph_removexattr(struct dentry *dentry, const char *name)
-{
-       struct inode *inode = dentry->d_inode;
-       struct ceph_inode_info *ci = ceph_inode(inode);
-       int issued;
-       int err;
-
-       if (ceph_snap(inode) != CEPH_NOSNAP)
-               return -EROFS;
-
-       /* only support user.* xattrs, for now */
-       if (strncmp(name, "user.", 5) != 0)
-               return -EOPNOTSUPP;
-
-       if (_ceph_match_vir_xattr(name) != NULL)
-               return -EOPNOTSUPP;
-
-       spin_lock(&inode->i_lock);
-       __build_xattrs(inode);
-       issued = __ceph_caps_issued(ci, NULL);
-       dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
-
-       if (!(issued & CEPH_CAP_XATTR_EXCL))
-               goto do_sync;
-
-       err = __remove_xattr_by_name(ceph_inode(inode), name);
-       __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
-       ci->i_xattrs.dirty = true;
-       inode->i_ctime = CURRENT_TIME;
-
-       spin_unlock(&inode->i_lock);
-
-       return err;
-do_sync:
-       spin_unlock(&inode->i_lock);
-       err = ceph_send_removexattr(dentry, name);
-       return err;
-}
-
index 15544321cd0cee10043e7bfd00b2ab478053dbe4..bb5bbed7dbea4eba0b38832e26453a653bfb3e89 100644 (file)
@@ -814,12 +814,15 @@ extern int ceph_permission(struct inode *inode, int mask);
 extern int ceph_setattr(struct dentry *dentry, struct iattr *attr);
 extern int ceph_getattr(struct vfsmount *mnt, struct dentry *dentry,
                        struct kstat *stat);
+
+/* xattr.c */
 extern int ceph_setxattr(struct dentry *, const char *, const void *,
                         size_t, int);
 extern ssize_t ceph_getxattr(struct dentry *, const char *, void *, size_t);
 extern ssize_t ceph_listxattr(struct dentry *, char *, size_t);
 extern int ceph_removexattr(struct dentry *, const char *);
 extern void __ceph_build_xattrs_blob(struct ceph_inode_info *ci);
+extern void __ceph_destroy_xattrs(struct ceph_inode_info *ci);
 
 /* caps.c */
 extern const char *ceph_cap_string(int c);
diff --git a/src/kernel/xattr.c b/src/kernel/xattr.c
new file mode 100644 (file)
index 0000000..545f890
--- /dev/null
@@ -0,0 +1,814 @@
+#include "ceph_debug.h"
+#include "super.h"
+#include "decode.h"
+
+/*
+ * These define virtual xattrs exposing the recursive directory
+ * statistics and layout metadata.
+ */
+/*
+ * One virtual xattr: a name plus a callback that formats its value from
+ * the in-memory inode.  Tables of these are terminated by an entry whose
+ * name is NULL.
+ */
+struct ceph_vxattr_cb {
+       /* when set, ceph_getxattr() answers from getxattr_cb without
+        * consulting the stored xattrs */
+       bool readonly;
+       /* full attribute name, e.g. "user.ceph.dir.files" */
+       char *name;
+       /* format the value into val (bounded by size); the callbacks in
+        * this file return the snprintf() result */
+       size_t (*getxattr_cb)(struct ceph_inode_info *ci, char *val,
+                             size_t size);
+};
+
+/* directories */
+
+/*
+ * Format i_files + i_subdirs as decimal text into val (bounded by size).
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_entries(struct ceph_inode_info *ci, char *val,
+                                       size_t size)
+{
+       return snprintf(val, size, "%lld", ci->i_files + ci->i_subdirs);
+}
+
+/*
+ * Format i_files as decimal text into val (bounded by size).
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_files(struct ceph_inode_info *ci, char *val,
+                                     size_t size)
+{
+       return snprintf(val, size, "%lld", ci->i_files);
+}
+
+/*
+ * Format i_subdirs as decimal text into val (bounded by size).
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_subdirs(struct ceph_inode_info *ci, char *val,
+                                       size_t size)
+{
+       return snprintf(val, size, "%lld", ci->i_subdirs);
+}
+
+/*
+ * Format i_rfiles + i_rsubdirs (the recursive counterparts of the
+ * per-directory counters) as decimal text into val (bounded by size).
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_rentries(struct ceph_inode_info *ci, char *val,
+                                        size_t size)
+{
+       return snprintf(val, size, "%lld", ci->i_rfiles + ci->i_rsubdirs);
+}
+
+/*
+ * Format i_rfiles as decimal text into val (bounded by size).
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_rfiles(struct ceph_inode_info *ci, char *val,
+                                      size_t size)
+{
+       return snprintf(val, size, "%lld", ci->i_rfiles);
+}
+
+/*
+ * Format i_rsubdirs as decimal text into val (bounded by size).
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_rsubdirs(struct ceph_inode_info *ci, char *val,
+                                        size_t size)
+{
+       return snprintf(val, size, "%lld", ci->i_rsubdirs);
+}
+
+/*
+ * Format i_rbytes as decimal text into val (bounded by size).
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_rbytes(struct ceph_inode_info *ci, char *val,
+                                      size_t size)
+{
+       return snprintf(val, size, "%lld", ci->i_rbytes);
+}
+
+/*
+ * Format i_rctime as "<seconds>.<nanoseconds>" into val (bounded by size).
+ *
+ * The nanosecond field is zero-padded to nine digits so that the text
+ * reads as a decimal fraction of a second: 5 ns must print as
+ * ".000000005", not ".5".
+ *
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_rctime(struct ceph_inode_info *ci, char *val,
+                                      size_t size)
+{
+       return snprintf(val, size, "%ld.%09ld", (long)ci->i_rctime.tv_sec,
+                       (long)ci->i_rctime.tv_nsec);
+}
+
+/*
+ * Virtual xattrs exposed on directories.  All are read-only; the entry
+ * with a NULL name terminates the table.
+ */
+static struct ceph_vxattr_cb ceph_dir_vxattrs[] = {
+       { .readonly = true, .name = "user.ceph.dir.entries",
+         .getxattr_cb = ceph_vxattrcb_entries },
+       { .readonly = true, .name = "user.ceph.dir.files",
+         .getxattr_cb = ceph_vxattrcb_files },
+       { .readonly = true, .name = "user.ceph.dir.subdirs",
+         .getxattr_cb = ceph_vxattrcb_subdirs },
+       { .readonly = true, .name = "user.ceph.dir.rentries",
+         .getxattr_cb = ceph_vxattrcb_rentries },
+       { .readonly = true, .name = "user.ceph.dir.rfiles",
+         .getxattr_cb = ceph_vxattrcb_rfiles },
+       { .readonly = true, .name = "user.ceph.dir.rsubdirs",
+         .getxattr_cb = ceph_vxattrcb_rsubdirs },
+       { .readonly = true, .name = "user.ceph.dir.rbytes",
+         .getxattr_cb = ceph_vxattrcb_rbytes },
+       { .readonly = true, .name = "user.ceph.dir.rctime",
+         .getxattr_cb = ceph_vxattrcb_rctime },
+       { .readonly = true, .name = NULL, .getxattr_cb = NULL }
+};
+
+/* files */
+
+/*
+ * Format the file layout as three "key=value" lines (chunk_bytes,
+ * stripe_count, object_size) into val (bounded by size).
+ *
+ * The arguments are cast to unsigned long long, so they are printed with
+ * %llu to keep the conversion specifiers matched to the argument types.
+ *
+ * Returns the snprintf() result.
+ */
+static size_t ceph_vxattrcb_layout(struct ceph_inode_info *ci, char *val,
+                                  size_t size)
+{
+       return snprintf(val, size,
+               "chunk_bytes=%llu\nstripe_count=%llu\nobject_size=%llu\n",
+               (unsigned long long)ceph_file_layout_su(ci->i_layout),
+               (unsigned long long)ceph_file_layout_stripe_count(ci->i_layout),
+               (unsigned long long)ceph_file_layout_object_size(ci->i_layout));
+}
+
+/*
+ * Virtual xattrs exposed on regular files.  The entry with a NULL name
+ * terminates the table; it is spelled with all three fields so that the
+ * bool slot is not initialised from a pointer constant and the layout
+ * matches ceph_dir_vxattrs.
+ */
+static struct ceph_vxattr_cb ceph_file_vxattrs[] = {
+       { false, "user.ceph.layout", ceph_vxattrcb_layout},
+       { false, NULL, NULL }
+};
+
+/*
+ * Pick the virtual xattr table that applies to this inode: the directory
+ * table for directories, the file table for regular files, and NULL for
+ * every other inode type.
+ */
+static struct ceph_vxattr_cb *ceph_inode_vxattrs(struct inode *inode)
+{
+       if (S_ISDIR(inode->i_mode))
+               return ceph_dir_vxattrs;
+       if (S_ISREG(inode->i_mode))
+               return ceph_file_vxattrs;
+
+       return NULL;
+}
+
+/*
+ * Find the table entry whose name equals @name exactly.
+ *
+ * @vxattr points at the first entry of a table terminated by a NULL
+ * name.  The terminator is tested before every comparison, so a table
+ * consisting only of the terminator (or a NULL table) yields NULL rather
+ * than passing a NULL name to strcmp().
+ *
+ * Returns the matching entry, or NULL if there is none.
+ */
+static struct ceph_vxattr_cb *ceph_match_vxattr(struct ceph_vxattr_cb *vxattr,
+                                               const char *name)
+{
+       if (!vxattr)
+               return NULL;
+
+       for (; vxattr->name; vxattr++)
+               if (strcmp(vxattr->name, name) == 0)
+                       return vxattr;
+
+       return NULL;
+}
+
+/*
+ * Insert a new xattr into, or replace the value of an existing xattr in,
+ * the inode's in-memory rb-tree index.
+ *
+ * Entries are ordered by comparing the common-length prefix of the names
+ * and, on a tie, by name length.
+ *
+ * @name/@name_len:   attribute name; need not be NUL-terminated
+ * @val/@val_len:     attribute value; a NULL @val is stored as ""
+ * @dirty:            stored in the entry's dirty flag
+ * @should_free_name: the entry owns @name and must kfree() it
+ * @should_free_val:  the entry owns @val and must kfree() it
+ * @newxattr:         preallocated node.  It is linked into the tree when
+ *                    the name is new; when the name already exists it is
+ *                    freed and *newxattr is set to NULL.
+ *
+ * When an existing entry is updated, its old value is freed if it was
+ * owned, and an owned @name is freed because the entry keeps its
+ * original name.
+ *
+ * Returns 0 on success, or -ENOMEM if the name is new and no
+ * preallocated node was supplied.
+ */
+static int __set_xattr(struct ceph_inode_info *ci,
+                          const char *name, int name_len,
+                          const char *val, int val_len,
+                          int dirty,
+                          int should_free_name, int should_free_val,
+                          struct ceph_inode_xattr **newxattr)
+{
+       struct rb_node **p;
+       struct rb_node *parent = NULL;
+       struct ceph_inode_xattr *xattr = NULL;
+       int c;
+       int new = 0;
+
+       p = &ci->i_xattrs.index.rb_node;
+       while (*p) {
+               parent = *p;
+               xattr = rb_entry(parent, struct ceph_inode_xattr, node);
+               c = strncmp(name, xattr->name, min(name_len, xattr->name_len));
+               if (c < 0)
+                       p = &(*p)->rb_left;
+               else if (c > 0)
+                       p = &(*p)->rb_right;
+               else {
+                       if (name_len == xattr->name_len)
+                               break;
+                       else if (name_len < xattr->name_len)
+                               p = &(*p)->rb_left;
+                       else
+                               p = &(*p)->rb_right;
+               }
+               xattr = NULL;
+       }
+
+       if (!xattr) {
+               /* new name: use the caller's preallocated node */
+               xattr = *newxattr;
+               if (!xattr) {
+                       /* check before touching the node, not after */
+                       pr_err("__set_xattr ENOMEM on %p %llx.%llx xattr %.*s=%.*s\n",
+                              &ci->vfs_inode, ceph_vinop(&ci->vfs_inode),
+                              name_len, name, val_len, val ? val : "");
+                       return -ENOMEM;
+               }
+               new = 1;
+               xattr->name = name;
+               xattr->name_len = name_len;
+               xattr->should_free_name = should_free_name;
+
+               ci->i_xattrs.count++;
+               dout("__set_xattr count=%d\n", ci->i_xattrs.count);
+       } else {
+               /* existing name: the preallocated node is not needed */
+               kfree(*newxattr);
+               *newxattr = NULL;
+               if (xattr->should_free_val)
+                       kfree((void *)xattr->val);
+
+               if (should_free_name) {
+                       kfree((void *)name);
+                       name = xattr->name;
+               }
+               /* back out the old sizes; the new ones are added below */
+               ci->i_xattrs.names_size -= xattr->name_len;
+               ci->i_xattrs.vals_size -= xattr->val_len;
+       }
+
+       ci->i_xattrs.names_size += name_len;
+       ci->i_xattrs.vals_size += val_len;
+       if (val)
+               xattr->val = val;
+       else
+               xattr->val = "";
+
+       xattr->val_len = val_len;
+       xattr->dirty = dirty;
+       xattr->should_free_val = (val && should_free_val);
+
+       if (new) {
+               rb_link_node(&xattr->node, parent, p);
+               rb_insert_color(&xattr->node, &ci->i_xattrs.index);
+               dout("__set_xattr_val p=%p\n", p);
+       }
+
+       /* the name may point into the blob, so bound it by its length */
+       dout("__set_xattr_val added %llx.%llx xattr %p %.*s=%.*s\n",
+            ceph_vinop(&ci->vfs_inode), xattr, name_len, name, val_len, val);
+
+       return 0;
+}
+
+/*
+ * Look up an xattr by its NUL-terminated @name in the inode's rb-tree
+ * index.
+ *
+ * The comparison mirrors the ordering used by __set_xattr(): the
+ * common-length prefix is compared first and ties are broken by length.
+ * Comparing only xattr->name_len bytes would treat a stored "user.foo"
+ * as a match for a lookup of "user.foobar".
+ *
+ * Returns the entry, or NULL if no entry has exactly this name.
+ */
+static struct ceph_inode_xattr *__get_xattr(struct ceph_inode_info *ci,
+                          const char *name)
+{
+       struct rb_node *n = ci->i_xattrs.index.rb_node;
+       int name_len = strlen(name);
+
+       while (n) {
+               struct ceph_inode_xattr *xattr =
+                       rb_entry(n, struct ceph_inode_xattr, node);
+               int c = strncmp(name, xattr->name,
+                               min(name_len, xattr->name_len));
+
+               /* equal prefix: the shorter name sorts first */
+               if (c == 0 && name_len != xattr->name_len)
+                       c = name_len < xattr->name_len ? -1 : 1;
+
+               if (c < 0)
+                       n = n->rb_left;
+               else if (c > 0)
+                       n = n->rb_right;
+               else {
+                       dout("__get_xattr %s: found %.*s\n", name,
+                            xattr->val_len, xattr->val);
+                       return xattr;
+               }
+       }
+
+       dout("__get_xattr %s: not found\n", name);
+
+       return NULL;
+}
+
+/*
+ * Release one index node together with whichever of its name and value
+ * buffers it owns.  The node must already be unlinked from the tree.
+ */
+static void __free_xattr(struct ceph_inode_xattr *xattr)
+{
+       BUG_ON(xattr == NULL);
+
+       /* only free the strings this node owns */
+       if (xattr->should_free_name)
+               kfree((void *)xattr->name);
+
+       if (xattr->should_free_val)
+               kfree((void *)xattr->val);
+
+       kfree(xattr);
+}
+
+/*
+ * Unlink @xattr from the inode's index, update the size and count
+ * bookkeeping, and free the node.
+ *
+ * @xattr may be NULL (the lookup found nothing); that is reported as
+ * -ENODATA, the error for removing an attribute that does not exist,
+ * rather than -EOPNOTSUPP, which would claim the operation itself is
+ * unsupported.
+ *
+ * Returns 0 on success or -ENODATA if @xattr is NULL.
+ */
+static int __remove_xattr(struct ceph_inode_info *ci,
+                         struct ceph_inode_xattr *xattr)
+{
+       if (!xattr)
+               return -ENODATA;
+
+       rb_erase(&xattr->node, &ci->i_xattrs.index);
+
+       /* read the lengths before the node is freed */
+       ci->i_xattrs.names_size -= xattr->name_len;
+       ci->i_xattrs.vals_size -= xattr->val_len;
+       ci->i_xattrs.count--;
+
+       __free_xattr(xattr);
+
+       return 0;
+}
+
+/*
+ * Look up @name in the inode's xattr index and remove the entry.
+ *
+ * Returns whatever __remove_xattr() returns: 0 on success, or its error
+ * code when no entry with that name exists.
+ */
+static int __remove_xattr_by_name(struct ceph_inode_info *ci,
+                          const char *name)
+{
+       return __remove_xattr(ci, __get_xattr(ci, name));
+}
+
+/*
+ * Write every indexed xattr name into @dest, each followed by a NUL
+ * byte, in index order.  The caller must have sized @dest for all names
+ * plus one terminator per name.
+ *
+ * Returns the position just past the last byte written.
+ */
+static char *__copy_xattr_names(struct ceph_inode_info *ci,
+                               char *dest)
+{
+       struct rb_node *node;
+
+       dout("__copy_xattr_names count=%d\n", ci->i_xattrs.count);
+
+       for (node = rb_first(&ci->i_xattrs.index); node;
+            node = rb_next(node)) {
+               struct ceph_inode_xattr *cur =
+                       rb_entry(node, struct ceph_inode_xattr, node);
+
+               memcpy(dest, cur->name, cur->name_len);
+               dest[cur->name_len] = '\0';
+
+               dout("dest=%s %p (%s) (%d/%d)\n", dest, cur, cur->name,
+                    cur->name_len, ci->i_xattrs.names_size);
+
+               /* step over the name and its terminator */
+               dest += cur->name_len + 1;
+       }
+
+       return dest;
+}
+
+/*
+ * Tear down the inode's in-memory xattr index: unlink and free every
+ * node, then reset the size counters, the entry count, the index version
+ * and the tree root.
+ */
+void __ceph_destroy_xattrs(struct ceph_inode_info *ci)
+{
+       struct rb_node *node = rb_first(&ci->i_xattrs.index);
+
+       dout("__ceph_destroy_xattrs p=%p\n", node);
+
+       while (node) {
+               struct ceph_inode_xattr *victim =
+                       rb_entry(node, struct ceph_inode_xattr, node);
+               /* fetch the successor before the node is erased */
+               struct rb_node *next = rb_next(node);
+
+               dout("__ceph_destroy_xattrs next p=%p (%.*s)\n", next,
+                    victim->name_len, victim->name);
+               rb_erase(node, &ci->i_xattrs.index);
+
+               __free_xattr(victim);
+               node = next;
+       }
+
+       ci->i_xattrs.names_size = 0;
+       ci->i_xattrs.vals_size = 0;
+       ci->i_xattrs.index_version = 0;
+       ci->i_xattrs.count = 0;
+       ci->i_xattrs.index = RB_ROOT;
+}
+
+/*
+ * Rebuild the in-memory xattr index from the encoded blob, if the index
+ * is older than the blob (index_version < version).
+ *
+ * Callers in this file hold inode->i_lock.  The lock is dropped while
+ * the index nodes are allocated and re-taken afterwards; if the xattr
+ * version changed in the meantime the allocations are discarded and the
+ * whole decode is retried.
+ *
+ * The blob is a 32-bit attribute count followed, per attribute, by a
+ * 32-bit name length, the name bytes, a 32-bit value length and the
+ * value bytes.  Index entries point into the blob (nothing is copied).
+ *
+ * Returns 0 on success (including "already built" and "no blob"),
+ * -ENOMEM if a node could not be allocated, or -EIO if the blob is
+ * truncated or a length runs past its end.
+ */
+static int __build_xattrs(struct inode *inode)
+{
+       u32 namelen;
+       u32 numattr = 0;
+       u32 nalloc = 0;         /* number of slots in xattrs[] */
+       void *p, *end;
+       u32 len;
+       const char *name, *val;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       int xattr_version;
+       struct ceph_inode_xattr **xattrs = NULL;
+       int err = 0;
+       u32 i;
+
+       dout("__build_xattrs() len=%d\n",
+            ci->i_xattrs.blob ? (int)ci->i_xattrs.blob->vec.iov_len : 0);
+
+       if (ci->i_xattrs.index_version >= ci->i_xattrs.version)
+               return 0; /* already built */
+
+       __ceph_destroy_xattrs(ci);
+
+start:
+       /* updated internal xattr rb tree */
+       if (ci->i_xattrs.blob && ci->i_xattrs.blob->vec.iov_len > 4) {
+               p = ci->i_xattrs.blob->vec.iov_base;
+               end = p + ci->i_xattrs.blob->vec.iov_len;
+               err = -EIO;
+               ceph_decode_32_safe(&p, end, numattr, bad);
+               xattr_version = ci->i_xattrs.version;
+               spin_unlock(&inode->i_lock);
+
+               /* kcalloc zeroes the array, so unfilled slots are NULL */
+               nalloc = numattr;
+               xattrs = kcalloc(nalloc, sizeof(*xattrs), GFP_NOFS);
+               err = -ENOMEM;
+               if (!xattrs)
+                       goto bad_lock;
+               for (i = 0; i < nalloc; i++) {
+                       xattrs[i] = kmalloc(sizeof(struct ceph_inode_xattr),
+                                           GFP_NOFS);
+                       if (!xattrs[i])
+                               goto bad_lock;
+               }
+
+               spin_lock(&inode->i_lock);
+               if (ci->i_xattrs.version != xattr_version) {
+                       /* lost a race, retry */
+                       for (i = 0; i < nalloc; i++)
+                               kfree(xattrs[i]);
+                       kfree(xattrs);
+                       /* don't leave a stale pointer for the error path */
+                       xattrs = NULL;
+                       nalloc = 0;
+                       goto start;
+               }
+               err = -EIO;
+               while (numattr--) {
+                       ceph_decode_32_safe(&p, end, len, bad);
+                       if (len > (size_t)(end - p))
+                               goto bad;       /* name runs past the blob */
+                       namelen = len;
+                       name = p;
+                       p += len;
+                       ceph_decode_32_safe(&p, end, len, bad);
+                       if (len > (size_t)(end - p))
+                               goto bad;       /* value runs past the blob */
+                       val = p;
+                       p += len;
+
+                       err = __set_xattr(ci, name, namelen, val, len,
+                                         0, 0, 0, &xattrs[numattr]);
+
+                       if (err < 0)
+                               goto bad;
+                       /*
+                        * The node now belongs to the tree (or was freed
+                        * by __set_xattr); make sure the error path
+                        * below cannot free it again.
+                        */
+                       xattrs[numattr] = NULL;
+                       err = -EIO;     /* for decode failures next round */
+               }
+               kfree(xattrs);
+       }
+       ci->i_xattrs.index_version = ci->i_xattrs.version;
+       ci->i_xattrs.dirty = false;
+
+       return 0;
+bad_lock:
+       spin_lock(&inode->i_lock);
+bad:
+       if (xattrs) {
+               /* frees every node not yet handed to the tree */
+               for (i = 0; i < nalloc; i++)
+                       kfree(xattrs[i]);
+               kfree(xattrs);
+       }
+       ci->i_xattrs.names_size = 0;
+       return err;
+}
+
+/*
+ * Compute how many bytes the encoded xattr blob needs for the currently
+ * indexed xattrs, optionally with room for one more.
+ *
+ * Layout accounted for: a 4-byte count, then per xattr a 4-byte name
+ * length, a 4-byte value length, and the name and value bytes.
+ *
+ * If @name_size is non-zero, space for one additional xattr of
+ * @name_size + @val_size bytes (plus its two length fields) is included;
+ * if @name_size is zero, @val_size is ignored.
+ */
+static int __get_required_blob_size(struct ceph_inode_info *ci, int name_size,
+                                   int val_size)
+{
+       int size;
+
+       size = 4;                                       /* attribute count */
+       size += ci->i_xattrs.count * (4 + 4);           /* length fields */
+       size += ci->i_xattrs.names_size;
+       size += ci->i_xattrs.vals_size;
+
+       dout("__get_required_blob_size c=%d names.size=%d vals.size=%d\n",
+            ci->i_xattrs.count, ci->i_xattrs.names_size,
+            ci->i_xattrs.vals_size);
+
+       if (!name_size)
+               return size;
+
+       return size + 4 + 4 + name_size + val_size;
+}
+
+/*
+ * If there are dirty xattrs, reencode xattrs into the prealloc_blob
+ * and swap into place.
+ */
+/*
+ * Re-encode the indexed xattrs into the preallocated buffer and make
+ * that buffer the inode's xattr blob.  Does nothing unless the xattrs
+ * are marked dirty.  The old blob's reference is dropped, the
+ * preallocated slot is emptied, and the dirty flag is cleared.
+ *
+ * The preallocated buffer must already be large enough; this is
+ * enforced with BUG_ON().
+ */
+void __ceph_build_xattrs_blob(struct ceph_inode_info *ci)
+{
+       struct ceph_buffer *blob;
+       struct rb_node *node;
+       void *cursor;
+       int need;
+
+       dout("__build_xattrs_blob %p\n", &ci->vfs_inode);
+       if (!ci->i_xattrs.dirty)
+               return;
+
+       need = __get_required_blob_size(ci, 0, 0);
+       blob = ci->i_xattrs.prealloc_blob;
+
+       BUG_ON(need > blob->alloc_len);
+
+       cursor = blob->vec.iov_base;
+
+       /* count first, then (name_len, name, val_len, val) per xattr */
+       ceph_encode_32(&cursor, ci->i_xattrs.count);
+       for (node = rb_first(&ci->i_xattrs.index); node;
+            node = rb_next(node)) {
+               struct ceph_inode_xattr *cur =
+                       rb_entry(node, struct ceph_inode_xattr, node);
+
+               ceph_encode_32(&cursor, cur->name_len);
+               memcpy(cursor, cur->name, cur->name_len);
+               cursor += cur->name_len;
+               ceph_encode_32(&cursor, cur->val_len);
+               memcpy(cursor, cur->val, cur->val_len);
+               cursor += cur->val_len;
+       }
+
+       /* adjust buffer len; it may be larger than we need */
+       blob->vec.iov_len = cursor - blob->vec.iov_base;
+
+       /* swap the freshly encoded buffer into place */
+       ceph_buffer_put(ci->i_xattrs.blob);
+       ci->i_xattrs.blob = blob;
+       ci->i_xattrs.prealloc_blob = NULL;
+       ci->i_xattrs.dirty = false;
+}
+
+/*
+ * Read one extended attribute of the dentry's inode.
+ *
+ * If the XATTR_SHARED cap is issued and the in-memory index is current,
+ * the lookup is served from the index.  Otherwise the xattrs are fetched
+ * with ceph_do_getattr() and the index is rebuilt first.
+ *
+ * Returns the value length on success (with @size == 0 only the length
+ * is reported and @value is not written), -ERANGE if @size is non-zero
+ * but smaller than the value, -ENODATA if the attribute does not exist
+ * and is not a virtual xattr, or an error from ceph_do_getattr() or
+ * __build_xattrs().
+ *
+ * NOTE(review): for virtual xattrs the callback's snprintf() result is
+ * returned unchanged, so a too-small @size is not reported as -ERANGE
+ * on that path -- confirm whether that is intended.
+ */
+ssize_t ceph_getxattr(struct dentry *dentry, const char *name, void *value,
+                     size_t size)
+{
+       struct inode *inode = dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
+       int err;
+       struct ceph_inode_xattr *xattr;
+       struct ceph_vxattr_cb *vxattr = NULL;
+
+       /* let's see if a virtual xattr was requested */
+       if (vxattrs)
+               vxattr = ceph_match_vxattr(vxattrs, name);
+
+       spin_lock(&inode->i_lock);
+       dout("getxattr %p ver=%lld index_ver=%lld\n", inode,
+            ci->i_xattrs.version, ci->i_xattrs.index_version);
+
+       /* fast path: cap held and index already reflects the blob */
+       if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
+           (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
+               goto get_xattr;
+       } else {
+               /* the getattr call is made with i_lock released */
+               spin_unlock(&inode->i_lock);
+               /* get xattrs from mds (if we don't already have them) */
+               err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
+               if (err)
+                       return err;
+       }
+
+       spin_lock(&inode->i_lock);
+
+       /* read-only virtual xattrs never consult the stored xattrs */
+       if (vxattr && vxattr->readonly) {
+               err = vxattr->getxattr_cb(ci, value, size);
+               goto out;
+       }
+
+       err = __build_xattrs(inode);
+       if (err < 0)
+               goto out;
+
+get_xattr:
+       err = -ENODATA;  /* == ENOATTR */
+       xattr = __get_xattr(ci, name);
+       if (!xattr) {
+               /* no stored value: fall back to the virtual xattr, if any */
+               if (vxattr)
+                       err = vxattr->getxattr_cb(ci, value, size);
+               goto out;
+       }
+
+       err = -ERANGE;
+       if (size && size < xattr->val_len)
+               goto out;
+
+       /* size == 0 is a length query; report the length, copy nothing */
+       err = xattr->val_len;
+       if (size == 0)
+               goto out;
+
+       memcpy(value, xattr->val, xattr->val_len);
+
+out:
+       spin_unlock(&inode->i_lock);
+       return err;
+}
+
+/*
+ * List the extended attribute names of the dentry's inode, stored names
+ * first and then the virtual xattr names that apply to the inode type,
+ * each followed by a NUL byte.
+ *
+ * If the XATTR_SHARED cap is issued and the in-memory index is current
+ * (index_version >= version, the same test ceph_getxattr() uses), the
+ * list is served from the index; __build_xattrs() leaves the two
+ * versions equal, so a strict '>' here would never take the cached
+ * path.  Otherwise the xattrs are fetched with ceph_do_getattr() and
+ * the index is rebuilt first.
+ *
+ * Returns the number of bytes the full list occupies (with @size == 0
+ * nothing is written), -ERANGE if @size is non-zero but too small, or
+ * an error from ceph_do_getattr() or __build_xattrs().
+ */
+ssize_t ceph_listxattr(struct dentry *dentry, char *names, size_t size)
+{
+       struct inode *inode = dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
+       u32 vir_namelen = 0;
+       u32 namelen;
+       int err;
+       u32 len;
+       int i;
+
+       spin_lock(&inode->i_lock);
+       dout("listxattr %p ver=%lld index_ver=%lld\n", inode,
+            ci->i_xattrs.version, ci->i_xattrs.index_version);
+
+       if (__ceph_caps_issued_mask(ci, CEPH_CAP_XATTR_SHARED, 1) &&
+           (ci->i_xattrs.index_version >= ci->i_xattrs.version)) {
+               goto list_xattr;
+       } else {
+               /* the getattr call is made with i_lock released */
+               spin_unlock(&inode->i_lock);
+               err = ceph_do_getattr(inode, CEPH_STAT_CAP_XATTR);
+               if (err)
+                       return err;
+       }
+
+       spin_lock(&inode->i_lock);
+
+       err = __build_xattrs(inode);
+       if (err < 0)
+               goto out;
+
+list_xattr:
+       /* include virtual dir xattrs */
+       if (vxattrs)
+               for (i = 0; vxattrs[i].name; i++)
+                       vir_namelen += strlen(vxattrs[i].name) + 1;
+       /* adding 1 byte per each variable due to the null termination */
+       namelen = vir_namelen + ci->i_xattrs.names_size + ci->i_xattrs.count;
+       err = -ERANGE;
+       if (size && namelen > size)
+               goto out;
+
+       /* size == 0 is a length query; report the length, copy nothing */
+       err = namelen;
+       if (size == 0)
+               goto out;
+
+       names = __copy_xattr_names(ci, names);
+
+       /* virtual xattr names, too */
+       if (vxattrs)
+               for (i = 0; vxattrs[i].name; i++) {
+                       len = sprintf(names, "%s", vxattrs[i].name);
+                       names += len + 1;
+               }
+
+out:
+       spin_unlock(&inode->i_lock);
+       return err;
+}
+
+/*
+ * Set an xattr synchronously by sending a SETXATTR request to the MDS.
+ *
+ * The value is copied into freshly allocated pages that are attached to
+ * the request as its data payload; the attribute name travels in
+ * r_path2 and @flags in the request arguments.  The pages are freed
+ * here once the request has completed or could not be created.
+ *
+ * Each page is unmapped again right after the copy so that no kmap()
+ * mapping is left behind.
+ *
+ * Returns the result of ceph_mdsc_do_request(), -ENOMEM if the pages
+ * cannot be allocated, or the error encoded in the request pointer if
+ * the request cannot be created.
+ *
+ * NOTE(review): the kstrdup() result is not checked here -- confirm
+ * how a NULL r_path2 is handled downstream.
+ */
+static int ceph_sync_setxattr(struct dentry *dentry, const char *name,
+                             const char *value, size_t size, int flags)
+{
+       struct ceph_client *client = ceph_client(dentry->d_sb);
+       struct inode *inode = dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct inode *parent_inode = dentry->d_parent->d_inode;
+       struct ceph_mds_request *req;
+       struct ceph_mds_client *mdsc = &client->mdsc;
+       int err;
+       int i, nr_pages;
+       struct page **pages = NULL;
+       void *kaddr;
+
+       /* copy value into some pages */
+       nr_pages = calc_pages_for(0, size);
+       if (nr_pages) {
+               pages = kmalloc(sizeof(pages[0])*nr_pages, GFP_NOFS);
+               if (!pages)
+                       return -ENOMEM;
+               err = -ENOMEM;
+               for (i = 0; i < nr_pages; i++) {
+                       pages[i] = alloc_page(GFP_NOFS);
+                       if (!pages[i]) {
+                               /* only free the pages allocated so far */
+                               nr_pages = i;
+                               goto out;
+                       }
+                       kaddr = kmap(pages[i]);
+                       memcpy(kaddr, value + i*PAGE_CACHE_SIZE,
+                              min(PAGE_CACHE_SIZE, size-i*PAGE_CACHE_SIZE));
+                       kunmap(pages[i]);
+               }
+       }
+
+       dout("setxattr value=%.*s\n", (int)size, value);
+
+       /* do request */
+       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_SETXATTR,
+                                      USE_AUTH_MDS);
+       if (IS_ERR(req)) {
+               /* go through the cleanup so the pages are not leaked */
+               err = PTR_ERR(req);
+               goto out;
+       }
+       req->r_inode = igrab(inode);
+       req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
+       req->r_num_caps = 1;
+       req->r_args.setxattr.flags = cpu_to_le32(flags);
+       req->r_path2 = kstrdup(name, GFP_NOFS);
+
+       req->r_pages = pages;
+       req->r_num_pages = nr_pages;
+       req->r_data_len = size;
+
+       dout("xattr.ver (before): %lld\n", ci->i_xattrs.version);
+       err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+       ceph_mdsc_put_request(req);
+       dout("xattr.ver (after): %lld\n", ci->i_xattrs.version);
+
+out:
+       if (pages) {
+               for (i = 0; i < nr_pages; i++)
+                       __free_page(pages[i]);
+               kfree(pages);
+       }
+       return err;
+}
+
+/*
+ * Set an extended attribute on the dentry's inode.
+ *
+ * Rejected up front: snapshot inodes (-EROFS), names outside "user."
+ * and names that match a virtual xattr for this inode type
+ * (-EOPNOTSUPP).
+ *
+ * If the XATTR_EXCL cap is issued, the change is applied to the
+ * in-memory index and the xattrs are marked dirty; otherwise a
+ * synchronous request is sent through ceph_sync_setxattr().
+ *
+ * Copies of the name and value and an index node are allocated before
+ * i_lock is taken.  On the local path they are handed to __set_xattr()
+ * (with the should-free flags set) and are not freed here; on the sync
+ * and error paths they are freed at "out".
+ *
+ * Returns 0 on success or a negative errno.
+ *
+ * NOTE(review): @flags is only forwarded on the sync path; the local
+ * path does not look at it.  The return value of __build_xattrs() is
+ * also ignored here.  Confirm both are intended.
+ */
+int ceph_setxattr(struct dentry *dentry, const char *name,
+                 const void *value, size_t size, int flags)
+{
+       struct inode *inode = dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
+       int err;
+       int name_len = strlen(name);
+       int val_len = size;
+       char *newname = NULL;
+       char *newval = NULL;
+       struct ceph_inode_xattr *xattr = NULL;
+       int issued;
+       int required_blob_size;
+
+       if (ceph_snap(inode) != CEPH_NOSNAP)
+               return -EROFS;
+
+       /* only support user.* xattrs, for now */
+       if (strncmp(name, "user.", 5) != 0)
+               return -EOPNOTSUPP;
+
+       if (vxattrs && ceph_match_vxattr(vxattrs, name) != NULL)
+               return -EOPNOTSUPP;
+
+       /* preallocate memory for xattr name, value, index node */
+       err = -ENOMEM;
+       newname = kmalloc(name_len + 1, GFP_NOFS);
+       if (!newname)
+               goto out;
+       memcpy(newname, name, name_len + 1);
+
+       /* an empty value leaves newval NULL */
+       if (val_len) {
+               newval = kmalloc(val_len + 1, GFP_NOFS);
+               if (!newval)
+                       goto out;
+               memcpy(newval, value, val_len);
+               newval[val_len] = '\0';
+       }
+
+       xattr = kmalloc(sizeof(struct ceph_inode_xattr), GFP_NOFS);
+       if (!xattr)
+               goto out;
+
+       spin_lock(&inode->i_lock);
+retry:
+       /* re-evaluated after every lock drop below */
+       issued = __ceph_caps_issued(ci, NULL);
+       if (!(issued & CEPH_CAP_XATTR_EXCL))
+               goto do_sync;
+       __build_xattrs(inode);
+
+       required_blob_size = __get_required_blob_size(ci, name_len, val_len);
+
+       /* make sure the buffer used for re-encoding is large enough */
+       if (!ci->i_xattrs.prealloc_blob ||
+           required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
+               struct ceph_buffer *blob = NULL;
+
+               /* the buffer is allocated with i_lock released */
+               spin_unlock(&inode->i_lock);
+               dout(" preaallocating new blob size=%d\n", required_blob_size);
+               blob = ceph_buffer_new_alloc(required_blob_size, GFP_NOFS);
+               if (!blob)
+                       goto out;
+               spin_lock(&inode->i_lock);
+               ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+               ci->i_xattrs.prealloc_blob = blob;
+               goto retry;
+       }
+
+       dout("setxattr %p issued %s\n", inode, ceph_cap_string(issued));
+       /* newname, newval and xattr are passed with should-free set */
+       err = __set_xattr(ci, newname, name_len, newval,
+                         val_len, 1, 1, 1, &xattr);
+       __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
+       ci->i_xattrs.dirty = true;
+       inode->i_ctime = CURRENT_TIME;
+       spin_unlock(&inode->i_lock);
+
+       return err;
+
+do_sync:
+       spin_unlock(&inode->i_lock);
+       err = ceph_sync_setxattr(dentry, name, value, size, flags);
+out:
+       /* reached only when the preallocated copies were not consumed */
+       kfree(newname);
+       kfree(newval);
+       kfree(xattr);
+       return err;
+}
+
+/*
+ * Remove an xattr synchronously by sending an RMXATTR request to the
+ * MDS.  The attribute name travels in r_path2.
+ *
+ * Returns the result of ceph_mdsc_do_request(), or the error encoded in
+ * the request pointer if the request cannot be created.
+ *
+ * NOTE(review): the kstrdup() result is not checked here -- confirm how
+ * a NULL r_path2 is handled downstream.
+ */
+static int ceph_send_removexattr(struct dentry *dentry, const char *name)
+{
+       struct ceph_client *client = ceph_client(dentry->d_sb);
+       struct ceph_mds_client *mdsc = &client->mdsc;
+       struct inode *inode = dentry->d_inode;
+       struct inode *parent_inode = dentry->d_parent->d_inode;
+       struct ceph_mds_request *req;
+       int err;
+
+       req = ceph_mdsc_create_request(mdsc, CEPH_MDS_OP_RMXATTR,
+                                      USE_AUTH_MDS);
+       if (IS_ERR(req))
+               return PTR_ERR(req);
+       /* the request carries its own reference to the inode */
+       req->r_inode = igrab(inode);
+       req->r_inode_drop = CEPH_CAP_XATTR_SHARED;
+       req->r_num_caps = 1;
+       req->r_path2 = kstrdup(name, GFP_NOFS);
+
+       err = ceph_mdsc_do_request(mdsc, parent_inode, req);
+       ceph_mdsc_put_request(req);
+       return err;
+}
+
+/*
+ * Remove an extended attribute from the dentry's inode.
+ *
+ * Rejected up front: snapshot inodes (-EROFS), names outside "user."
+ * and names that match a virtual xattr for this inode type
+ * (-EOPNOTSUPP).
+ *
+ * If the XATTR_EXCL cap is issued, the entry is removed from the
+ * in-memory index; otherwise a synchronous request is sent through
+ * ceph_send_removexattr().
+ *
+ * Marking the xattrs dirty means __ceph_build_xattrs_blob() will later
+ * re-encode them into prealloc_blob, so a large enough prealloc_blob is
+ * set up here first, the same way ceph_setxattr() does.  The caps,
+ * dirty flag and ctime are only touched when an entry was actually
+ * removed.
+ *
+ * Returns 0 on success, the error from __remove_xattr_by_name() if the
+ * attribute is not in the index, -ENOMEM if the buffer cannot be
+ * allocated, or the result of the synchronous request.
+ */
+int ceph_removexattr(struct dentry *dentry, const char *name)
+{
+       struct inode *inode = dentry->d_inode;
+       struct ceph_inode_info *ci = ceph_inode(inode);
+       struct ceph_vxattr_cb *vxattrs = ceph_inode_vxattrs(inode);
+       int issued;
+       int err;
+       int required_blob_size;
+
+       if (ceph_snap(inode) != CEPH_NOSNAP)
+               return -EROFS;
+
+       /* only support user.* xattrs, for now */
+       if (strncmp(name, "user.", 5) != 0)
+               return -EOPNOTSUPP;
+
+       if (vxattrs && ceph_match_vxattr(vxattrs, name) != NULL)
+               return -EOPNOTSUPP;
+
+       spin_lock(&inode->i_lock);
+retry:
+       /* re-evaluated after every lock drop below */
+       __build_xattrs(inode);
+       issued = __ceph_caps_issued(ci, NULL);
+       dout("removexattr %p issued %s\n", inode, ceph_cap_string(issued));
+
+       if (!(issued & CEPH_CAP_XATTR_EXCL))
+               goto do_sync;
+
+       /* the size before removal is an upper bound for the size after */
+       required_blob_size = __get_required_blob_size(ci, 0, 0);
+
+       if (!ci->i_xattrs.prealloc_blob ||
+           required_blob_size > ci->i_xattrs.prealloc_blob->alloc_len) {
+               struct ceph_buffer *blob;
+
+               /* the buffer is allocated with i_lock released */
+               spin_unlock(&inode->i_lock);
+               dout(" preaallocating new blob size=%d\n", required_blob_size);
+               blob = ceph_buffer_new_alloc(required_blob_size, GFP_NOFS);
+               if (!blob)
+                       return -ENOMEM;
+               spin_lock(&inode->i_lock);
+               ceph_buffer_put(ci->i_xattrs.prealloc_blob);
+               ci->i_xattrs.prealloc_blob = blob;
+               goto retry;
+       }
+
+       err = __remove_xattr_by_name(ceph_inode(inode), name);
+       if (!err) {
+               /* something changed: schedule a flush and bump ctime */
+               __ceph_mark_dirty_caps(ci, CEPH_CAP_XATTR_EXCL);
+               ci->i_xattrs.dirty = true;
+               inode->i_ctime = CURRENT_TIME;
+       }
+
+       spin_unlock(&inode->i_lock);
+
+       return err;
+do_sync:
+       spin_unlock(&inode->i_lock);
+       err = ceph_send_removexattr(dentry, name);
+       return err;
+}
+