]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: split xattrs to multiple chunks
authorYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 22 Oct 2010 17:09:32 +0000 (10:09 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Fri, 22 Oct 2010 22:55:24 +0000 (15:55 -0700)
src/os/FileStore.cc
src/rados.cc

index 7257d8f7a9e50c0a8916d14fe043e4bc1ee7b592..fcfe01a6e126843ea2f92f5c218fb8f213dfb2fb 100644 (file)
@@ -47,7 +47,8 @@
 #include <sstream>
 
 
-#define ATTR_MAX 80
+#define ATTR_MAX_NAME_LEN  128
+#define ATTR_MAX_BLOCK_LEN 512 // (3*1024)
 
 #define COMMIT_SNAP_ITEM "snap_%lld"
 
 
 #include <map>
 
-
-/*
- * xattr portability stupidity.  hide errno, while we're at it.
- */ 
-
-int do_getxattr(const char *fn, const char *name, void *val, size_t size) {
 #ifdef DARWIN
+static int sys_getxattr(const char *fn, const char *name, void *val, size_t size)
+{
   int r = ::getxattr(fn, name, val, size, 0, 0);
-#else
-  int r = ::getxattr(fn, name, val, size);
-#endif
-  return r < 0 ? -errno:r;
+  return (r < 0 ? -errno : r);
 }
-int do_setxattr(const char *fn, const char *name, const void *val, size_t size) {
-#ifdef DARWIN
+
+static int sys_setxattr(const char *fn, const char *name, const void *val, size_t size)
+{
   int r = ::setxattr(fn, name, val, size, 0, 0);
-#else
-  int r = ::setxattr(fn, name, val, size, 0);
-#endif
-  return r < 0 ? -errno:r;
+  return (r < 0 ? -errno : r);
 }
-int do_removexattr(const char *fn, const char *name) {
-#ifdef DARWIN
+
+static int sys_removexattr(const char *fn, const char *name)
+{
   int r = ::removexattr(fn, name, 0);
-#else
-  int r = ::removexattr(fn, name);
-#endif
-  return r < 0 ? -errno:r;
+  return (r < 0 ? -errno : r);
 }
-int do_listxattr(const char *fn, char *names, size_t len) {
-#ifdef DARWIN
+
+
+int sys_removexattr(const char *fn, const char *name)
+{
+  int r = ::removexattr(fn, name, 0);
+  return (r < 0 ? -errno : r);
+}
+
+int sys_listxattr(const char *fn, char *names, size_t len)
+{
   int r = ::listxattr(fn, names, len, 0);
+  return (r < 0 ? -errno : r);
+}
 #else
+
+static int sys_getxattr(const char *fn, const char *name, void *val, size_t size)
+{
+  int r = ::getxattr(fn, name, val, size);
+  return (r < 0 ? -errno : r);
+}
+
+static int sys_setxattr(const char *fn, const char *name, const void *val, size_t size)
+{
+  int r = ::setxattr(fn, name, val, size, 0);
+  return (r < 0 ? -errno : r);
+}
+
+static int sys_removexattr(const char *fn, const char *name)
+{
+  int r = ::removexattr(fn, name);
+  return (r < 0 ? -errno : r);
+}
+
+int sys_listxattr(const char *fn, char *names, size_t len)
+{
   int r = ::listxattr(fn, names, len);
+  return (r < 0 ? -errno : r);
+}
 #endif
-  return r < 0 ? -errno:r;
+
+static void get_raw_xattr_name(const char *name, int i, char *raw_name, int raw_len)
+{
+  int r;
+
+  if (!i)
+    r = snprintf(raw_name, raw_len, "%s", name);
+  else
+    r = snprintf(raw_name, raw_len, "%s@%d", name, i);
+
+  assert(r < raw_len);
+}
+
+
+int do_getxattr_len(const char *fn, const char *name)
+{
+  int i = 0, total = 0;
+  char raw_name[ATTR_MAX_NAME_LEN * 2 + 16];
+  int r;
+
+  do {
+    get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
+    r = sys_getxattr(fn, raw_name, 0, 0);
+    if (!i && r < 0) {
+      return r;
+    }
+    if (r < 0)
+      break;
+    total += r;
+    i++;
+  } while (r == ATTR_MAX_BLOCK_LEN);
+
+  return total;
+}
+
+int do_getxattr(const char *fn, const char *name, void *val, size_t size)
+{
+  int i = 0, pos = 0;
+  char raw_name[ATTR_MAX_NAME_LEN * 2 + 16];
+  int ret = 0;
+  int r;
+  size_t chunk_size;
+
+  if (!size)
+    return do_getxattr_len(fn, name);
+
+  do {
+    chunk_size = (size < ATTR_MAX_BLOCK_LEN ? size : ATTR_MAX_BLOCK_LEN);
+    get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
+    size -= chunk_size;
+
+    r = sys_getxattr(fn, raw_name, (char *)val + pos, chunk_size);
+    if (r < 0) {
+      ret = r;
+      break;
+    }
+
+    if (r > 0)
+      pos += r;
+
+    i++;
+  } while (size && r == ATTR_MAX_BLOCK_LEN);
+
+  if (r >= 0) {
+    ret = pos;
+    /* is there another chunk? that can happen if the last read size span over
+       exactly one block */
+    if (chunk_size == ATTR_MAX_BLOCK_LEN) {
+      get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
+      r = sys_getxattr(fn, raw_name, 0, 0);
+      if (r > 0) { // there's another chunk.. the original buffer was too small
+        ret = -ERANGE;
+      }
+    }
+  }
+  return ret;
+}
+
+int do_setxattr(const char *fn, const char *name, const void *val, size_t size) {
+  int i = 0, pos = 0;
+  char raw_name[ATTR_MAX_NAME_LEN * 2 + 16];
+  int ret = 0;
+  size_t chunk_size;
+
+  do {
+    chunk_size = (size < ATTR_MAX_BLOCK_LEN ? size : ATTR_MAX_BLOCK_LEN);
+    get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
+    size -= chunk_size;
+
+    int r = sys_setxattr(fn, raw_name, (char *)val + pos, chunk_size);
+    if (r < 0) {
+      ret = r;
+      break;
+    }
+    pos  += chunk_size;
+    ret = pos;
+    i++;
+  } while (size);
+
+  /* if we're exactly at a chunk size, remove the next one (if wasn't removed
+     before) */
+  if (ret >= 0 && chunk_size == ATTR_MAX_BLOCK_LEN) {
+    get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
+    sys_removexattr(fn, raw_name);
+  }
+  
+  return ret;
+}
+
+int do_removexattr(const char *fn, const char *name) {
+  int i = 0;
+  char raw_name[ATTR_MAX_NAME_LEN * 2 + 16];
+  int r;
+
+  do {
+    get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
+    r = sys_removexattr(fn, raw_name);
+    if (!i && r < 0) {
+      return r;
+    }
+    i++;
+  } while (r >= 0);
+  return 0;
+}
+
+int do_listxattr(const char *fn, char *names, size_t len) {
+  int r;
+
+  if (!len)
+   return sys_listxattr(fn, names, len);
+
+  r = sys_listxattr(fn, 0, 0);
+  if (r < 0)
+    return r;
+
+  size_t total_len = r  * 2; // should be enough
+  char *full_buf = (char *)malloc(total_len * 2);
+  if (!full_buf)
+    return -ENOMEM;
+
+  r = sys_listxattr(fn, full_buf, total_len);
+  if (r < 0)
+    return r;
+
+  char *p = full_buf;
+  char *end = full_buf + r;
+  char *dest = names;
+  char *dest_end = names + len;
+
+  while (p < end) {
+    int attr_len = strlen(p);
+    if (!strchr(p, '@')) {
+      if (dest + attr_len > dest_end) {
+        r = -ERANGE;
+        goto done;
+      }
+      strcpy(dest, p);
+      dest += attr_len + 1;
+    }
+    p += attr_len + 1;
+  }
+  r = dest - names;
+
+done:
+  free(full_buf);
+  return r;
 }
 
 
@@ -116,6 +304,7 @@ static void get_attrname(const char *name, char *buf, int len)
 {
   snprintf(buf, len, "user.ceph.%s", name);
 }
+
 bool parse_attrname(char **name)
 {
   if (strncmp(*name, "user.ceph.", 10) == 0) {
@@ -1977,8 +2166,8 @@ int FileStore::getattr(coll_t cid, const sobject_t& oid, const char *name,
   char fn[PATH_MAX];
   get_coname(cid, oid, fn, sizeof(fn));
   dout(15) << "getattr " << fn << " '" << name << "' len " << size << dendl;
-  char n[ATTR_MAX];
-  get_attrname(name, n, ATTR_MAX);
+  char n[ATTR_MAX_NAME_LEN];
+  get_attrname(name, n, ATTR_MAX_NAME_LEN);
   int r = do_getxattr(fn, n, value, size);
   dout(10) << "getattr " << fn << " '" << name << "' len " << size << " = " << r << dendl;
   return r;
@@ -1991,8 +2180,8 @@ int FileStore::getattr(coll_t cid, const sobject_t& oid, const char *name, buffe
   char fn[PATH_MAX];
   get_coname(cid, oid, fn, sizeof(fn));
   dout(15) << "getattr " << fn << " '" << name << "'" << dendl;
-  char n[ATTR_MAX];
-  get_attrname(name, n, ATTR_MAX);
+  char n[ATTR_MAX_NAME_LEN];
+  get_attrname(name, n, ATTR_MAX_NAME_LEN);
   int r = _getattr(fn, n, bp);
   dout(10) << "getattr " << fn << " '" << name << "' = " << r << dendl;
   return r;
@@ -2022,8 +2211,8 @@ int FileStore::_setattr(coll_t cid, const sobject_t& oid, const char *name,
   char fn[PATH_MAX];
   get_coname(cid, oid, fn, sizeof(fn));
   dout(15) << "setattr " << fn << " '" << name << "' len " << size << dendl;
-  char n[ATTR_MAX];
-  get_attrname(name, n, ATTR_MAX);
+  char n[ATTR_MAX_NAME_LEN];
+  get_attrname(name, n, ATTR_MAX_NAME_LEN);
   int r = do_setxattr(fn, n, value, size);
   dout(10) << "setattr " << fn << " '" << name << "' len " << size << " = " << r << dendl;
   return r;
@@ -2040,8 +2229,8 @@ int FileStore::_setattrs(coll_t cid, const sobject_t& oid, map<string,bufferptr>
   for (map<string,bufferptr>::iterator p = aset.begin();
        p != aset.end();
        ++p) {
-    char n[ATTR_MAX];
-    get_attrname(p->first.c_str(), n, ATTR_MAX);
+    char n[ATTR_MAX_NAME_LEN];
+    get_attrname(p->first.c_str(), n, ATTR_MAX_NAME_LEN);
     const char *val;
     if (p->second.length())
       val = p->second.c_str();
@@ -2066,8 +2255,8 @@ int FileStore::_rmattr(coll_t cid, const sobject_t& oid, const char *name)
   char fn[PATH_MAX];
   get_coname(cid, oid, fn, sizeof(fn));
   dout(15) << "rmattr " << fn << " '" << name << "'" << dendl;
-  char n[ATTR_MAX];
-  get_attrname(name, n, ATTR_MAX);
+  char n[ATTR_MAX_NAME_LEN];
+  get_attrname(name, n, ATTR_MAX_NAME_LEN);
   int r = do_removexattr(fn, n);
   dout(10) << "rmattr " << fn << " '" << name << "' = " << r << dendl;
   return r;
@@ -2086,8 +2275,8 @@ int FileStore::_rmattrs(coll_t cid, const sobject_t& oid)
   int r = _getattrs(fn, aset);
   if (r >= 0) {
     for (map<string,bufferptr>::iterator p = aset.begin(); p != aset.end(); p++) {
-      char n[ATTR_MAX];
-      get_attrname(p->first.c_str(), n, ATTR_MAX);
+      char n[ATTR_MAX_NAME_LEN];
+      get_attrname(p->first.c_str(), n, ATTR_MAX_NAME_LEN);
       r = do_removexattr(fn, n);
       if (r < 0)
        break;
index c9052637301b1c0d370d317ab31af32d8a8535a0..67b6894eff4b2ae45de98a0bfd72d14a03acf687 100644 (file)
@@ -47,8 +47,9 @@ void usage()
   cerr << "   get objname [outfile] -- fetch object\n";
   cerr << "   put objname [infile] -- write object\n";
   cerr << "   rm objname  -- remove object\n";
-  cerr << "   getxattr objame attr\n";
-  cerr << "   setxattr objame attr val\n";
+  cerr << "   listxattr objname\n";
+  cerr << "   getxattr objname attr\n";
+  cerr << "   setxattr objname attr val\n";
   cerr << "   ls          -- list objects in pool\n\n";
   cerr << "   chown 123   -- change the pool owner to auid 123\n";
 
@@ -317,11 +318,28 @@ int main(int argc, const char **argv)
     bufferlist bl;
     ret = rados.getxattr(p, oid, attr_name.c_str(), bl);
     if (ret < 0) {
-      cerr << "error setting xattr " << pool << "/" << oid << "/" << attr_name << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
+      cerr << "error getting xattr " << pool << "/" << oid << "/" << attr_name << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
       goto out;
     }
     string s(bl.c_str(), bl.length());
     cout << s << std::endl;
+  } else if (strcmp(nargs[0], "listxattr") == 0) {
+    if (!pool || nargs.size() < 2)
+      usage();
+
+    string oid(nargs[1]);
+    map<std::string, bufferlist> attrset;
+    bufferlist bl;
+    ret = rados.getxattrs(p, oid, attrset);
+    if (ret < 0) {
+      cerr << "error getting xattr set " << pool << "/" << oid << ": " << strerror_r(-ret, buf, sizeof(buf)) << std::endl;
+      goto out;
+    }
+
+    for (map<std::string, bufferlist>::iterator iter = attrset.begin();
+         iter != attrset.end(); ++iter) {
+      cout << iter->first << std::endl;
+    }
   }
   else if (strcmp(nargs[0], "rm") == 0) {
     if (!pool || nargs.size() < 2)