]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
FileStore: CollectionIndex, HashIndex, IndexManager
authorSamuel Just <samuel.just@dreamhost.com>
Tue, 9 Aug 2011 22:45:12 +0000 (15:45 -0700)
committerSamuel Just <samuel.just@dreamhost.com>
Tue, 30 Aug 2011 00:43:05 +0000 (17:43 -0700)
Adds ColletionIndex, an interface for collection indexing
systems, and HashIndex, a mechanism for organising a prehashed
collection.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/common/config.cc
src/common/config.h
src/os/CollectionIndex.h [new file with mode: 0644]
src/os/HashIndex.cc [new file with mode: 0644]
src/os/HashIndex.h [new file with mode: 0644]
src/os/IndexManager.cc [new file with mode: 0644]
src/os/IndexManager.h [new file with mode: 0644]
src/os/LFNIndex.cc [new file with mode: 0644]
src/os/LFNIndex.h [new file with mode: 0644]
src/osd/osd_types.h

index 161058984ca7da14418893b8b3fcfad7106087a0..0c4e3b7b6859d35723eff8d9283ddf0b11d24438 100644 (file)
@@ -407,6 +407,8 @@ struct config_option config_optionsp[] = {
   OPTION(filestore_op_thread_suicide_timeout, OPT_INT, 180),
   OPTION(filestore_commit_timeout, OPT_FLOAT, 600),
   OPTION(filestore_fiemap_threshold, OPT_INT, 4096),
+  OPTION(filestore_merge_threshold, OPT_INT, 10),
+  OPTION(filestore_split_multiple, OPT_INT, 2),
   OPTION(journal_dio, OPT_BOOL, true),
   OPTION(journal_block_align, OPT_BOOL, true),
   OPTION(journal_max_write_bytes, OPT_INT, 10 << 20),
index b5d7fb347226f2772c021dbe65c6f759c9a52056..cc9044cc2cb7c588cca5e50095e435c6760faa00 100644 (file)
@@ -535,6 +535,8 @@ public:
   int filestore_op_thread_suicide_timeout;
   float filestore_commit_timeout;
   int filestore_fiemap_threshold;
+  int filestore_merge_threshold;
+  int filestore_split_multiple;
 
   // journal
   bool journal_dio;
diff --git a/src/os/CollectionIndex.h b/src/os/CollectionIndex.h
new file mode 100644 (file)
index 0000000..8697779
--- /dev/null
@@ -0,0 +1,137 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef OS_COLLECTIONINDEX_H
+#define OS_COLLECTIONINDEX_H
+
+#include <string>
+#include <vector>
+#include <tr1/memory>
+
+#include "osd/osd_types.h"
+#include "include/object.h"
+#include "ObjectStore.h"
+
+/**
+ * CollectionIndex provides an interface for manipulating indexed colelctions
+ */
+class CollectionIndex {
+protected:
+  /** 
+   * Object encapsulating a returned path.
+   *
+   * A path to an object (existent or non-existent) becomes invalid
+   * when a different object is created in the index.  Path stores
+   * a shared_ptr to the CollectionIndex to keep the index alive
+   * during its lifetime.
+   * @see IndexManager
+   * @see self_ref
+   * @see set_ref
+   */
+  class Path {
+  public:
+    /// Returned path
+    string full_path;
+    /// Ref to parent Index
+    std::tr1::shared_ptr<CollectionIndex> parent_ref;
+    /// Constructor
+    Path(
+      string path,                              ///< [in] Path to return.
+      std::tr1::weak_ptr<CollectionIndex> ref)  ///< [in] weak_ptr to parent.
+      : full_path(path), parent_ref(ref) {}
+      
+    /// Getter for the stored path.
+    const char *path() const { return full_path.c_str(); }
+  };
+ public:
+  /// Type of returned paths
+  typedef std::tr1::shared_ptr<Path> IndexedPath;
+
+  /** 
+   * For setting the internal weak_ptr to a shared_ptr to this.
+   *
+   * @see IndexManager
+   */
+  virtual void set_ref(std::tr1::shared_ptr<CollectionIndex> ref) = 0;
+  /** 
+   * Initializes the index.
+   *
+   * @return Error Code, 0 for success
+   */
+  virtual int init() = 0;
+
+  /**
+   * Cleanup before replaying journal
+   *
+   * @return Error Code, 0 for success
+   */
+  virtual int cleanup() = 0;
+
+  /**
+   * Call when a file is created using a path returned from lookup.
+   *
+   * @return Error Code, 0 for success
+   */
+  virtual int created(
+    const hobject_t &hoid, ///< [in] Created object.
+    const char *path       ///< [in] Path to created object.
+    ) = 0;
+
+  /**
+   * Removes hoid from the collection
+   *
+   * @return Error Code, 0 for success
+   */
+  virtual int unlink(
+    const hobject_t &hoid ///< [in] Object to remove
+    ) = 0;
+
+  /**
+   * Gets the IndexedPath for hoid.
+   *
+   * @return Error Code, 0 for success
+   */
+  virtual int lookup(
+    const hobject_t &hoid, ///< [in] Object to lookup
+    IndexedPath *path,    ///< [out] Path to object
+    int *exist            ///< [out] True if the object exists, else false
+    ) = 0;
+
+  /**
+   * List contents of the collection
+   *
+   * @param [in] seq Snapid to list.
+   * @param [in] max_count Max number to list (0 for no limit).
+   * @param [out] ls Container for listed objects.
+   * @param [in,out] last List handle.  0 for beginning.  Passing the same
+   * cookie location will cause the next max_count to be listed.
+   * @return Error code.  0 on success.
+   */
+  virtual int collection_list_partial(
+    snapid_t seq,
+    int max_count,
+    vector<hobject_t> *ls, 
+    collection_list_handle_t *last
+    ) = 0;
+
+  /// List contents of collection.
+  virtual int collection_list(
+    vector<hobject_t> *ls ///< [out] Listed Objects
+    ) = 0;
+
+  /// Virtual destructor
+  virtual ~CollectionIndex() {}
+};
+
+#endif
diff --git a/src/os/HashIndex.cc b/src/os/HashIndex.cc
new file mode 100644 (file)
index 0000000..6496153
--- /dev/null
@@ -0,0 +1,473 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include "include/buffer.h"
+#include "osd/osd_types.h"
+
+#include "HashIndex.h"
+
+const string HashIndex::SUBDIR_ATTR = "contents";
+const string HashIndex::IN_PROGRESS_OP_TAG = "in_progress_op";
+
+int HashIndex::cleanup() {
+  bufferlist bl;
+  int r = get_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl);
+  if (r < 0) {
+    // No in progress operations!
+    return 0;
+  }
+  bufferlist::iterator i = bl.begin();
+  InProgressOp in_progress(i);
+  subdir_info_s info;
+  r = get_info(in_progress.path, &info);
+  if (r < 0)
+    return r;
+  if (in_progress.is_split())
+    return complete_split(in_progress.path, info);
+  else if (in_progress.is_merge())
+    return complete_merge(in_progress.path, info);
+  else
+    return -EINVAL;
+}
+
+int HashIndex::_init() {
+  subdir_info_s info;
+  vector<string> path;
+  return set_info(path, info);
+}
+
+/* LFNIndex virtual method implementations */
+int HashIndex::_created(const vector<string> &path,
+                       const hobject_t &hoid,
+                       const string &mangled_name) {
+  subdir_info_s info;
+  int r;
+  r = get_info(path, &info);
+  if (r < 0)
+    return r;
+  info.objs++;
+  r = set_info(path, info);
+  if (r < 0)
+    return r;
+
+  if (must_split(info)) {
+    int r = initiate_split(path, info);
+    if (r < 0)
+      return r;
+    return complete_split(path, info);
+  } else {
+    return 0;
+  }
+}
+
+int HashIndex::_remove(const vector<string> &path,
+                      const hobject_t &hoid,
+                      const string &mangled_name) {
+  int r;
+  r = remove_object(path, hoid);
+  if (r < 0)
+    return r;
+  subdir_info_s info;
+  r = get_info(path, &info);
+  if (r < 0)
+    return r;
+  info.objs--;
+  r = set_info(path, info);
+  if (r < 0)
+    return r;
+  if (must_merge(info)) {
+    r = initiate_merge(path, info);
+    if (r < 0)
+      return r;
+    return complete_merge(path, info);
+  } else {
+    return 0;
+  }
+}
+
+int HashIndex::_lookup(const hobject_t &hoid,
+                      vector<string> *path,
+                      string *mangled_name,
+                      int *exists_out) {
+  vector<string> path_comp;
+  get_path_components(hoid, &path_comp);
+  vector<string>::iterator next = path_comp.begin();
+  int r, exists;
+  while (1) {
+    r = path_exists(*path, &exists);
+    if (r < 0)
+      return r;
+    if (!exists) {
+      if (path->empty())
+       return -ENOENT;
+      path->pop_back();
+      break;
+    }
+    if (next == path_comp.end())
+      break;
+    path->push_back(*(next++));
+  }
+  return get_mangled_name(*path, hoid, mangled_name, exists_out);
+}
+
+static void split_handle(collection_list_handle_t handle, 
+                        uint32_t *hash, uint32_t *index) {
+  *hash = handle & 
+    ~((~static_cast<collection_list_handle_t>(0)) << (sizeof(handle) * 4));
+  *index = handle >> (sizeof(handle) * 4);
+}
+
+static collection_list_handle_t form_handle(uint32_t hash, uint32_t index) {
+  return ((static_cast<uint64_t>(index)) << sizeof(hash) * 8) + hash;
+}
+
+int HashIndex::_collection_list_partial(snapid_t seq, int max_count,
+                                       vector<hobject_t> *ls, 
+                                       collection_list_handle_t *last) {
+  vector<string> path;
+  uint32_t index, hash;
+  string lower_bound;
+  if (last) {
+    split_handle(*last, &hash, &index);
+    lower_bound = get_hash_str(hash);
+  }
+  int r = list(path, 
+              max_count ? &max_count : NULL, 
+              &seq,
+              last ? &lower_bound : NULL,
+              last ? &index : NULL, 
+              ls);
+  if (r < 0)
+    return r;
+  if (last && ls->size())
+    *last = form_handle(ls->rbegin()->hash, index);
+  return 0;
+}
+
+int HashIndex::_collection_list(vector<hobject_t> *ls) {
+  vector<string> path;
+  return list(path, NULL, NULL, NULL, NULL, ls);
+}
+
+int HashIndex::start_split(const vector<string> &path) {
+  bufferlist bl;
+  InProgressOp op_tag(InProgressOp::SPLIT, path);
+  op_tag.encode(bl);
+  return add_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl); 
+}
+
+int HashIndex::start_merge(const vector<string> &path) {
+  bufferlist bl;
+  InProgressOp op_tag(InProgressOp::MERGE, path);
+  op_tag.encode(bl);
+  return add_attr_path(vector<string>(), IN_PROGRESS_OP_TAG, bl); 
+}
+
+int HashIndex::end_split_or_merge(const vector<string> &path) {
+  return remove_attr_path(vector<string>(), IN_PROGRESS_OP_TAG);
+}
+
+int HashIndex::get_info(const vector<string> &path, subdir_info_s *info) {
+  bufferlist buf;
+  int r = get_attr_path(path, SUBDIR_ATTR, buf);
+  if (r < 0)
+    return r;
+  bufferlist::iterator bufiter = buf.begin();
+  info->decode(bufiter);
+  assert(path.size() == (unsigned)info->hash_level);
+  return 0;
+}
+
+int HashIndex::set_info(const vector<string> &path, const subdir_info_s &info) {
+  bufferlist buf;
+  assert(path.size() == (unsigned)info.hash_level);
+  info.encode(buf);
+  return add_attr_path(path, SUBDIR_ATTR, buf);
+}
+
+bool HashIndex::must_merge(const subdir_info_s &info) {
+  return (info.hash_level > 0 &&
+         info.objs < (unsigned)merge_threshold &&
+         info.subdirs == 0);
+}
+
+bool HashIndex::must_split(const subdir_info_s &info) {
+  return (info.hash_level < (unsigned)MAX_HASH_LEVEL &&
+         info.objs > ((unsigned)merge_threshold * 32));
+                           
+}
+
+int HashIndex::initiate_merge(const vector<string> &path, subdir_info_s info) {
+  return start_merge(path);
+}
+
+int HashIndex::complete_merge(const vector<string> &path, subdir_info_s info) {
+  vector<string> dst = path;
+  dst.pop_back();
+  subdir_info_s dstinfo;
+  int r, exists;
+  r = path_exists(path, &exists);
+  if (r < 0)
+    return r;
+  r = get_info(dst, &dstinfo);
+  if (r < 0)
+    return r;
+  if (exists) {
+    r = move_objects(path, dst);
+    if (r < 0)
+      return r;
+    
+    map<string,hobject_t> objects_dst;
+    r = list_objects(dst, 0, 0, &objects_dst);
+    if (r < 0)
+      return r;
+    set<string> subdirs;
+    r = list_subdirs(dst, &subdirs);
+    if (r < 0)
+      return r;
+    dstinfo.objs = objects_dst.size();
+    dstinfo.subdirs = subdirs.size() - 1;
+    r = set_info(dst, dstinfo);
+    if (r < 0)
+      return r;
+    r = remove_path(path);
+    if (r < 0)
+      return r;
+  }
+  if (must_merge(dstinfo)) {
+    r = initiate_merge(dst, dstinfo);
+    if (r < 0)
+      return r;
+    r = fsync_dir(dst);
+    if (r < 0)
+      return r;
+    return complete_merge(dst, dstinfo);
+  }
+  r = fsync_dir(dst);
+  if (r < 0)
+    return r;
+  return end_split_or_merge(path);
+}
+
+int HashIndex::initiate_split(const vector<string> &path, subdir_info_s info) {
+  return start_split(path);
+}
+
+int HashIndex::complete_split(const vector<string> &path, subdir_info_s info) {
+  int level = info.hash_level;
+  map<string, hobject_t> objects;
+  vector<string> dst = path;
+  int r;
+  dst.push_back("");
+  r = list_objects(path, 0, 0, &objects);
+  if (r < 0)
+    return r;
+  set<string> subdirs;
+  r = list_subdirs(path, &subdirs);
+  if (r < 0)
+    return r;
+  map<string, map<string, hobject_t> > mapped;
+  map<string, hobject_t> moved;
+  int num_moved = 0;
+  for (map<string, hobject_t>::iterator i = objects.begin();
+       i != objects.end();
+       ++i) {
+    vector<string> new_path;
+    get_path_components(i->second, &new_path);
+    mapped[new_path[level]][i->first] = i->second;
+  }
+  for (map<string, map<string, hobject_t> >::iterator i = mapped.begin();
+       i != mapped.end();
+       ) {
+    dst[level] = i->first;
+    /* If the info already exists, it must be correct,
+     * we may be picking up a partially finished split */
+    subdir_info_s temp;
+    // subdir has already been fully copied
+    if (subdirs.count(i->first) && !get_info(dst, &temp)) {
+      for (map<string, hobject_t>::iterator j = i->second.begin();
+          j != i->second.end();
+          ++j) {
+       moved[j->first] = j->second;
+       num_moved++;
+       objects.erase(j->first);
+      }
+      ++i;
+      continue;
+    }
+
+    subdir_info_s info_new;
+    info_new.objs = i->second.size();
+    info_new.subdirs = 0;
+    info_new.hash_level = level + 1;
+    if (must_merge(info_new) && !subdirs.count(i->first)) {
+      mapped.erase(i++);
+      continue;
+    }
+
+    // Subdir doesn't yet exist
+    if (!subdirs.count(i->first)) {
+      info.subdirs += 1;
+      r = create_path(dst);
+      if (r < 0)
+       return r;
+    } // else subdir has been created but only partially copied
+
+    for (map<string, hobject_t>::iterator j = i->second.begin();
+        j != i->second.end();
+        ++j) {
+      moved[j->first] = j->second;
+      num_moved++;
+      objects.erase(j->first);
+      r = link_object(path, dst, j->second, j->first);
+      // May be a partially finished split
+      if (r < 0 && r != -EEXIST) {
+       return r;
+      }
+    }
+
+    r = fsync_dir(dst);
+    if (r < 0)
+      return r;
+
+    // Presence of info must imply that all objects have been copied
+    r = set_info(dst, info_new);
+    if (r < 0)
+      return r;
+
+    r = fsync_dir(dst);
+    if (r < 0)
+      return r;
+
+    ++i;
+  }
+  r = remove_objects(path, moved, &objects);
+  if (r < 0)
+    return r;
+  info.objs = objects.size();
+  r = set_info(path, info);
+  if (r < 0)
+    return r;
+  r = fsync_dir(path);
+  if (r < 0)
+    return r;
+  return end_split_or_merge(path);
+}
+
+void HashIndex::get_path_components(const hobject_t &hoid,
+                                   vector<string> *path) {
+  char buf[MAX_HASH_LEVEL + 1];
+  snprintf(buf, sizeof(buf), "%.*X", MAX_HASH_LEVEL, hoid.hash);
+
+  // Path components are the hex characters of hoid.hash in, least
+  // significant first
+  for (int i = 0; i < MAX_HASH_LEVEL; ++i) {
+    path->push_back(string(&buf[MAX_HASH_LEVEL - 1 - i], 1));
+  }
+}
+
+string HashIndex::get_hash_str(uint32_t hash) {
+  char buf[MAX_HASH_LEVEL + 1];
+  snprintf(buf, sizeof(buf), "%.*X", MAX_HASH_LEVEL, hash);
+  string retval;
+  for (int i = 0; i < MAX_HASH_LEVEL; ++i) {
+    retval.push_back(buf[MAX_HASH_LEVEL - 1 - i]);
+  }
+  return retval;
+}
+
+string HashIndex::get_path_str(const hobject_t &hoid) {
+  return get_hash_str(hoid.hash);
+}
+
+int HashIndex::list(const vector<string> &path,
+                   const int *max_count,
+                   const snapid_t *seq,
+                   const string *lower_bound,
+                   uint32_t *index,
+                   vector<hobject_t> *out) {
+  if (lower_bound)
+    assert(index);
+  vector<string> next_path = path;
+  next_path.push_back("");
+  set<string> hash_prefixes;
+  multimap<string, hobject_t> objects;
+  map<string, hobject_t> rev_objects;
+  int r;
+  int max = max_count ? *max_count : 0;
+  string cur_prefix;
+  for (vector<string>::const_iterator i = path.begin();
+       i != path.end();
+       ++i) {
+    cur_prefix.append(*i);
+  }
+
+  r = list_objects(path, 0, 0, &rev_objects);
+  if (r < 0)
+    return r;
+  for (map<string, hobject_t>::iterator i = rev_objects.begin();
+       i != rev_objects.end();
+       ++i) {
+    string hash_prefix = get_path_str(i->second);
+    if (lower_bound && hash_prefix < *lower_bound)
+      continue;
+    if (seq && i->second.snap < *seq)
+      continue;
+    hash_prefixes.insert(hash_prefix);
+    objects.insert(pair<string, hobject_t>(hash_prefix, i->second));
+  }
+  set<string> subdirs;
+  r = list_subdirs(path, &subdirs);
+  if (r < 0)
+    return r;
+  for (set<string>::iterator i = subdirs.begin();
+       i != subdirs.end();
+       ++i) {
+    string candidate = cur_prefix + *i;
+    if (lower_bound && candidate < lower_bound->substr(0, candidate.size()))
+      continue;
+    hash_prefixes.insert(cur_prefix + *i);
+  }
+
+  uint32_t counter = 0;
+  for (set<string>::iterator i = hash_prefixes.begin();
+       i != hash_prefixes.end() && (!max_count || max > 0);
+       ++i) {
+    multimap<string, hobject_t>::iterator j = objects.find(*i);
+    if (j != objects.end()) {
+      counter = 0;
+      for (; (!max_count || max > 0) && j != objects.end() && j->first == *i; ++j, ++counter) {
+       if (lower_bound && *lower_bound == *i && counter < *index)
+         continue;
+       out->push_back(j->second);
+       if (max_count)
+         max--;
+      }
+      if (index)
+       *index = counter;
+      continue;
+    } 
+
+    // subdir
+    *(next_path.rbegin()) = *(i->rbegin());
+    int old_size = out->size();
+    assert(next_path.size() > path.size());
+    r = list(next_path, max_count ? &max : NULL, seq, lower_bound, index, out);
+    if (r < 0)
+      return r;
+    if (max_count)
+      max -= out->size() - old_size;
+  }
+  return 0;
+}
diff --git a/src/os/HashIndex.h b/src/os/HashIndex.h
new file mode 100644 (file)
index 0000000..fb8fde5
--- /dev/null
@@ -0,0 +1,271 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#ifndef CEPH_HASHINDEX_H
+#define CEPH_HASHINDEX_H
+
+#include "include/buffer.h"
+#include "include/encoding.h"
+#include "LFNIndex.h"
+
+
+/**
+ * Implements collection prehashing.
+ *     (root) - 0 - 0
+ *                - 1
+ *                - E
+ *            - 1
+ *            - 2 - D - 0
+ *            .
+ *            .
+ *            .
+ *            - F - 0
+ * A file is located at the longest existing directory from the root 
+ * given by the hex characters in the hash beginning with the least
+ * significant.
+ * 
+ * ex: hobject_t("object", CEPH_NO_SNAP, 0xA4CEE0D2)
+ * would be located in (root)/2/D/0/
+ * 
+ * Subdirectories are created when the number of objects in a directory
+ * exceed 32*merge_threshhold.  The number of objects in a directory 
+ * is encoded as subdir_info_s in an xattr on the directory.
+ */
+class HashIndex : public LFNIndex {
+private:
+  /// Attribute name for storing subdir info @see subdir_info_s
+  static const string SUBDIR_ATTR;
+  /// Attribute name for storing in progress op tag
+  static const string IN_PROGRESS_OP_TAG;
+  /// Size (bits) in object hash
+  static const int PATH_HASH_LEN = 32;
+  /// Max length of hashed path
+  static const int MAX_HASH_LEVEL = (PATH_HASH_LEN/4);
+
+  /**
+   * Merges occur when the number of object drops below
+   * merge_threshold and splits occur when the number of objects
+   * exceeds 16 * merge_threshold * split_threshold.
+   */
+  int merge_threshold;
+  int split_threshold;
+
+  /// Encodes current subdir state for determining when to split/merge.
+  struct subdir_info_s {
+    uint64_t objs;       ///< Objects in subdir.
+    uint32_t subdirs;    ///< Subdirs in subdir.
+    uint32_t hash_level; ///< Hashlevel of subdir.
+
+    subdir_info_s() : objs(0), subdirs(0), hash_level(0) {}
+    
+    void encode(bufferlist &bl) const
+    {
+      __u8 v = 1;
+      ::encode(v, bl);
+      ::encode(objs, bl);
+      ::encode(subdirs, bl);
+      ::encode(hash_level, bl);
+    }
+    
+    void decode(bufferlist::iterator &bl)
+    {
+      __u8 v;
+      ::decode(v, bl);
+      assert(v == 1);
+      ::decode(objs, bl);
+      ::decode(subdirs, bl);
+      ::decode(hash_level, bl);
+    }
+  };
+
+  /// Encodes in progress split or merge
+  struct InProgressOp {
+    static const int SPLIT = 0;
+    static const int MERGE = 1;
+    int op;
+    vector<string> path;
+
+    InProgressOp(int op, const vector<string> &path) 
+      : op(op), path(path) {}
+
+    InProgressOp(bufferlist::iterator &bl) {
+      decode(bl);
+    }
+
+    bool is_split() const { return op == SPLIT; }
+    bool is_merge() const { return op == MERGE; }
+
+    void encode(bufferlist &bl) const {
+      __u8 v = 1;
+      ::encode(v, bl);
+      ::encode(op, bl);
+      ::encode(path, bl);
+    }
+
+    void decode(bufferlist::iterator &bl) {
+      __u8 v;
+      ::decode(v, bl);
+      assert(v == 1);
+      ::decode(op, bl);
+      ::decode(path, bl);
+    }
+  };
+    
+    
+public:
+  /// Constructor.
+  HashIndex(
+    const char *base_path, ///< [in] Path to the index root.
+    int merge_at,          ///< [in] Merge threshhold.
+    int split_at)         ///< [in] Split threshhold.
+    : LFNIndex(base_path), merge_threshold(merge_at),
+      split_threshold(split_at) {}
+
+  /// @see CollectionIndex
+  uint32_t collection_version() { return 1; }
+
+  /// @see CollectionIndex
+  int cleanup();
+       
+protected:
+  int _init();
+
+  int _created(
+    const vector<string> &path,
+    const hobject_t &hoid,
+    const string &mangled_name
+    );
+  int _remove(
+    const vector<string> &path,
+    const hobject_t &hoid,
+    const string &mangled_name
+    );
+  int _lookup(
+    const hobject_t &hoid,
+    vector<string> *path,
+    string *mangled_name,
+    int *exists
+    );
+  int _collection_list_partial(
+    snapid_t seq,
+    int max_count,
+    vector<hobject_t> *ls, 
+    collection_list_handle_t *last
+    );
+  int _collection_list(
+    vector<hobject_t> *ls
+    );
+private:
+  /// Tag root directory at beginning of split
+  int start_split(
+    const vector<string> &path ///< [in] path to split
+    ); ///< @return Error Code, 0 on success
+  /// Tag root directory at beginning of split
+  int start_merge(
+    const vector<string> &path ///< [in] path to merge
+    ); ///< @return Error Code, 0 on success
+  /// Remove tag at end of split or merge
+  int end_split_or_merge(
+    const vector<string> &path ///< [in] path to split or merged
+    ); ///< @return Error Code, 0 on success
+  /// Gets info from the xattr on the subdir represented by path
+  int get_info(
+    const vector<string> &path, ///< [in] Path from which to read attribute.
+    subdir_info_s *info                ///< [out] Attribute value
+    ); /// @return Error Code, 0 on success
+
+  /// Sets info to the xattr on the subdir represented by path
+  int set_info(
+    const vector<string> &path, ///< [in] Path on which to set attribute.
+    const subdir_info_s &info          ///< [in] Value to set
+    ); /// @return Error Code, 0 on success
+
+  /// Encapsulates logic for when to split.
+  bool must_merge(
+    const subdir_info_s &info ///< [in] Info to check
+    ); /// @return True if info must be merged, False otherwise
+
+  /// Encapsulates logic for when to merge.
+  bool must_split(
+    const subdir_info_s &info ///< [in] Info to check
+    ); /// @return True if info must be split, False otherwise
+
+  /// Initiates merge
+  int initiate_merge(
+    const vector<string> &path, ///< [in] Subdir to merge
+    subdir_info_s info         ///< [in] Info attached to path
+    ); /// @return Error Code, 0 on success
+
+  /// Completes merge
+  int complete_merge(
+    const vector<string> &path, ///< [in] Subdir to merge
+    subdir_info_s info         ///< [in] Info attached to path
+    ); /// @return Error Code, 0 on success
+
+  /// Initiate Split
+  int initiate_split(
+    const vector<string> &path, ///< [in] Subdir to split
+    subdir_info_s info         ///< [in] Info attached to path
+    ); /// @return Error Code, 0 on success
+
+  /// Completes Split
+  int complete_split(
+    const vector<string> &path, ///< [in] Subdir to split
+    subdir_info_s info        ///< [in] Info attached to path
+    ); /// @return Error Code, 0 on success
+
+  /// Determine path components from hoid hash
+  void get_path_components(
+    const hobject_t &hoid, ///< [in] Object for which to get path components
+    vector<string> *path   ///< [out] Path components for hoid.
+    );
+
+  /** 
+   * Get string representation of hobject_t/hash
+   *
+   * e.g: 0x01234567 -> "76543210"
+   */
+  string get_path_str(
+    const hobject_t &hoid ///< [in] Object to get hash string for
+    ); ///< @return Hash string for hoid.
+
+  /// Get string from hash, @see get_path_str
+  string get_hash_str(
+    uint32_t hash ///< [in] Hash to convert to a string.
+    ); ///< @return String representation of hash
+
+  /** 
+   * Recursively lists all objects in path.
+   *
+   * Lists all objects in path or a subdirectory of path which
+   * sort greater than *lower_bound and have snapid_t > *seq in order of 
+   * hash up to a max count of *max_count
+   *
+   * In case of multiple objects with the same hash, *index indicates the
+   * last listed.  (If non-null, the index of the last object listed will 
+   * be assigned to *index).
+   *
+   * max_count, seq, lower_bound optional, lower_bound iff index
+   */
+  int list(
+    const vector<string> &path, ///< [in] Path to list.
+    const int *max_count,      ///< [in] Max number to list (NULL if not needed)
+    const snapid_t *seq,       ///< [in] Snap to list (NULL if not needed)
+    const string *lower_bound, ///< [in] Last hash listed (NULL if not needed)
+    uint32_t *index,           ///< [in,out] last index (NULL iff !lower_bound)
+    vector<hobject_t> *out     ///< [out] Listed objects
+    ); ///< @return Error Code, 0 on success
+};
+
+#endif
diff --git a/src/os/IndexManager.cc b/src/os/IndexManager.cc
new file mode 100644 (file)
index 0000000..251599b
--- /dev/null
@@ -0,0 +1,126 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include <tr1/memory>
+#include <map>
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "include/buffer.h"
+
+#include "IndexManager.h"
+#include "FlatIndex.h"
+#include "HashIndex.h"
+#include "FlatIndex.h"
+#include "CollectionIndex.h"
+
+int do_getxattr(const char *fn, const char *name, void *val, size_t size);
+int do_setxattr(const char *fn, const char *name, const void *val, size_t size);
+
+static int set_version(const char *path, uint32_t version) {
+  bufferlist bl;
+  ::encode(version, bl);
+  return do_setxattr(path, "user.cephos.collection_version", bl.c_str(), 
+                    bl.length());
+}
+
+static int get_version(const char *path, uint32_t *version) {
+  bufferptr bp(PATH_MAX);
+  int r = do_getxattr(path, "user.cephos.collection_version", 
+                     bp.c_str(), bp.length());
+  if (r < 0) {
+    if (r != -ENOENT) {
+      *version = 0;
+      return 0;
+    } else {
+      return r;
+    }
+  }
+  bp.set_length(r);
+  bufferlist bl;
+  bl.push_back(bp);
+  bufferlist::iterator i = bl.begin();
+  ::decode(*version, i);
+  return 0;
+}
+
+void IndexManager::put_index(coll_t c) {
+  Mutex::Locker l(lock);
+  assert(col_indices.count(c));
+  col_indices.erase(c);
+  cond.Signal();
+}
+
+int IndexManager::init_index(coll_t c, const char *path, uint32_t version) {
+  Mutex::Locker l(lock);
+  int r = set_version(path, version);
+  if (r < 0)
+    return r;
+  HashIndex index(path, g_conf->filestore_merge_threshold, 
+                 g_conf->filestore_split_multiple);
+  return index.init();
+}
+
+int IndexManager::build_index(coll_t c, const char *path, Index *index) {
+  int r;
+  if (g_conf->filestore_update_collections) {
+    // Need to check the collection generation
+    uint32_t version;
+    r = get_version(path, &version);
+    if (r < 0)
+      return r;
+
+    switch (version) {
+    case 0: {
+      *index = Index(new FlatIndex(path), 
+                    RemoveOnDelete(c, this));
+      return 0;
+    }
+    case 1: {
+      // Must be a HashIndex
+      *index = Index(new HashIndex(path, g_conf->filestore_merge_threshold,
+                                  g_conf->filestore_split_multiple), 
+                    RemoveOnDelete(c, this));
+      return 0;
+    }
+    default: assert(0);
+    }
+
+  } else {
+    // No need to check
+    *index = Index(new HashIndex(path, g_conf->filestore_merge_threshold,
+                                g_conf->filestore_split_multiple), 
+                  RemoveOnDelete(c, this));
+    return 0;
+  }
+}
+
+int IndexManager::get_index(coll_t c, const char *path, Index *index) {
+  Mutex::Locker l(lock);
+  while (1) {
+    if (!col_indices.count(c)) {
+      int r = build_index(c, path, index);
+      if (r < 0)
+       return r;
+      (*index)->set_ref(*index);
+      col_indices[c] = (*index);
+      break;
+    } else {
+      cond.Wait(lock);
+    }
+  }
+  return 0;
+}
diff --git a/src/os/IndexManager.h b/src/os/IndexManager.h
new file mode 100644 (file)
index 0000000..9627b35
--- /dev/null
@@ -0,0 +1,108 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+#ifndef OS_INDEXMANAGER_H
+#define OS_INDEXMANAGER_H
+
+#include <tr1/memory>
+#include <map>
+
+#include "common/Mutex.h"
+#include "common/Cond.h"
+#include "common/config.h"
+#include "common/debug.h"
+
+#include "CollectionIndex.h"
+#include "HashIndex.h"
+
+
+/// Public type for Index
+typedef std::tr1::shared_ptr<CollectionIndex> Index;
+/**
+ * Encapsulates mutual exclusion for CollectionIndexes.
+ *
+ * Allowing a modification (removal or addition of an object) to occur
+ * while a read is occuring (lookup of an object's path and use of
+ * that path) may result in the path becoming invalid.  Thus, during
+ * the lifetime of a CollectionIndex object and any paths returned
+ * by it, no other concurrent accesses may be allowed.
+ *
+ * This is enforced using shared_ptr.  A shared_ptr<CollectionIndex>
+ * is returned from get_index.  Any paths generated using that object
+ * carry a reference to the parrent index.  Once all
+ * shared_ptr<CollectionIndex> references have expired, the destructor
+ * removes the weak_ptr from col_indices and wakes waiters.
+ */
+class IndexManager {
+  Mutex lock; ///< Lock for Index Manager
+  Cond cond;  ///< Cond for waiters on col_indices
+
+  /// Currently in use CollectionIndices
+  map<coll_t,std::tr1::weak_ptr<CollectionIndex> > col_indices;
+
+  /// Cleans up state for c @see RemoveOnDelete
+  void put_index(
+    coll_t c ///< Put the index for c
+    );
+
+  /// Callback for shared_ptr release @see get_index
+  class RemoveOnDelete {
+  public:
+    coll_t c;
+    IndexManager *manager;
+    RemoveOnDelete(coll_t c, IndexManager *manager) : 
+      c(c), manager(manager) {}
+
+    void operator()(CollectionIndex *index) {
+      manager->put_index(c);
+    }
+  };
+
+  /**
+   * Index factory
+   *
+   * Encapsulates logic for handling legacy FileStore
+   * layouts
+   *
+   * @param [in] c Collection for which to get index
+   * @param [in] path Path to collection
+   * @param [out] index Index for c
+   * @return error code
+   */
+  int build_index(coll_t c, const char *path, Index *index);
+public:
+  /// Constructor
+  IndexManager() : lock("IndexManager lock") {}
+
+  /**
+   * Reserve and return index for c
+   *
+   * @param [in] c Collection for which to get index
+   * @param [in] path Path to collection
+   * @param [out] index Index for c
+   * @return error code
+   */
+  int get_index(coll_t c, const char *path, Index *index);
+
+  /**
+   * Initialize index for collection c at path
+   *
+   * @param [in] c Collection for which to init Index
+   * @param [in] path Path to collection
+   * @param [in] filestore_version version of containing FileStore
+   * @return error code
+   */
+  int init_index(coll_t c, const char *path, uint32_t filestore_version);
+};
+
+#endif
diff --git a/src/os/LFNIndex.cc b/src/os/LFNIndex.cc
new file mode 100644 (file)
index 0000000..d690303
--- /dev/null
@@ -0,0 +1,804 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+#include <string>
+#include <map>
+#include <set>
+#include <vector>
+#include <errno.h>
+#include <string.h>
+
+#include "osd/osd_types.h"
+#include "include/object.h"
+#include "common/config.h"
+#include "common/debug.h"
+#include "include/buffer.h"
+#include "common/ceph_crypto.h"
+
+#include "LFNIndex.h"
+using ceph::crypto::SHA1;
+
+#define DOUT_SUBSYS filestore
+#undef dout_prefix
+#define dout_prefix *_dout << "LFNIndex(" << get_base_path() << ") "
+
+
+const string LFNIndex::LFN_ATTR = "user.cephos.lfn";
+const string LFNIndex::PHASH_ATTR_PREFIX = "user.cephos.phash.";
+const string LFNIndex::SUBDIR_PREFIX = "DIR_";
+const string LFNIndex::FILENAME_COOKIE = "long";
+const int LFNIndex::FILENAME_PREFIX_LEN =  FILENAME_SHORT_LEN - FILENAME_HASH_LEN - 
+                                                               FILENAME_COOKIE.size() - 
+                                                               FILENAME_EXTRA;
+
+int do_getxattr(const char *fn, const char *name, void *val, size_t size);
+int do_setxattr(const char *fn, const char *name, const void *val, size_t size);
+int do_removexattr(const char *fn, const char *name);
+
+/* Public methods */
+
+void LFNIndex::set_ref(std::tr1::shared_ptr<CollectionIndex> ref) {
+  self_ref = ref;
+}
+
+int LFNIndex::init() {
+  return _init();
+}
+
+int LFNIndex::created(const hobject_t &hoid, const char *path) {
+  vector<string> path_comp;
+  string short_name;
+  int r;
+  r = decompose_full_path(path, &path_comp, 0, &short_name);
+  if (r < 0)
+    return r;
+  r = lfn_created(path_comp, hoid, short_name);
+  if (r < 0)
+    return r;
+  return _created(path_comp, hoid, short_name);
+}
+
+int LFNIndex::unlink(const hobject_t &hoid) {
+  vector<string> path;
+  string short_name;
+  int r;
+  r = _lookup(hoid, &path, &short_name, NULL);
+  if (r < 0)
+    return r;
+  r = _remove(path, hoid, short_name);
+  if (r < 0)
+    return r;
+  return 0;
+}
+
+int LFNIndex::lookup(const hobject_t &hoid,
+                    IndexedPath *out_path,
+                    int *exist) {
+  vector<string> path;
+  string short_name;
+  int r;
+  r = _lookup(hoid, &path, &short_name, exist);
+  if (r < 0)
+    return r;
+  string full_path = get_full_path(path, short_name);
+  struct stat buf;
+  r = ::stat(full_path.c_str(), &buf);
+  if (r < 0) {
+    if (errno == ENOENT) {
+      *exist = 0;
+    } else {
+      return -errno;
+    }
+  } else {
+    *exist = 1;
+  }
+  *out_path = IndexedPath(new Path(full_path, self_ref));
+  return 0;
+}
+
+int LFNIndex::collection_list_partial(snapid_t seq, int max_count,
+                                     vector<hobject_t> *ls, 
+                                     collection_list_handle_t *last) {
+  return _collection_list_partial(seq, max_count, ls, last);
+}
+
+int LFNIndex::collection_list(vector<hobject_t> *ls) {
+  return _collection_list(ls);
+}
+
+/* Derived class utility methods */
+
+int LFNIndex::fsync_dir(const vector<string> &path) {
+  int fd = ::open(get_full_path_subdir(path).c_str(), O_RDONLY);
+  if (fd < 0)
+    return -errno;
+  int r = ::fsync(fd);
+  TEMP_FAILURE_RETRY(::close(fd));
+  if (r < 0)
+    return -errno;
+  else
+    return 0;
+}
+
+int LFNIndex::link_object(const vector<string> &from,
+                         const vector<string> &to,
+                         const hobject_t &hoid,
+                         const string &from_short_name) {
+  int r;
+  string from_path = get_full_path(from, from_short_name);
+  string to_path;
+  r = lfn_get_name(to, hoid, 0, &to_path, 0);
+  if (r < 0)
+    return r;
+  r = ::link(from_path.c_str(), to_path.c_str());
+  if (r < 0)
+    return -errno;
+  else
+    return 0;
+}
+
+int LFNIndex::remove_objects(const vector<string> &dir,
+                            const map<string, hobject_t> &to_remove,
+                            map<string, hobject_t> *remaining) {
+  set<string> clean_chains;
+  for (map<string, hobject_t>::const_iterator to_clean = to_remove.begin();
+       to_clean != to_remove.end();
+       ++to_clean) {
+    if (!lfn_is_hashed_filename(to_clean->first)) {
+      int r = ::unlink(get_full_path(dir, to_clean->first).c_str());
+      if (r < 0)
+       return -errno;
+      continue;
+    }
+    if (clean_chains.count(lfn_get_short_name(to_clean->second, 0)))
+      continue;
+    set<int> holes;
+    map<int, pair<string, hobject_t> > chain;
+    for (int i = 0; ; ++i) {
+      string short_name = lfn_get_short_name(to_clean->second, i);
+      if (remaining->count(short_name)) {
+       chain[i] = *(remaining->find(short_name));
+      } else if (to_remove.count(short_name)) {
+       holes.insert(i);
+      } else {
+       break;
+      }
+    }
+
+    map<int, pair<string, hobject_t > >::reverse_iterator candidate = chain.rbegin();
+    for (set<int>::iterator i = holes.begin();
+        i != holes.end();
+        ++i) {
+      if (candidate == chain.rend() || *i > candidate->first) {
+       string remove_path_name = 
+         get_full_path(dir, lfn_get_short_name(to_clean->second, *i)); 
+       int r = ::unlink(remove_path_name.c_str());
+       if (r < 0)
+         return -errno;
+       continue;
+      }
+      string from = get_full_path(dir, candidate->second.first);
+      string to = get_full_path(dir, lfn_get_short_name(candidate->second.second, *i));
+      int r = ::rename(from.c_str(), to.c_str());
+      if (r < 0)
+       return -errno;
+      remaining->erase(candidate->second.first);
+      remaining->insert(pair<string, hobject_t>(
+                         lfn_get_short_name(candidate->second.second, *i),
+                                            candidate->second.second));
+      candidate++;
+    }
+    if (holes.size() > 0)
+      clean_chains.insert(lfn_get_short_name(to_clean->second, 0));
+  }
+  return 0;
+}
+
+int LFNIndex::move_objects(const vector<string> &from,
+                          const vector<string> &to) {
+  map<string, hobject_t> to_move;
+  int r;
+  r = list_objects(from, 0, NULL, &to_move);
+  if (r < 0)
+    return r;
+  for (map<string,hobject_t>::iterator i = to_move.begin();
+       i != to_move.end();
+       ++i) {
+    string from_path = get_full_path(from, i->first);
+    string to_path, to_name;
+    r = lfn_get_name(to, i->second, &to_name, &to_path, 0);
+    if (r < 0)
+      return r;
+    r = ::link(from_path.c_str(), to_path.c_str());
+    if (r < 0 && errno != EEXIST)
+      return -errno;
+    r = lfn_created(to, i->second, to_name);
+    if (r < 0)
+      return r;
+  }
+  r = fsync_dir(to);
+  if (r < 0)
+    return r;
+  for (map<string,hobject_t>::iterator i = to_move.begin();
+       i != to_move.end();
+       ++i) {
+    r = ::unlink(get_full_path(from, i->first).c_str());
+    if (r < 0)
+      return -errno;
+  }
+  return fsync_dir(from);
+}
+
+int LFNIndex::remove_object(const vector<string> &from,
+                           const hobject_t &hoid) {
+  string short_name;
+  int r, exist;
+  r = get_mangled_name(from, hoid, &short_name, &exist);
+  if (r < 0)
+    return r;
+  return lfn_unlink(from, hoid, short_name);
+}
+
+int LFNIndex::get_mangled_name(const vector<string> &from,
+                              const hobject_t &hoid,
+                              string *mangled_name, int *exists) {
+  return lfn_get_name(from, hoid, mangled_name, 0, exists);
+}
+
+int LFNIndex::list_objects(const vector<string> &to_list, int max_objs,
+                          long *handle, map<string, hobject_t> *out) {
+  string to_list_path = get_full_path_subdir(to_list);
+  DIR *dir = ::opendir(to_list_path.c_str());
+  char buf[PATH_MAX];
+  int r;
+  if (!dir) {
+    return -errno;
+  }
+
+  if (handle && *handle) {
+    seekdir(dir, *handle);
+  }
+
+  struct dirent *de;
+  int listed = 0;
+  bool end = false;
+  while (!::readdir_r(dir, reinterpret_cast<struct dirent*>(buf), &de)) {
+    if (!de) {
+      end = true;
+      break;
+    }
+    if (max_objs > 0 && listed >= max_objs) {
+      break;
+    }
+    if (de->d_name[0] == '.')
+      continue;
+    string short_name(de->d_name);
+    hobject_t obj;
+    if (lfn_is_object(short_name)) {
+      r = lfn_translate(to_list, short_name, &obj);
+      if (r < 0) {
+       r = -errno;
+       goto cleanup;
+      } else if (r > 0) {
+       string long_name = lfn_generate_object_name(obj);
+       if (!lfn_must_hash(long_name)) {
+         assert(long_name == short_name);
+       }
+       out->insert(pair<string, hobject_t>(short_name, obj));
+       ++listed;
+      } else {
+       continue;
+      }
+    }
+  }
+
+  if (handle && !end) {
+    *handle = telldir(dir);
+  }
+
+  r = 0;
+ cleanup:
+  ::closedir(dir);
+  return r;
+}
+
+int LFNIndex::list_subdirs(const vector<string> &to_list,
+                                 set<string> *out) {
+  string to_list_path = get_full_path_subdir(to_list);
+  DIR *dir = ::opendir(to_list_path.c_str());
+  char buf[PATH_MAX];
+  if (!dir)
+    return -errno;
+
+  struct dirent *de;
+  while (!::readdir_r(dir, reinterpret_cast<struct dirent*>(buf), &de)) {
+    if (!de) {
+      break;
+    }
+    string short_name(de->d_name);
+    string demangled_name;
+    hobject_t obj;
+    if (lfn_is_subdir(short_name, &demangled_name)) {
+      out->insert(demangled_name);
+    }
+  }
+
+  ::closedir(dir);
+  return 0;
+}
+
+int LFNIndex::create_path(const vector<string> &to_create) {
+  int r = ::mkdir(get_full_path_subdir(to_create).c_str(), 0777);
+  if (r < 0)
+    return -errno;
+  else
+    return 0;
+}
+
+int LFNIndex::remove_path(const vector<string> &to_remove) {
+  int r = ::rmdir(get_full_path_subdir(to_remove).c_str());
+  if (r < 0)
+    return -errno;
+  else
+    return 0;
+}
+
+int LFNIndex::path_exists(const vector<string> &to_check, int *exists) {
+  string full_path = get_full_path_subdir(to_check);
+  struct stat buf;
+  if (::stat(full_path.c_str(), &buf)) {
+    int r = -errno;
+    if (r == -ENOENT) {
+      *exists = 0;
+      return 0;
+    } else {
+      return r;
+    }
+  } else {
+    *exists = 1;
+    return 0;
+  }
+}
+
+int LFNIndex::add_attr_path(const vector<string> &path,
+                           const string &attr_name, 
+                           bufferlist &attr_value) {
+  string full_path = get_full_path_subdir(path);
+  return do_setxattr(full_path.c_str(), mangle_attr_name(attr_name).c_str(),
+                    reinterpret_cast<void *>(attr_value.c_str()),
+                    attr_value.length());
+}
+
+int LFNIndex::get_attr_path(const vector<string> &path,
+                           const string &attr_name, 
+                           bufferlist &attr_value) {
+  string full_path = get_full_path_subdir(path);
+  size_t size = 1024; // Initial
+  while (1) {
+    bufferptr buf(size);
+    int r = do_getxattr(full_path.c_str(), mangle_attr_name(attr_name).c_str(),
+                        reinterpret_cast<void *>(buf.c_str()),
+                        size);
+    if (r > 0) {
+      buf.set_length(r);
+      attr_value.push_back(buf);
+      break;
+    } else {
+      r = -errno;
+      if (r == -ERANGE) {
+       size *= 2;
+      } else {
+       return r;
+      }
+    }
+  }
+  return 0;
+}
+
+int LFNIndex::remove_attr_path(const vector<string> &path,
+                              const string &attr_name) {
+  string full_path = get_full_path_subdir(path);
+  string mangled_attr_name = mangle_attr_name(attr_name);
+  return do_removexattr(full_path.c_str(), mangled_attr_name.c_str());
+}
+  
+string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) {
+  char s[FILENAME_MAX_LEN];
+  char *end = s + sizeof(s);
+  char *t = s;
+
+  const char *i = hoid.oid.name.c_str();
+  // Escape subdir prefix
+  if (hoid.oid.name.substr(0, 4) == "DIR_") {
+    *t++ = '\\';
+    *t++ = 'd';
+    i += 4;
+  }
+  while (*i && t < end) {
+    if (*i == '\\') {
+      *t++ = '\\';
+      *t++ = '\\';      
+    } else if (*i == '.' && i == hoid.oid.name.c_str()) {  // only escape leading .
+      *t++ = '\\';
+      *t++ = '.';
+    } else if (*i == '/') {
+      *t++ = '\\';
+      *t++ = 's';
+    } else
+      *t++ = *i;
+    i++;
+  }
+
+  if (hoid.snap == CEPH_NOSNAP)
+    t += snprintf(t, end - t, "_head");
+  else if (hoid.snap == CEPH_SNAPDIR)
+    t += snprintf(t, end - t, "_snapdir");
+  else
+    t += snprintf(t, end - t, "_%llx", (long long unsigned)hoid.snap);
+  snprintf(t, end - t, "_%.*X", (int)(sizeof(hoid.hash)*2), hoid.hash);
+
+  return string(s);
+ }
+
+int LFNIndex::lfn_get_name(const vector<string> &path, 
+                          const hobject_t &hoid,
+                          string *mangled_name, string *out_path,
+                          int *exists) {
+  string subdir_path = get_full_path_subdir(path);
+  string full_name = lfn_generate_object_name(hoid);
+  int r;
+
+  if (!lfn_must_hash(full_name)) {
+    if (mangled_name)
+      *mangled_name = full_name;
+    if (out_path)
+      *out_path = get_full_path(path, full_name);
+    if (exists) {
+      struct stat buf;
+      string full_path = get_full_path(path, full_name);
+      r = ::stat(full_path.c_str(), &buf);
+      if (r < 0) {
+       if (errno == ENOENT)
+         *exists = 0;
+       else
+         return -errno;
+      } else {
+       *exists = 1;
+      }
+    }
+    return 0;
+  }
+
+  int i = 0;
+  string candidate;
+  string candidate_path;
+  char buf[FILENAME_MAX_LEN + 1];
+  for ( ; ; ++i) {
+    candidate = lfn_get_short_name(hoid, i);
+    candidate_path = get_full_path(path, candidate);
+    r = do_getxattr(candidate_path.c_str(), LFN_ATTR.c_str(), buf, sizeof(buf));
+    if (r < 0) {
+      if (errno != ENODATA && errno != ENOENT)
+       return -errno;
+      if (errno == ENODATA) {
+       // Left over from incomplete transaction, it'll be replayed
+       r = ::unlink(candidate_path.c_str());
+       if (r < 0)
+         return -errno;
+      }
+      if (mangled_name)
+       *mangled_name = candidate;
+      if (out_path)
+       *out_path = candidate_path;
+      if (exists)
+       *exists = 0;
+      return 0;
+    }
+    assert(r > 0);
+    buf[MIN((int)sizeof(buf) - 1, r)] = '\0';
+    if (!strcmp(buf, full_name.c_str())) {
+      if (mangled_name)
+       *mangled_name = candidate;
+      if (out_path)
+       *out_path = candidate_path;
+      if (exists)
+       *exists = 1;
+      return 0;
+    }
+  }
+  assert(0); // Unreachable
+  return 0;
+}
+
+int LFNIndex::lfn_created(const vector<string> &path,
+                         const hobject_t &hoid,
+                         const string &mangled_name) {
+  if (!lfn_is_hashed_filename(mangled_name))
+    return 0;
+  string full_path = get_full_path(path, mangled_name);
+  string full_name = lfn_generate_object_name(hoid);
+  return do_setxattr(full_path.c_str(), LFN_ATTR.c_str(), 
+                    full_name.c_str(), full_name.size());
+}
+
+int LFNIndex::lfn_unlink(const vector<string> &path,
+                        const hobject_t &hoid,
+                        const string &mangled_name) {
+  if (!lfn_is_hashed_filename(mangled_name)) {
+    string full_path = get_full_path(path, mangled_name);
+    int r = ::unlink(full_path.c_str());
+    if (r < 0)
+      return -errno;
+    return 0;
+  }
+  string subdir_path = get_full_path_subdir(path);
+  
+  
+  int i = 0;
+  for ( ; ; ++i) {
+    string candidate = lfn_get_short_name(hoid, i);
+    if (candidate == mangled_name)
+      break;
+  }
+  int removed_index = i;
+  ++i;
+  for ( ; ; ++i) {
+    struct stat buf;
+    string to_check = lfn_get_short_name(hoid, i);
+    string to_check_path = get_full_path(path, to_check);
+    int r = ::stat(to_check_path.c_str(), &buf);
+    if (r < 0) {
+      if (errno == ENOENT) {
+       break;
+      } else {
+       return -errno;
+      }
+    }
+  }
+  if (i == removed_index + 1) {
+    string full_path = get_full_path(path, mangled_name);
+    int r = ::unlink(full_path.c_str());
+    if (r < 0)
+      return -errno;
+    else
+      return 0;
+  } else {
+    string rename_to = get_full_path(path, mangled_name);
+    string rename_from = get_full_path(path, lfn_get_short_name(hoid, i - 1));
+    int r = ::rename(rename_from.c_str(), rename_to.c_str());
+    if (r < 0)
+      return -errno;
+    else
+      return 0;
+  }
+}
+
+int LFNIndex::lfn_translate(const vector<string> &path,
+                                  const string &short_name,
+                                  hobject_t *out) {
+  if (!lfn_is_hashed_filename(short_name)) {
+    return lfn_parse_object_name(short_name, out);
+  }
+  // Get lfn_attr
+  string full_path = get_full_path(path, short_name);
+  char attr[PATH_MAX];
+  int r = do_getxattr(full_path.c_str(), LFN_ATTR.c_str(), attr, sizeof(attr) - 1);
+  if (r < 0)
+    return -errno;
+  if (r < (int)sizeof(attr))
+    attr[r] = '\0';
+
+  string long_name(attr);
+  return lfn_parse_object_name(long_name, out);
+}
+
+bool LFNIndex::lfn_is_object(const string &short_name) {
+  return lfn_is_hashed_filename(short_name) || !lfn_is_subdir(short_name, 0);
+}
+
+bool LFNIndex::lfn_is_subdir(const string &name, string *demangled) {
+  if (name.substr(0, SUBDIR_PREFIX.size()) == SUBDIR_PREFIX) {
+    if (demangled)
+      *demangled = demangle_path_component(name);
+    return 1;
+  }
+  return 0;
+}
+
+static int parse_object(const char *s, hobject_t& o)
+{
+  const char *hash = s + strlen(s) - 1;
+  while (*hash != '_' &&
+        hash > s)
+    hash--;
+  const char *bar = hash - 1;
+  while (*bar != '_' &&
+        bar > s)
+    bar--;
+  if (*bar == '_') {
+    char buf[bar-s + 1];
+    char *t = buf;
+    const char *i = s;
+    while (i < bar) {
+      if (*i == '\\') {
+       i++;
+       switch (*i) {
+       case '\\': *t++ = '\\'; break;
+       case '.': *t++ = '.'; break;
+       case 's': *t++ = '/'; break;
+       case 'd': {
+         *t++ = 'D';
+         *t++ = 'I';
+         *t++ = 'R';
+         *t++ = '_';
+         break;
+       }
+       default: assert(0);
+       }
+      } else {
+       *t++ = *i;
+      }
+      i++;
+    }
+    *t = 0;
+    o.oid.name = string(buf, t-buf);
+    if (strncmp(bar+1, "head", 4) == 0)
+      o.snap = CEPH_NOSNAP;
+    else if (strncmp(bar+1, "snapdir", 7) == 0)
+      o.snap = CEPH_SNAPDIR;
+    else 
+      o.snap = strtoull(bar+1, NULL, 16);
+    sscanf(hash, "_%X", &o.hash);
+
+    return 1;
+  }
+  return 0;
+}
+
+bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) {
+  bool r = parse_object(long_name.c_str(), *out);
+  if (!r) return r;
+  string temp = lfn_generate_object_name(*out);
+  return r;
+}
+
+bool LFNIndex::lfn_is_hashed_filename(const string &name) {
+  if (name.size() < (unsigned)FILENAME_SHORT_LEN) {
+    return 0;
+  }
+  if (name.substr(name.size() - FILENAME_COOKIE.size(), FILENAME_COOKIE.size())
+      == FILENAME_COOKIE) {
+    return 1;
+  } else {
+    return 0;
+  }
+}
+
+bool LFNIndex::lfn_must_hash(const string &long_name) {
+  return (int)long_name.size() >= FILENAME_SHORT_LEN;
+}
+
+static inline void buf_to_hex(const unsigned char *buf, int len, char *str)
+{
+  int i;
+  str[0] = '\0';
+  for (i = 0; i < len; i++) {
+    sprintf(&str[i*2], "%02x", (int)buf[i]);
+  }
+}
+
+int LFNIndex::hash_filename(const char *filename, char *hash, int buf_len)
+{
+  if (buf_len < FILENAME_HASH_LEN + 1)
+    return -EINVAL;
+
+  char buf[FILENAME_LFN_DIGEST_SIZE];
+  char hex[FILENAME_LFN_DIGEST_SIZE * 2];
+
+  SHA1 h;
+  h.Update((const byte *)filename, strlen(filename));
+  h.Final((byte *)buf);
+
+  buf_to_hex((byte *)buf, (FILENAME_HASH_LEN + 1) / 2, hex);
+  strncpy(hash, hex, FILENAME_HASH_LEN);
+  hash[FILENAME_HASH_LEN] = '\0';
+  return 0;
+}
+
+void LFNIndex::build_filename(const char *old_filename, int i, char *filename, int len)
+{
+  char hash[FILENAME_HASH_LEN + 1];
+
+  assert(len >= FILENAME_SHORT_LEN + 4);
+
+  strncpy(filename, old_filename, FILENAME_PREFIX_LEN);
+  filename[FILENAME_PREFIX_LEN] = '\0';
+  if ((int)strlen(filename) < FILENAME_PREFIX_LEN)
+    return;
+  if (old_filename[FILENAME_PREFIX_LEN] == '\0')
+    return;
+
+  hash_filename(old_filename, hash, sizeof(hash));
+  int ofs = FILENAME_PREFIX_LEN;
+  int suffix_len;
+  while (1) {
+    suffix_len = sprintf(filename + ofs, "_%s_%d_%s", hash, i, FILENAME_COOKIE.c_str());
+    if (ofs + suffix_len <= FILENAME_SHORT_LEN || !ofs)
+      break;
+    ofs--;
+  }
+}
+
+string LFNIndex::lfn_get_short_name(const hobject_t &hoid, int i) {
+  string long_name = lfn_generate_object_name(hoid);
+  assert(lfn_must_hash(long_name));
+  char buf[FILENAME_SHORT_LEN + 4];
+  build_filename(long_name.c_str(), i, buf, sizeof(buf));
+  return string(buf);
+}
+
+const string &LFNIndex::get_base_path() {
+  return base_path;
+}
+
+string LFNIndex::get_full_path_subdir(const vector<string> &rel) {
+  string retval = get_base_path();
+  for (vector<string>::const_iterator i = rel.begin();
+       i != rel.end();
+       ++i) {
+    retval += "/";
+    retval += mangle_path_component(*i);
+  }
+  return retval;
+}
+
+string LFNIndex::get_full_path(const vector<string> &rel, const string &name) {
+  return get_full_path_subdir(rel) + "/" + name;
+}
+
+string LFNIndex::mangle_path_component(const string &component) {
+  return SUBDIR_PREFIX + component;
+}
+
+string LFNIndex::demangle_path_component(const string &component) {
+  return component.substr(SUBDIR_PREFIX.size(), component.size() - SUBDIR_PREFIX.size());
+}
+
+int LFNIndex::decompose_full_path(const char *in, vector<string> *out,
+                       hobject_t *hoid, string *shortname) {
+  const char *beginning = in + get_base_path().size();
+  const char *end = beginning;
+  while (1) {
+    end++;
+    beginning = end++;
+    for (; *end != '\0' && *end != '/'; ++end);
+    if (*end != '\0') {
+      out->push_back(demangle_path_component(string(beginning, end - beginning)));
+      continue;
+    } else {
+      break;
+    }
+  }
+  *shortname = string(beginning, end - beginning);
+  if (hoid) {
+    int r = lfn_translate(*out, *shortname, hoid);
+    if (r < 0)
+      return r;
+  }
+  return 0;
+}
+
+string LFNIndex::mangle_attr_name(const string &attr) {
+  return PHASH_ATTR_PREFIX + attr;
+}
diff --git a/src/os/LFNIndex.h b/src/os/LFNIndex.h
new file mode 100644 (file)
index 0000000..e85c300
--- /dev/null
@@ -0,0 +1,443 @@
+// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- 
+// vim: ts=8 sw=2 smarttab
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef OS_LFNINDEX_H
+#define OS_LFNINDEX_H
+
+#include <string>
+#include <map>
+#include <set>
+#include <vector>
+#include <tr1/memory>
+
+#include "osd/osd_types.h"
+#include "include/object.h"
+#include "common/ceph_crypto.h"
+#include "ObjectStore.h"
+
+#include "CollectionIndex.h"
+
+/** 
+ * LFNIndex also encapsulates logic for manipulating
+ * subdirectories of of a collection as well as the long filename
+ * logic.
+ *
+ * The protected methods provide machinery for derived classes to
+ * manipulate subdirectories and objects.
+ *  
+ * The virtual methods are to be overridden to provide the actual
+ * hashed layout.
+ *  
+ * User must call created when an object is created.
+ *  
+ * Syncronization: Calling code must ensure that there are no object
+ * creations or deletions during the lifetime of a Path object (except
+ * of an object at that path).
+ *
+ * Unless otherwise noted, methods which return an int return 0 on sucess
+ * and a negative error code on failure.
+ */
+class LFNIndex : public CollectionIndex {
+  /// Hash digest output size.
+  static const int FILENAME_LFN_DIGEST_SIZE = CEPH_CRYPTO_SHA1_DIGESTSIZE;
+  /// Length of filename hash.
+  static const int FILENAME_HASH_LEN = FILENAME_LFN_DIGEST_SIZE;
+  /// Max filename size.
+  static const int FILENAME_MAX_LEN = 4096;
+  /// Length of hashed filename.
+  static const int FILENAME_SHORT_LEN = 255;
+  /// Length of hashed filename prefix.
+  static const int FILENAME_PREFIX_LEN;
+  /// Length of hashed filename cookie.
+  static const int FILENAME_EXTRA = 4;
+  /// Lfn cookie value.
+  static const string FILENAME_COOKIE;
+  /// Name of LFN attribute for storing full name.
+  static const string LFN_ATTR;
+  /// Prefix for subdir index attributes.
+  static const string PHASH_ATTR_PREFIX;
+  /// Prefix for index subdirectories.
+  static const string SUBDIR_PREFIX;
+
+  /// Path to Index base.
+  const string base_path;
+  /// For reference counting the collection @see Path
+  std::tr1::weak_ptr<CollectionIndex> self_ref;
+
+protected:
+  /// Constructor
+  LFNIndex(const char *base_path) ///< [in] path to Index root
+    : base_path(base_path) {}
+
+  /// Virtual destructor
+  virtual ~LFNIndex() {}
+
+  /// @see CollectionIndex
+  void set_ref(std::tr1::shared_ptr<CollectionIndex> ref);
+
+  /// @see CollectionIndex
+  int init();
+
+  /// @see CollectionIndex
+  int cleanup() = 0;
+
+  /// @see CollectionIndex
+  int created(
+    const hobject_t &hoid,
+    const char *path
+    );
+
+  /// @see CollectionIndex
+  int unlink(
+    const hobject_t &hoid
+    );
+
+  /// @see CollectionIndex
+  int lookup(
+    const hobject_t &hoid,
+    IndexedPath *path,
+    int *exist
+    );
+
+  /// @see CollectionIndex
+  int collection_list_partial(
+    snapid_t seq,
+    int max_count,
+    vector<hobject_t> *ls, 
+    collection_list_handle_t *last
+    );
+
+  /// @see CollectionIndex
+  int collection_list(
+    vector<hobject_t> *ls
+    );
+
+protected:
+  virtual int _init() = 0;
+
+  /// Will be called upon object creation
+  virtual int _created(
+    const vector<string> &path, ///< [in] Path to subdir.
+    const hobject_t &hoid,      ///< [in] Object created.
+    const string &mangled_name  ///< [in] Mangled filename.
+    ) = 0;
+
+  /// Will be called to remove an object
+  virtual int _remove(
+    const vector<string> &path,     ///< [in] Path to subdir.
+    const hobject_t &hoid,          ///< [in] Object to remove.
+    const string &mangled_name     ///< [in] Mangled filename.
+    ) = 0;
+
+  /// Return the path and mangled_name for hoid.
+  virtual int _lookup(
+    const hobject_t &hoid,///< [in] Object for lookup.
+    vector<string> *path, ///< [out] Path to the object.
+    string *mangled_name, ///< [out] Mangled filename.
+    int *exists                  ///< [out] True if the object exists.
+    ) = 0;
+
+  /**
+   * List contents of the collection, must be implemented by derived class.
+   *
+   * @param [out] seq Snapid to list.
+   * @param [in] max_count Max number to list (0 for no limit).
+   * @param [out] ls Container for listed objects.
+   * @param [in,out] last List handle.  0 for beginning.  Passing the same
+   * cookie location will cause the next max_count to be listed.
+   * @return Error code.  0 on success.
+   */
+  virtual int _collection_list_partial(
+    snapid_t seq,
+    int max_count,
+    vector<hobject_t> *ls,
+    collection_list_handle_t *last
+    ) = 0;
+                                      
+  /// List contents of collection.
+  virtual int _collection_list(
+    vector<hobject_t> *ls ///< [out] Listed objects.
+    ) = 0;
+
+protected:
+
+  /* Non-virtual utility methods */
+
+  /// Sync a subdirectory
+  int fsync_dir(
+    const vector<string> &path ///< [in] Path to sync
+    ); ///< @return Error Code, 0 on success
+
+  /// Link an object from from into to
+  int link_object(
+    const vector<string> &from,   ///< [in] Source subdirectory.
+    const vector<string> &to,     ///< [in] Dest subdirectory.
+    const hobject_t &hoid,        ///< [in] Object to move.
+    const string &from_short_name ///< [in] Mangled filename of hoid.
+    ); ///< @return Error Code, 0 on success
+
+  /**
+   * Efficiently remove objects from a subdirectory
+   *
+   * remove_object invalidates mangled names in the directory requiring
+   * the mangled name of each additional object to be looked up a second
+   * time.  remove_objects removes the need for additional lookups
+   *
+   * @param [in] dir Directory from which to remove.
+   * @param [in] map of objects to remove to mangle names
+   * @param [in,out] map of filenames to objects
+   * @return Error Code, 0 on success.
+   */
+  int remove_objects(
+    const vector<string> &dir,             
+    const map<string, hobject_t> &to_remove,
+    map<string, hobject_t> *remaining
+    );
+       
+
+  /** 
+   * Moves contents of from into to.
+   *
+   * Invalidates mangled names in to.  If interupted, all objects will be
+   * present in to before objects are removed from from.  Ignores EEXIST 
+   * while linking into to.
+   * @return Error Code, 0 on success
+   */
+  int move_objects(
+    const vector<string> &from, ///< [in] Source subdirectory.
+    const vector<string> &to    ///< [in] Dest subdirectory.
+    );
+
+  /** 
+   * Remove an object from from.
+   *
+   * Invalidates mangled names in from.
+   * @return Error Code, 0 on success
+   */
+  int remove_object(
+    const vector<string> &from,  ///< [in] Directory from which to remove.
+    const hobject_t &to_remove   ///< [in] Object to remove.
+    );
+
+  /**
+   * Gets the filename corresponding to hoid in from.
+   * 
+   * The filename may differ between subdirectories.  Furthermore,
+   * file creations ore removals in from may invalidate the name.
+   * @return Error code on failure, 0 on success
+   */
+  int get_mangled_name(
+    const vector<string> &from, ///< [in] Subdirectory
+    const hobject_t &hoid,     ///< [in] Object 
+    string *mangled_name,      ///< [out] Filename
+    int *exists                        ///< [out] 1 if the file exists, else 0
+    );
+
+  /**
+   * Lists objects in to_list.
+   *
+   * @param [in] to_list Directory to list.
+   * @param [in] max_objects Max number to list.
+   * @param [in,out] handle Cookie for continuing the listing.
+   * Initialize to zero to start at the beginning of the directory.
+   * @param [out] out Mapping of listed object filenames to objects.
+   * @return Error code on failure, 0 on success
+   */
+  int list_objects(
+    const vector<string> &to_list,
+    int max_objects,
+    long *handle,
+    map<string, hobject_t> *out
+    );
+
+  /// Lists subdirectories.
+  int list_subdirs(
+    const vector<string> &to_list, ///< [in] Directory to list.
+    set<string> *out              ///< [out] Subdirectories listed. 
+    );
+
+  /// Create subdirectory.
+  int create_path(
+    const vector<string> &to_create ///< [in] Subdirectory to create.
+    );
+
+  /// Remove subdirectory.
+  int remove_path(
+    const vector<string> &to_remove ///< [in] Subdirectory to remove.
+    );
+
+  /// Check whether to_check exists.
+  int path_exists(
+    const vector<string> &to_check, ///< [in] Subdirectory to check.
+    int *exists                            ///< [out] 1 if it exists, 0 else
+    );
+
+  /// Save attr_value to attr_name attribute on path.
+  int add_attr_path(
+    const vector<string> &path, ///< [in] Path to modify.
+    const string &attr_name,   ///< [in] Name of attribute.
+    bufferlist &attr_value     ///< [in] Value to save.
+    );
+
+  /// Read into attr_value atribute attr_name on path.
+  int get_attr_path(
+    const vector<string> &path, ///< [in] Path to read.
+    const string &attr_name,   ///< [in] Attribute to read. 
+    bufferlist &attr_value     ///< [out] Attribute value read.
+    );
+
+  /// Remove attr from path
+  int remove_attr_path(
+    const vector<string> &path, ///< [in] path from which to remove attr
+    const string &attr_name    ///< [in] attr to remove
+    ); ///< @return Error code, 0 on success
+
+private:
+  /* lfn translation functions */
+  /**
+   * Gets the filename corresponsing to hoid in path.
+   *
+   * @param [in] path Path in which to get filename for hoid.
+   * @param [in] hoid Object for which to get filename.
+   * @param [out] mangled_name Filename for hoid, pass NULL if not needed.
+   * @param [out] full_path Fullpath for hoid, pass NULL if not needed.
+   * @param [out] exists 1 if the file exists, 0 otherwise, pass NULL if 
+   * not needed
+   * @return Error Code, 0 on success.
+   */
+  int lfn_get_name(
+    const vector<string> &path,
+    const hobject_t &hoid, 
+    string *mangled_name,
+    string *full_path,
+    int *exists
+    );
+
+  /// Adjusts path contents when hoid is created at name mangled_name.
+  int lfn_created(
+    const vector<string> &path, ///< [in] Path to adjust.
+    const hobject_t &hoid,     ///< [in] Object created. 
+    const string &mangled_name  ///< [in] Filename of created object.
+    );
+
+  /// Removes hoid from path while adjusting path contents
+  int lfn_unlink(
+    const vector<string> &path, ///< [in] Path containing hoid.
+    const hobject_t &hoid,     ///< [in] Object to remove.
+    const string &mangled_name ///< [in] Filename of object to remove.
+    );
+
+  ///Transate a file into and hobject_t.
+  int lfn_translate(
+    const vector<string> &path, ///< [in] Path containing the file.
+    const string &short_name,  ///< [in] Filename to translate. 
+    hobject_t *out             ///< [out] Object found.
+    ); ///< @return Negative error code on error, 0 if not an object, 1 else
+
+  /* manglers/demanglers */
+  /// Filters object filenames
+  bool lfn_is_object(
+    const string &short_name ///< [in] Filename to check
+    ); ///< True if short_name is an object, false otherwise
+
+  /// Filters subdir filenames
+  bool lfn_is_subdir(
+    const string &short_name, ///< [in] Filename to check.
+    string *demangled_name    ///< [out] Demangled subdir name.
+    ); ///< @return True if short_name is a subdir, false otherwise
+
+  /// Generate object name
+  string lfn_generate_object_name(
+    const hobject_t &hoid ///< [in] Object for which to generate.
+    ); ///< @return Generated object name.
+
+  /// Parse object name
+  bool lfn_parse_object_name(
+    const string &long_name, ///< [in] Name to parse
+    hobject_t *out          ///< [out] Resulting Object
+    ); ///< @return True if successfull, False otherwise.
+
+  /// Checks whether short_name is a hashed filename.
+  bool lfn_is_hashed_filename(
+    const string &short_name ///< [in] Name to check.
+    ); ///< @return True if short_name is hashed, False otherwise.
+
+  /// Checks whether long_name must be hashed.
+  bool lfn_must_hash(
+    const string &long_name ///< [in] Name to check.
+    ); ///< @return True if long_name must be hashed, False otherwise.
+
+  /// Generate hashed name.
+  string lfn_get_short_name(
+    const hobject_t &hoid, ///< [in] Object for which to generate.
+    int i                 ///< [in] Index of hashed name to generate.
+    ); ///< @return Hashed filename.
+
+  /* other common methods */
+  /// Gets the base path
+  const string &get_base_path(); ///< @return Index base_path
+
+  /// Get full path the subdir
+  string get_full_path_subdir(
+    const vector<string> &rel ///< [in] The subdir.
+    ); ///< @return Full path to rel.
+
+  /// Get full path to object
+  string get_full_path(
+    const vector<string> &rel, ///< [in] Path to object.
+    const string &name        ///< [in] Filename of object.
+    ); ///< @return Fullpath to object at name in rel.
+
+  /// Get mangled path component
+  string mangle_path_component(
+    const string &component ///< [in] Component to mangle
+    ); /// @return Mangled component
+
+  /// Demangle component
+  string demangle_path_component(
+    const string &component ///< [in] Subdir name to demangle
+    ); ///< @return Demangled path component.
+
+  /// Decompose full path into object name and filename.
+  int decompose_full_path(
+    const char *in,      ///< [in] Full path to object.
+    vector<string> *out, ///< [out] Path to object at in.
+    hobject_t *hoid,    ///< [out] Object at in.
+    string *shortname   ///< [out] Filename of object at in.
+    ); ///< @return Error Code, 0 on success.
+
+  /// Mangle attribute name
+  string mangle_attr_name(
+    const string &attr ///< [in] Attribute to mangle.
+    ); ///< @return Mangled attribute name.
+
+  /// Builds hashed filename
+  void build_filename(
+    const char *old_filename, ///< [in] Filename to convert.
+    int i,                   ///< [in] Index of hash.
+    char *filename,          ///< [out] Resulting filename.
+    int len                  ///< [in] Size of buffer for filename
+    ); ///< @return Error Code, 0 on success
+
+  /// Get hash of filename
+  int hash_filename(
+    const char *filename, ///< [in] Filename to hash.
+    char *hash,                  ///< [out] Hash of filename.
+    int len              ///< [in] Size of hash buffer.
+    ); ///< @return Error Code, 0 on success.
+};
+typedef LFNIndex::IndexedPath IndexedPath;
+
+#endif
index 9cbf87bcc87b3cdca939386e3b41b6afa5621e2b..0605ee3a108de92a1396a4d05d26a1d226321052 100644 (file)
@@ -266,6 +266,10 @@ public:
     return str.c_str();
   }
 
+  int operator<(const coll_t &rhs) const {
+    return str < rhs.str;
+  }
+
   bool is_pg(pg_t& pgid, snapid_t& snap) const;
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& bl);