]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
filestore: introduce long-file-names
authorYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 20 Apr 2011 18:58:39 +0000 (11:58 -0700)
committerYehuda Sadeh <yehuda@hq.newdream.net>
Wed, 20 Apr 2011 18:58:39 +0000 (11:58 -0700)
still needs testing, adjusting of constants and using a real hash function

src/common/debug.h
src/os/FileStore.cc

index d89f95b4248ddc57754e8fdd448bad3b2817e9f1..b0269856fab61338517613f8228e4350f1307e54 100644 (file)
@@ -111,4 +111,6 @@ inline std::ostream& operator<<(std::ostream& out, _bad_endl_use_dendl_t) {
 
 #define derr dout(-1)
 
+#define generic_derr generic_dout(-1)
+
 #endif
index 615ae78549383c629f20d86ad3ce4f1bfa3ad934..3a4878928d3c5fbc4bdbe16f44d51b83ce483c2e 100644 (file)
 
 #include <map>
 
+/*
+ * long file names will have the following format:
+ *
+ * prefix_cookie_hash_index
+ *
+ * The prefix will just be the first X bytes of the original file name.
+ * The cookie is a constant string that hints whether this file name
+ * is hashed
+ */
+#define FILENAME_MAX_LEN 4096 // the long file name size
+
+#define FILENAME_SHORT_LEN 16 // the short file name size
+#define FILENAME_COOKIE "cLFN"  // ceph long file name
+#define FILENAME_HASH_LEN 3
+
+#define FILENAME_PREFIX_LEN (FILENAME_SHORT_LEN - FILENAME_HASH_LEN - 1 - (sizeof(FILENAME_COOKIE) - 1) - 1)
+
 #ifdef DARWIN
 static int sys_getxattr(const char *fn, const char *name, void *val, size_t size)
 {
@@ -95,13 +112,6 @@ static int sys_setxattr(const char *fn, const char *name, const void *val, size_
   return (r < 0 ? -errno : r);
 }
 
-static int sys_removexattr(const char *fn, const char *name)
-{
-  int r = ::removexattr(fn, name, 0);
-  return (r < 0 ? -errno : r);
-}
-
-
 static int sys_removexattr(const char *fn, const char *name)
 {
   int r = ::removexattr(fn, name, 0);
@@ -140,6 +150,329 @@ int sys_listxattr(const char *fn, char *names, size_t len)
 }
 #endif
 
+static int hash_filename(const char *filename, char *hash, int buf_len)
+{
+  if (buf_len < FILENAME_HASH_LEN + 1)
+    return -EINVAL;
+  sprintf(hash, "zzz");
+  return 0;
+}
+
+static void build_filename(char *filename, int len, const char *old_filename, int i)
+{
+  char hash[FILENAME_HASH_LEN + 1];
+
+  assert(len >= FILENAME_SHORT_LEN + 4);
+
+  strncpy(filename, old_filename, FILENAME_PREFIX_LEN);
+  filename[FILENAME_PREFIX_LEN] = '\0';
+  if (strlen(filename) < FILENAME_PREFIX_LEN)
+    return;
+  if (old_filename[FILENAME_PREFIX_LEN] == '\0')
+    return;
+
+  hash_filename(old_filename, hash, sizeof(hash));
+  sprintf(filename + FILENAME_PREFIX_LEN, "_" FILENAME_COOKIE "_%s_%d", hash, i);
+}
+
+/* is this a candidate? */
+static int lfn_is_hashed_filename(const char *filename)
+{
+  if (strlen(filename) < FILENAME_SHORT_LEN)
+    return 0;
+  return (strncmp(filename + FILENAME_PREFIX_LEN, "_" FILENAME_COOKIE "_", sizeof(FILENAME_COOKIE) -1 + 2) == 0);
+}
+
+static void lfn_translate(const char *path, const char *name, char *new_name, int len)
+{
+  if (!lfn_is_hashed_filename(name)) {
+    strncpy(new_name, name, len);
+    return;
+  }
+
+  char buf[PATH_MAX];
+
+  snprintf(buf, sizeof(buf), "%s/%s", path, name);
+  int r = sys_getxattr(buf, "user.ceph._lfn", new_name, sizeof(buf) - 1);
+  if (r < 0)
+    strncpy(new_name, name, len);
+  else
+    buf[r] = '\0';
+  return;
+}
+
+static int prepare_short_path(const char *pathname, char *short_path, const char **pfilename, int *is_lfn)
+{
+  const char *filename = strrchr(pathname, '/');
+  int path_len;
+  int len;
+
+  if (!filename)
+    filename = pathname;
+  else
+    filename++;
+
+  *pfilename = filename;
+
+  len = strlen(filename);
+  *is_lfn = (len >= (int)FILENAME_PREFIX_LEN);
+  if (!*is_lfn)
+    return 0;
+
+  path_len = filename - pathname;
+  if (short_path)
+    memcpy(short_path, pathname, path_len);
+
+  return path_len;
+}
+
+int lfn_get(const char *pathname, char *short_fn, int len, const char **long_fn, int *exist, int *is_lfn)
+{
+  int i = 0;
+  const char *filename;
+  int path_len;
+
+  path_len = prepare_short_path(pathname, short_fn, &filename, is_lfn);
+  if (!*is_lfn)
+    return 0;
+
+  *long_fn = filename;
+
+  *exist = 0;
+
+  path_len = filename - pathname;
+
+  while (1) {
+    char buf[PATH_MAX];
+    int r;
+
+    build_filename(&short_fn[path_len], len - path_len, filename, i);
+    r = getxattr(short_fn, "user.ceph._lfn", buf, sizeof(buf));
+    if (r < 0)
+      r = -errno;
+    if (r > 0) {
+      buf[MAX((int)sizeof(buf)-1, r)] = '\0';
+      if (strcmp(buf, filename) == 0) { // a match?
+        *exist = 1;
+        return i;
+      }
+    }
+    switch (r) {
+    case -ENOENT:
+      return i;
+    case -ERANGE:
+      assert(0); // shouldn't happen
+    default:
+      break;
+    }
+    i++;
+  }
+
+  return 0; // unreachable anyway
+}
+
+static int lfn_find(const char *pathname, char *new_pathname, int len)
+{
+  const char *long_fn;
+  int r, exist;
+  int is_lfn;
+
+  r = lfn_get(pathname, new_pathname, len, &long_fn, &exist, &is_lfn);
+  if (r < 0)
+    return r;
+  if (!is_lfn)
+    strncpy(new_pathname, pathname, len);
+  else if (!exist)
+    return -ENOENT;
+  return 0;
+}
+
+static int lfn_getxattr(const char *fn, const char *name, void *val, size_t size)
+{
+  char new_fn[PATH_MAX];
+  int r;
+
+  r = lfn_find(fn, new_fn, sizeof(new_fn));
+  if (r < 0)
+    return r;
+  return sys_getxattr(new_fn, name, val, size);
+}
+
+static int lfn_setxattr(const char *fn, const char *name, const void *val, size_t size)
+{
+  char new_fn[PATH_MAX];
+  int r;
+
+  r = lfn_find(fn, new_fn, sizeof(new_fn));
+  if (r < 0)
+    return r;
+  return sys_setxattr(new_fn, name, val, size);
+}
+
+static int lfn_removexattr(const char *fn, const char *name)
+{
+  char new_fn[PATH_MAX];
+  int r;
+
+  r = lfn_find(fn, new_fn, sizeof(new_fn));
+  if (r < 0)
+    return r;
+  return sys_removexattr(new_fn, name);
+}
+
+int lfn_listxattr(const char *fn, char *names, size_t len)
+{
+  char new_fn[PATH_MAX];
+  int r;
+
+  r = lfn_find(fn, new_fn, sizeof(new_fn));
+  if (r < 0)
+    return r;
+  return sys_listxattr(new_fn, names, len);
+}
+
+static int lfn_truncate(const char *fn, off_t length)
+{
+ char new_fn[PATH_MAX];
+  int r;
+
+  r = lfn_find(fn, new_fn, sizeof(new_fn));
+  if (r < 0)
+    return r;
+  r = ::truncate(fn, length);
+  if (r < 0)
+    return -errno;
+  return r;
+}
+
+static int lfn_stat(const char *fn, struct stat *buf)
+{
+  char new_fn[PATH_MAX];
+  int r;
+
+  r = lfn_find(fn, new_fn, sizeof(new_fn));
+  if (r < 0)
+    return r;
+  return ::stat(new_fn, buf);
+}
+
+static int lfn_open(const char *pathname, int flags, mode_t mode)
+{
+  const char *long_fn;
+  char short_fn[PATH_MAX];
+  int r, fd, exist;
+  int is_lfn;
+
+  r = lfn_get(pathname, short_fn, sizeof(short_fn), &long_fn, &exist, &is_lfn);
+  if (r < 0)
+    return r;
+  if (!is_lfn)
+    return open(pathname, flags);
+
+  r = ::open(short_fn, flags, mode);
+
+  if (r < 0)
+    return r;
+
+  fd = r;
+
+  if (flags & O_CREAT) {
+    r = fsetxattr(fd, "user.ceph._lfn", long_fn, strlen(long_fn), 0);
+    if (r < 0) {
+      close(fd);
+      return r;
+    }
+  }
+  return fd;
+}
+
+static int lfn_open(const char *pathname, int flags)
+{
+  return lfn_open(pathname, flags, 0);
+}
+
+static int lfn_link(const char *oldpath, const char *newpath)
+{
+  char short_fn_old[PATH_MAX];
+  char short_fn_new[PATH_MAX];
+  int exist, is_lfn;
+  const char *filename;
+  int r;
+
+  r = lfn_get(oldpath, short_fn_old, sizeof(short_fn_old), &filename, &exist, &is_lfn);
+  if (r < 0)
+    return r;
+  if (!exist)
+    return -ENOENT;
+  if (!is_lfn)
+    strncpy(short_fn_old, oldpath, sizeof(short_fn_old));
+
+  r = lfn_get(newpath, short_fn_new, sizeof(short_fn_new), &filename, &exist, &is_lfn);
+  if (r < 0)
+    return r;
+  if (exist)
+    return -EEXIST;
+  if (!is_lfn)
+    strncpy(short_fn_new, newpath, sizeof(short_fn_new));
+
+  r = link(short_fn_old, short_fn_new);
+  if (r < 0)
+    return -errno;
+  return 0;
+}
+
+static int lfn_unlink(const char *pathname)
+{
+  const char *filename;
+  char short_fn[PATH_MAX];
+  char short_fn2[PATH_MAX];
+  int r, i, exist, err;
+  int path_len;
+  int is_lfn;
+
+  r = lfn_get(pathname, short_fn, sizeof(short_fn), &filename, &exist, &is_lfn);
+  if (r < 0)
+    return r;
+  if (!is_lfn)
+    return unlink(pathname);
+  if (!exist) {
+    errno = ENOENT;
+    return -1;
+  }
+
+  err = unlink(short_fn);
+  if (err < 0)
+    return err;
+
+
+  path_len = filename - pathname;
+  memcpy(short_fn2, pathname, path_len);
+
+  for (i = r + 1; ; i++) {
+    struct stat buf;
+    int ret;
+
+    build_filename(&short_fn2[path_len], sizeof(short_fn2) - path_len, filename, i);
+    ret = stat(short_fn2, &buf);
+    if (ret < 0) {
+      if (i == r + 1)
+        return 0;
+
+      break;
+    }
+  }
+
+  build_filename(&short_fn2[path_len], sizeof(short_fn2) - path_len, filename, i - 1);
+  generic_dout(0) << "renaming " << short_fn2 << " -> " << short_fn << dendl;
+
+  if (rename(short_fn2, short_fn) < 0) {
+    generic_derr << "ERROR: could not rename " << short_fn2 << " -> " << short_fn << dendl;
+    assert(0);
+  }
+
+  return 0;
+}
+
 static void get_raw_xattr_name(const char *name, int i, char *raw_name, int raw_len)
 {
   int r;
@@ -215,7 +548,7 @@ int do_getxattr_len(const char *fn, const char *name)
 
   do {
     get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
-    r = sys_getxattr(fn, raw_name, 0, 0);
+    r = lfn_getxattr(fn, raw_name, 0, 0);
     if (!i && r < 0) {
       return r;
     }
@@ -244,7 +577,7 @@ int do_getxattr(const char *fn, const char *name, void *val, size_t size)
     get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
     size -= chunk_size;
 
-    r = sys_getxattr(fn, raw_name, (char *)val + pos, chunk_size);
+    r = lfn_getxattr(fn, raw_name, (char *)val + pos, chunk_size);
     if (r < 0) {
       ret = r;
       break;
@@ -262,7 +595,7 @@ int do_getxattr(const char *fn, const char *name, void *val, size_t size)
        exactly one block */
     if (chunk_size == ATTR_MAX_BLOCK_LEN) {
       get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
-      r = sys_getxattr(fn, raw_name, 0, 0);
+      r = lfn_getxattr(fn, raw_name, 0, 0);
       if (r > 0) { // there's another chunk.. the original buffer was too small
         ret = -ERANGE;
       }
@@ -282,7 +615,7 @@ int do_setxattr(const char *fn, const char *name, const void *val, size_t size)
     get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
     size -= chunk_size;
 
-    int r = sys_setxattr(fn, raw_name, (char *)val + pos, chunk_size);
+    int r = lfn_setxattr(fn, raw_name, (char *)val + pos, chunk_size);
     if (r < 0) {
       ret = r;
       break;
@@ -296,7 +629,7 @@ int do_setxattr(const char *fn, const char *name, const void *val, size_t size)
      before) */
   if (ret >= 0 && chunk_size == ATTR_MAX_BLOCK_LEN) {
     get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
-    sys_removexattr(fn, raw_name);
+    lfn_removexattr(fn, raw_name);
   }
   
   return ret;
@@ -309,7 +642,7 @@ int do_removexattr(const char *fn, const char *name) {
 
   do {
     get_raw_xattr_name(name, i, raw_name, sizeof(raw_name));
-    r = sys_removexattr(fn, raw_name);
+    r = lfn_removexattr(fn, raw_name);
     if (!i && r < 0) {
       return r;
     }
@@ -322,9 +655,9 @@ int do_listxattr(const char *fn, char *names, size_t len) {
   int r;
 
   if (!len)
-   return sys_listxattr(fn, names, len);
+   return lfn_listxattr(fn, names, len);
 
-  r = sys_listxattr(fn, 0, 0);
+  r = lfn_listxattr(fn, 0, 0);
   if (r < 0)
     return r;
 
@@ -333,7 +666,7 @@ int do_listxattr(const char *fn, char *names, size_t len) {
   if (!full_buf)
     return -ENOMEM;
 
-  r = sys_listxattr(fn, full_buf, total_len);
+  r = lfn_listxattr(fn, full_buf, total_len);
   if (r < 0)
     return r;
 
@@ -1087,7 +1420,7 @@ int FileStore::_sanity_check_fs()
 
 int FileStore::read_op_seq(const char *fn, uint64_t *seq)
 {
-  int op_fd = ::open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
+  int op_fd = lfn_open(current_op_seq_fn.c_str(), O_CREAT|O_RDWR, 0644);
   if (op_fd < 0)
     return -errno;
   char s[40];
@@ -1261,7 +1594,7 @@ int FileStore::mount()
       // roll back
       char s[PATH_MAX];
       snprintf(s, sizeof(s), "%s/" COMMIT_SNAP_ITEM, basedir.c_str(), (long long unsigned)cp);
-      vol_args.fd = ::open(s, O_RDONLY);
+      vol_args.fd = lfn_open(s, O_RDONLY);
       if (vol_args.fd < 0) {
        ret = -errno;
        derr << "FileStore::mount: error opening '" << s << "': "
@@ -1291,7 +1624,7 @@ int FileStore::mount()
   }
   initial_op_seq = 0;
 
-  current_fd = ::open(current_fn.c_str(), O_RDONLY);
+  current_fd = lfn_open(current_fn.c_str(), O_RDONLY);
   if (current_fd < 0) {
     derr << "FileStore::mount: error opening: " << current_fn << ": "
         << cpp_strerror(ret) << dendl;
@@ -2144,7 +2477,7 @@ int FileStore::stat(coll_t cid, const sobject_t& oid, struct stat *st)
 {
   char fn[PATH_MAX];
   get_coname(cid, oid, fn, sizeof(fn));
-  int r = ::stat(fn, st);
+  int r = lfn_stat(fn, st);
   if (r < 0)
     r = -errno;
   dout(10) << "stat " << fn << " = " << r << " (size " << st->st_size << ")" << dendl;
@@ -2160,7 +2493,7 @@ int FileStore::read(coll_t cid, const sobject_t& oid,
 
   dout(15) << "read " << fn << " " << offset << "~" << len << dendl;
 
-  int fd = ::open(fn, O_RDONLY);
+  int fd = lfn_open(fn, O_RDONLY);
   if (fd < 0) {
     int err = errno;
     dout(10) << "FileStore::read(" << fn << "): open error "
@@ -2214,7 +2547,7 @@ int FileStore::fiemap(coll_t cid, const sobject_t& oid,
   dout(15) << "fiemap " << fn << " " << offset << "~" << len << dendl;
 
   int r;
-  int fd = ::open(fn, O_RDONLY);
+  int fd = lfn_open(fn, O_RDONLY);
   if (fd < 0) {
     char buf[80];
     dout(10) << "read couldn't open " << fn << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
@@ -2277,7 +2610,7 @@ int FileStore::_remove(coll_t cid, const sobject_t& oid)
   char fn[PATH_MAX];
   get_coname(cid, oid, fn, sizeof(fn));
   dout(15) << "remove " << fn << dendl;
-  int r = ::unlink(fn);
+  int r = ::lfn_unlink(fn);
   if (r < 0) r = -errno;
   dout(10) << "remove " << fn << " = " << r << dendl;
   return r;
@@ -2288,8 +2621,7 @@ int FileStore::_truncate(coll_t cid, const sobject_t& oid, uint64_t size)
   char fn[PATH_MAX];
   get_coname(cid, oid, fn, sizeof(fn));
   dout(15) << "truncate " << fn << " size " << size << dendl;
-  int r = ::truncate(fn, size);
-  if (r < 0) r = -errno;
+  int r = lfn_truncate(fn, size);
   dout(10) << "truncate " << fn << " size " << size << " = " << r << dendl;
   return r;
 }
@@ -2303,7 +2635,7 @@ int FileStore::_touch(coll_t cid, const sobject_t& oid)
   dout(15) << "touch " << fn << dendl;
 
   int flags = O_WRONLY|O_CREAT;
-  int fd = ::open(fn, flags, 0644);
+  int fd = lfn_open(fn, flags, 0644);
   int r;
   if (fd >= 0) {
     ::close(fd);
@@ -2328,7 +2660,7 @@ int FileStore::_write(coll_t cid, const sobject_t& oid,
 
   char buf[80];
   int flags = O_WRONLY|O_CREAT;
-  int fd = ::open(fn, flags, 0644);
+  int fd = lfn_open(fn, flags, 0644);
   if (fd < 0) {
     dout(0) << "write couldn't open " << fn << " flags " << flags << " errno " << errno << " " << strerror_r(errno, buf, sizeof(buf)) << dendl;
     r = -errno;
@@ -2388,12 +2720,12 @@ int FileStore::_clone(coll_t cid, const sobject_t& oldoid, const sobject_t& newo
   dout(15) << "clone " << ofn << " -> " << nfn << dendl;
 
   int o, n, r;
-  o = ::open(ofn, O_RDONLY);
+  o = lfn_open(ofn, O_RDONLY);
   if (o < 0) {
     r = -errno;
     goto out2;
   }
-  n = ::open(nfn, O_CREAT|O_TRUNC|O_WRONLY, 0644);
+  n = lfn_open(nfn, O_CREAT|O_TRUNC|O_WRONLY, 0644);
   if (n < 0) {
     r = -errno;
     goto out;
@@ -2489,12 +2821,12 @@ int FileStore::_clone_range(coll_t cid, const sobject_t& oldoid, const sobject_t
 
   int r;
   int o, n;
-  o = ::open(ofn, O_RDONLY);
+  o = lfn_open(ofn, O_RDONLY);
   if (o < 0) {
     r = -errno;
     goto out2;
   }
-  n = ::open(nfn, O_CREAT|O_WRONLY, 0644);
+  n = lfn_open(nfn, O_CREAT|O_WRONLY, 0644);
   if (n < 0) {
     r = -errno;
     goto out;
@@ -3184,6 +3516,7 @@ int FileStore::list_collections(vector<coll_t>& ls)
     return -errno;
 
   struct dirent sde, *de;
+  char new_name[PATH_MAX];
   while (::readdir_r(dir, &sde, &de) == 0) {
     if (!de)
       break;
@@ -3194,7 +3527,8 @@ int FileStore::list_collections(vector<coll_t>& ls)
         (de->d_name[1] == '.' &&
          de->d_name[2] == '\0')))
       continue;
-    ls.push_back(coll_t(de->d_name));
+    lfn_translate(fn, de->d_name, new_name, sizeof(new_name));
+    ls.push_back(coll_t(new_name));
   }
   
   ::closedir(dir);
@@ -3208,7 +3542,7 @@ int FileStore::collection_stat(coll_t c, struct stat *st)
   char fn[PATH_MAX];
   get_cdir(c, fn, sizeof(fn));
   dout(15) << "collection_stat " << fn << dendl;
-  int r = ::stat(fn, st);
+  int r = lfn_stat(fn, st);
   if (r < 0) r = -errno;
   dout(10) << "collection_stat " << fn << " = " << r << dendl;
   return r;
@@ -3259,19 +3593,19 @@ int FileStore::collection_list_partial(coll_t c, snapid_t seq, vector<sobject_t>
 {  
   if (fake_collections) return collections.collection_list(c, ls);
 
-  char buf[PATH_MAX];
-  get_cdir(c, buf, sizeof(buf));
+  char dir_name[PATH_MAX], buf[PATH_MAX];
+  get_cdir(c, dir_name, sizeof(dir_name));
 
   DIR *dir = NULL;
 
   struct dirent *de;
   bool end;
   
-  dir = ::opendir(buf);
+  dir = ::opendir(dir_name);
 
   if (!dir) {
     int err = -errno;
-    dout(0) << "error opening directory " << buf << dendl;
+    dout(0) << "error opening directory " << dir_name << dendl;
     return err;
   }
 
@@ -3280,6 +3614,8 @@ int FileStore::collection_list_partial(coll_t c, snapid_t seq, vector<sobject_t>
     *handle = 0;
   }
 
+  char new_name[PATH_MAX];
+
   int i=0;
   while (i < max_count) {
     errno = 0;
@@ -3301,8 +3637,9 @@ int FileStore::collection_list_partial(coll_t c, snapid_t seq, vector<sobject_t>
       continue;
     }
     //cout << "  got object " << de->d_name << std::endl;
+    lfn_translate(dir_name, de->d_name, new_name, sizeof(new_name));
     sobject_t o;
-    if (parse_object(de->d_name, o)) {
+    if (parse_object(new_name, o)) {
       if (o.snap >= seq) {
        ls.push_back(o);
        i++;
@@ -3324,11 +3661,11 @@ int FileStore::collection_list(coll_t c, vector<sobject_t>& ls)
 {  
   if (fake_collections) return collections.collection_list(c, ls);
 
-  char buf[PATH_MAX];
-  get_cdir(c, buf, sizeof(buf));
-  dout(10) << "collection_list " << buf << dendl;
+  char dir_name[PATH_MAX], buf[PATH_MAX], new_name[PATH_MAX];
+  get_cdir(c, dir_name, sizeof(dir_name));
+  dout(10) << "collection_list " << dir_name << dendl;
 
-  DIR *dir = ::opendir(buf);
+  DIR *dir = ::opendir(dir_name);
   if (!dir)
     return -errno;
   
@@ -3344,7 +3681,8 @@ int FileStore::collection_list(coll_t c, vector<sobject_t>& ls)
       continue;
     //cout << "  got object " << de->d_name << std::endl;
     sobject_t o;
-    if (parse_object(de->d_name, o)) {
+    lfn_translate(dir_name, de->d_name, new_name, sizeof(new_name));
+    if (parse_object(new_name, o)) {
       inolist.push_back(pair<ino_t,sobject_t>(de->d_ino, o));
       ls.push_back(o);
     }
@@ -3402,7 +3740,7 @@ int FileStore::_collection_add(coll_t c, coll_t cid, const sobject_t& o)
   char of[PATH_MAX];
   get_coname(cid, o, of, sizeof(of));
   dout(15) << "collection_add " << cof << " " << of << dendl;
-  int r = ::link(of, cof);
+  int r = lfn_link(of, cof);
   if (r < 0) r = -errno;
   dout(10) << "collection_add " << cof << " " << of << " = " << r << dendl;
   return r;
@@ -3415,7 +3753,7 @@ int FileStore::_collection_remove(coll_t c, const sobject_t& o)
   char cof[PATH_MAX];
   get_coname(c, o, cof, sizeof(cof));
   dout(15) << "collection_remove " << cof << dendl;
-  int r = ::unlink(cof);
+  int r = lfn_unlink(cof);
   if (r < 0) r = -errno;
   dout(10) << "collection_remove " << cof << " = " << r << dendl;
   return r;