]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
librados, os, osd, osdc, test: Add support for client specified namespaces
authorDavid Zafman <david.zafman@inktank.com>
Tue, 11 Jun 2013 01:18:59 +0000 (18:18 -0700)
committerDavid Zafman <david.zafman@inktank.com>
Tue, 9 Jul 2013 21:09:02 +0000 (14:09 -0700)
Add rados_ioctx_namespace_set_key() and librados::IoCtx::namespace_set_key()
Add namespace to admin-daemon operations
Support namespace in osd map command
Add namespace to object_locator_t and hobject_t
Add random namespaces to psim program

Feature: #4982 (OSD: namespaces pt 1 (librados/osd, not caps))

Signed-off-by: David Zafman <david.zafman@inktank.com>
32 files changed:
src/include/rados/librados.h
src/include/rados/librados.hpp
src/librados/IoCtxImpl.cc
src/librados/librados.cc
src/librbd/ImageCtx.cc
src/messages/MOSDOp.h
src/mon/OSDMonitor.cc
src/os/DBObjectMap.cc
src/os/HashIndex.cc
src/os/LFNIndex.cc
src/os/ObjectStore.cc
src/os/hobject.cc
src/os/hobject.h
src/osd/OSD.cc
src/osd/OSDMap.cc
src/osd/OSDMap.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.cc
src/osd/osd_types.h
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/osdc/Objecter.cc
src/osdc/Objecter.h
src/psim.cc
src/test/filestore/store_test.cc
src/test/os/TestFlatIndex.cc
src/test/os/TestLFNIndex.cc
src/test/osd/TestPGLog.cc
src/test/osdc/object_cacher_stress.cc
src/test/test_osd_types.cc
src/test/test_snap_mapper.cc

index eb1a30632b7e491c975d174f91841e18c04ed4bd..4a5be3d1777950c679eb359b109b228525f0116b 100644 (file)
@@ -657,6 +657,19 @@ int rados_ioctx_get_pool_name(rados_ioctx_t io, char *buf, unsigned maxlen);
  * any previously set key
  */
 void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key);
+
+/**
+ * Set the namespace for objects within an io context
+ *
+ * The namespace specification further refines a pool into different
+ * domains.  The mapping of objects to pgs is also based on this
+ * value.
+ *
+ * @param io the io context to change
+ * @param nspace the name to use as the namespace, or NULL use the
+ * default namespace
+ */
+void rados_ioctx_set_namespace(rados_ioctx_t io, const char *nspace);
 /** @} obj_loc */
 
 /**
index 497fc212db19de4b1b7100ee2d76778518ea7464..f10417f087c74e1c939793ba07d04c90bb270c4b 100644 (file)
@@ -654,6 +654,7 @@ namespace librados
     const std::string& get_pool_name() const;
 
     void locator_set_key(const std::string& key);
+    void set_namespace(const std::string& nspace);
 
     int64_t get_id();
 
index d2ef7cd217de44579340a689dfb43ca07581b719..411e62e5822b349927a5d7d6da418dea95a1030c 100644 (file)
@@ -212,7 +212,6 @@ int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
 {
   utime_t ut = ceph_clock_now(client->cct);
   int reply;
-  eversion_t ver;
 
   Mutex mylock("IoCtxImpl::snap_rollback::mylock");
   Cond cond;
@@ -225,7 +224,7 @@ int librados::IoCtxImpl::selfmanaged_snap_rollback_object(const object_t& oid,
   lock->Lock();
   objecter->mutate(oid, oloc,
                   op, snapc, ut, 0,
-                  onack, NULL, &ver);
+                  onack, NULL, NULL);
   lock->Unlock();
 
   mylock.Lock();
@@ -370,6 +369,7 @@ int librados::IoCtxImpl::list(Objecter::ListContext *context, int max_entries)
     return 0;
 
   context->max_entries = max_entries;
+  context->nspace = oloc.nspace;
 
   lock->Lock();
   objecter->list_objects(context, new C_SafeCond(&mylock, &cond, &done, &r));
index 0d4277d8fab33b041d2703fb2059851c4017cc2b..002aeefe256b1ab93e382e13715ec7eeb3dc825e 100644 (file)
@@ -1242,6 +1242,11 @@ void librados::IoCtx::locator_set_key(const string& key)
   io_ctx_impl->oloc.key = key;
 }
 
+void librados::IoCtx::set_namespace(const string& nspace)
+{
+  io_ctx_impl->oloc.nspace = nspace;
+}
+
 int64_t librados::IoCtx::get_id()
 {
   return io_ctx_impl->get_id();
@@ -2169,6 +2174,15 @@ extern "C" void rados_ioctx_locator_set_key(rados_ioctx_t io, const char *key)
     ctx->oloc.key = "";
 }
 
+extern "C" void rados_ioctx_set_namespace(rados_ioctx_t io, const char *nspace)
+{
+  librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
+  if (nspace)
+    ctx->oloc.nspace = nspace;
+  else
+    ctx->oloc.nspace = "";
+}
+
 extern "C" rados_t rados_ioctx_get_cluster(rados_ioctx_t io)
 {
   librados::IoCtxImpl *ctx = (librados::IoCtxImpl *)io;
@@ -2446,6 +2460,7 @@ extern "C" int rados_objects_list_open(rados_ioctx_t io, rados_list_ctx_t *listh
   Objecter::ListContext *h = new Objecter::ListContext;
   h->pool_id = ctx->poolid;
   h->pool_snap_seq = ctx->snap_seq;
+  h->nspace = ctx->oloc.nspace;
   *listh = (void *)new librados::ObjListCtx(ctx, h);
   return 0;
 }
index 2ab5fd9e91d82902631514db7f4337f685b1fd1b..41518b67698cdec5a750141703d574a523686e7d 100644 (file)
@@ -494,6 +494,8 @@ namespace librbd {
     snap_lock.put_read();
     ObjectExtent extent(o, 0, off, len, 0);
     extent.oloc.pool = data_ctx.get_id();
+    // XXX: nspace is always default, io_ctx_impl field private
+    //extent.oloc.nspace = data_ctx.io_ctx_impl->oloc.nspace;
     extent.buffer_extents.push_back(make_pair(0, len));
     wr->extents.push_back(extent);
     {
index f81691ba9ed30dbabff328783d0200d5d7ab86b4..3855670f8ebf7d0d52f53a0a418c0d38f1195795 100644 (file)
@@ -341,7 +341,10 @@ struct ceph_osd_request_head {
   const char *get_type_name() const { return "osd_op"; }
   void print(ostream& out) const {
     out << "osd_op(" << get_reqid();
-    out << " " << oid;
+    out << " ";
+    if (!oloc.nspace.empty())
+      out << oloc.nspace << "/";
+    out << oid;
 
 #if 0
     out << " ";
index ff5da60fffe283d450b5fdd8a0a6bd942d5b8995..a8433da81ed5c0839b1a81be4c79569ea1250b63 100644 (file)
@@ -2055,23 +2055,30 @@ bool OSDMonitor::preprocess_command(MMonCommand *m)
     ds << "\n";
     rdata.append(ds);
   } else if (prefix == "osd map") {
-    string poolstr, objstr;
+    string poolstr, objstr, namespacestr;
     cmd_getval(g_ceph_context, cmdmap, "pool", poolstr);
     cmd_getval(g_ceph_context, cmdmap, "object", objstr);
+    cmd_getval(g_ceph_context, cmdmap, "nspace", namespacestr);
+
     int64_t pool = osdmap.lookup_pg_pool_name(poolstr.c_str());
     if (pool < 0) {
       ss << "pool " << poolstr << " does not exist";
       r = -ENOENT;
       goto reply;
     }
-    object_locator_t oloc(pool);
+    object_locator_t oloc(pool, namespacestr);
     object_t oid(objstr);
     pg_t pgid = osdmap.object_locator_to_pg(oid, oloc);
     pg_t mpgid = osdmap.raw_pg_to_pg(pgid);
     vector<int> up, acting;
     osdmap.pg_to_up_acting_osds(mpgid, up, acting);
+    string fullobjname;
+    if (!namespacestr.empty())
+      fullobjname = namespacestr + string("/") + oid.name;
+    else
+      fullobjname = oid.name;
     ds << "osdmap e" << osdmap.get_epoch()
-       << " pool '" << poolstr << "' (" << pool << ") object '" << oid << "' ->"
+       << " pool '" << poolstr << "' (" << pool << ") object '" << fullobjname << "' ->"
        << " pg " << pgid << " (" << mpgid << ")"
        << " -> up " << up << " acting " << acting;
     rdata.append(ds);
index 29edfbe1f050a3f6d0b565758faecb8d23732f9b..5142f4d7420a93e7289a60b5fb720e3a8592cef1 100644 (file)
@@ -244,7 +244,7 @@ bool DBObjectMap::parse_hobject_key_v0(const string &in, coll_t *c,
   pg_t pg;
   if (c->is_pg_prefix(pg))
     pool = (int64_t)pg.pool();
-  (*hoid) = hobject_t(name, key, snap, hash, pool);
+  (*hoid) = hobject_t(name, key, snap, hash, pool, "");
   return true;
 }
 
index 17b0f0388b99f1d0ff71cad47a702c931e6f8e3f..86a912bbef2b099a37ec135273b1b5b934862530 100644 (file)
@@ -702,7 +702,7 @@ int HashIndex::list_by_hash(const vector<string> &path,
     if (j == objects.end() || j->first != *i) {
       if (min_count > 0 && out->size() > (unsigned)min_count) {
        if (next)
-         *next = hobject_t("", "", CEPH_NOSNAP, hash_prefix_to_hash(*i), -1);
+         *next = hobject_t("", "", CEPH_NOSNAP, hash_prefix_to_hash(*i), -1, "");
        return 0;
       }
       *(next_path.rbegin()) = *(i->rbegin());
index 38dfa23a333afb9b4f8a74a2a08f4ddabf496194..edf361a44f0fa6061aba4a96a063e4184e0fc208 100644 (file)
@@ -1008,7 +1008,7 @@ bool LFNIndex::lfn_parse_object_name_poolless(const string &long_name,
   pg_t pg;
   if (coll().is_pg_prefix(pg))
     pool = (int64_t)pg.pool();
-  (*out) = hobject_t(name, key, snap, hash, pool);
+  (*out) = hobject_t(name, key, snap, hash, pool, "");
   return true;
 }
 
@@ -1095,8 +1095,7 @@ bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out)
   else
     pool = strtoull(pstring.c_str(), NULL, 16);
 
-  (*out) = hobject_t(name, key, snap, hash, (int64_t)pool);
-  out->nspace = ns;
+  (*out) = hobject_t(name, key, snap, hash, (int64_t)pool, ns);
   return true;
 }
 
index 437549d0b393e013719a0fe52b6cd3738a058ce6..94c9b02ac91261b844169109bd851f9f1e083294 100644 (file)
@@ -464,9 +464,9 @@ void ObjectStore::Transaction::generate_test_instances(list<ObjectStore::Transac
   t = new Transaction;
   coll_t c("foocoll");
   coll_t c2("foocoll2");
-  hobject_t o1("obj", "", 123, 456, -1);
-  hobject_t o2("obj2", "", 123, 456, -1);
-  hobject_t o3("obj3", "", 123, 456, -1);
+  hobject_t o1("obj", "", 123, 456, -1, "");
+  hobject_t o2("obj2", "", 123, 456, -1, "");
+  hobject_t o3("obj3", "", 123, 456, -1, "");
   t->touch(c, o1);
   bufferlist bl;
   bl.append("some data");
index 288887d45c78187ed18f5be2be0833a79fd9d8c2..d6273693c6235c893d423c0ef5902b786c75b211 100644 (file)
@@ -152,6 +152,8 @@ void hobject_t::decode(json_spirit::Value& v)
       max = p.value_.get_int();
     else if (p.name_ == "pool")
       pool = p.value_.get_int();
+    else if (p.name_ == "namespace")
+      nspace = p.value_.get_str();
   }
 }
 
@@ -163,6 +165,7 @@ void hobject_t::dump(Formatter *f) const
   f->dump_int("hash", hash);
   f->dump_int("max", (int)max);
   f->dump_int("pool", pool);
+  f->dump_string("namespace", nspace);
 }
 
 void hobject_t::generate_test_instances(list<hobject_t*>& o)
@@ -170,9 +173,11 @@ void hobject_t::generate_test_instances(list<hobject_t*>& o)
   o.push_back(new hobject_t);
   o.push_back(new hobject_t);
   o.back()->max = true;
-  o.push_back(new hobject_t(object_t("oname"), string(), 1, 234, -1));
-  o.push_back(new hobject_t(object_t("oname2"), string("okey"), CEPH_NOSNAP, 67, 0));
-  o.push_back(new hobject_t(object_t("oname3"), string("oname3"), CEPH_SNAPDIR, 910, 1));
+  o.push_back(new hobject_t(object_t("oname"), string(), 1, 234, -1, ""));
+  o.push_back(new hobject_t(object_t("oname2"), string("okey"), CEPH_NOSNAP,
+       67, 0, "n1"));
+  o.push_back(new hobject_t(object_t("oname3"), string("oname3"),
+       CEPH_SNAPDIR, 910, 1, "n2"));
 }
 
 ostream& operator<<(ostream& out, const hobject_t& o)
index a30ea0117bd5c440b394c8abd8e02ab1092730a2..633e471dffcd3fd4e6be78d91b8a5d438dbdd0e3 100644 (file)
@@ -59,15 +59,15 @@ public:
   hobject_t() : snap(0), hash(0), max(false), pool(-1) {}
 
   hobject_t(object_t oid, const string& key, snapid_t snap, uint64_t hash,
-           int64_t pool) : 
+           int64_t pool, string nspace) :
     oid(oid), snap(snap), hash(hash), max(false),
-    pool(pool),
+    pool(pool), nspace(nspace),
     key(oid.name == key ? string() : key) {}
 
   hobject_t(const sobject_t &soid, const string &key, uint32_t hash,
-           int64_t pool) : 
+           int64_t pool, string nspace) :
     oid(soid.oid), snap(soid.snap), hash(hash), max(false),
-    pool(pool),
+    pool(pool), nspace(nspace),
     key(soid.oid.name == key ? string() : key) {}
 
   /// @return min hobject_t ret s.t. ret.hash == this->hash
@@ -138,6 +138,10 @@ public:
     (*this) = temp;
   }
 
+  string get_namespace() const {
+    return nspace;
+  }
+
   void encode(bufferlist& bl) const;
   void decode(bufferlist::iterator& bl);
   void decode(json_spirit::Value& v);
index bab345324d0254e77a39207b7389a125881419dd..d7196b8a79164542df553aee98fb913b133bb8e3 100644 (file)
@@ -3078,13 +3078,13 @@ void OSD::check_ops_in_flight()
 }
 
 // Usage:
-//   setomapval <pool-id> <obj-name> <key> <val>
-//   rmomapkey <pool-id> <obj-name> <key>
-//   setomapheader <pool-id> <obj-name> <header>
-//   getomap <pool> <obj-name>
-//   truncobj <pool-id> <obj-name> <newlen>
-//   injectmdataerr
-//   injectdataerr
+//   setomapval <pool-id> [namespace/]<obj-name> <key> <val>
+//   rmomapkey <pool-id> [namespace/]<obj-name> <key>
+//   setomapheader <pool-id> [namespace/]<obj-name> <header>
+//   getomap <pool> [namespace/]<obj-name>
+//   truncobj <pool-id> [namespace/]<obj-name> <newlen>
+//   injectmdataerr [namespace/]<obj-name>
+//   injectdataerr [namespace/]<obj-name>
 void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
      std::string command, std::string args, ostream &ss)
 {
@@ -3116,21 +3116,29 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
     if (pool < 0 && isdigit(argv[1].c_str()[0]))
       pool = atoll(argv[1].c_str());
     r = -1;
-    if (pool >= 0)
-        r = curmap->object_locator_to_pg(object_t(argv[2]),
-          object_locator_t(pool), rawpg);
+    string objname, nspace;
+    objname = string(argv[2]);
+    if (pool >= 0) {
+        std::size_t found = argv[2].find_first_of('/');
+        if (found != string::npos) {
+          nspace = argv[2].substr(0, found);
+          objname = argv[2].substr(found+1);
+        }
+        object_locator_t oloc(pool, nspace);
+        r = curmap->object_locator_to_pg(object_t(objname), oloc,  rawpg);
+    }
     if (r < 0) {
         ss << "Invalid pool " << argv[1];
         return;
     }
     pgid = curmap->raw_pg_to_pg(rawpg);
 
-    hobject_t obj(object_t(argv[2]), string(""), CEPH_NOSNAP, rawpg.ps(), pool);
+    hobject_t obj(object_t(objname), string(""), CEPH_NOSNAP, rawpg.ps(), pool, nspace);
     ObjectStore::Transaction t;
 
     if (command == "setomapval") {
       if (argc != 5) {
-        ss << "usage: setomapval <pool> <obj-name> <key> <val>";
+        ss << "usage: setomapval <pool> [namespace/]<obj-name> <key> <val>";
         return;
       }
       map<string, bufferlist> newattrs;
@@ -3147,7 +3155,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
         ss << "ok";
     } else if (command == "rmomapkey") {
       if (argc != 4) {
-        ss << "usage: rmomapkey <pool> <obj-name> <key>";
+        ss << "usage: rmomapkey <pool> [namespace/]<obj-name> <key>";
         return;
       }
       set<string> keys;
@@ -3161,7 +3169,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
         ss << "ok";
     } else if (command == "setomapheader") {
       if (argc != 4) {
-        ss << "usage: setomapheader <pool> <obj-name> <header>";
+        ss << "usage: setomapheader <pool> [namespace/]<obj-name> <header>";
         return;
       }
       bufferlist newheader;
@@ -3175,7 +3183,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
         ss << "ok";
     } else if (command == "getomap") {
       if (argc != 3) {
-        ss << "usage: getomap <pool> <obj-name>";
+        ss << "usage: getomap <pool> [namespace/]<obj-name>";
         return;
       }
       //Debug: Output entire omap
@@ -3193,7 +3201,7 @@ void TestOpsSocketHook::test_ops(OSDService *service, ObjectStore *store,
       }
     } else if (command == "truncobj") {
       if (argc != 4) {
-       ss << "usage: truncobj <pool> <obj-name> <val>";
+       ss << "usage: truncobj <pool> [namespace/]<obj-name> <val>";
        return;
       }
       t.truncate(coll_t(pgid), obj, atoi(argv[3].c_str()));
index 711997782a8e1c5207a3a49cf4f0019a6f627f95..9a14d7c293437e7d28e7189861c3b777000fedea 100644 (file)
@@ -974,26 +974,39 @@ int OSDMap::apply_incremental(const Incremental &inc)
   return 0;
 }
 
+static string make_hash_str(const string &inkey, const string &nspace)
+{
+  if (nspace.empty())
+    return inkey;
+  return nspace + '\037' + inkey;
+}
 
 // mapping
-int OSDMap::object_locator_to_pg(const object_t& oid, const object_locator_t& loc, pg_t &pg) const
+int OSDMap::object_locator_to_pg(
+       const object_t& oid,
+       const object_locator_t& loc,
+       pg_t &pg) const
 {
   // calculate ps (placement seed)
   const pg_pool_t *pool = get_pg_pool(loc.get_pool());
   if (!pool)
     return -ENOENT;
   ps_t ps;
-  if (loc.key.length())
-    ps = ceph_str_hash(pool->object_hash, loc.key.c_str(), loc.key.length());
+  string key;
+  if (!loc.key.empty())
+    key = make_hash_str(loc.key, loc.nspace);
   else
-    ps = ceph_str_hash(pool->object_hash, oid.name.c_str(), oid.name.length());
+    key = make_hash_str(oid.name, loc.nspace);
+
+  ps = ceph_str_hash(pool->object_hash, key.c_str(), key.length());
   pg = pg_t(ps, loc.get_pool(), -1);
   return 0;
 }
 
-ceph_object_layout OSDMap::make_object_layout(object_t oid, int pg_pool) const
+ceph_object_layout OSDMap::make_object_layout(
+  object_t oid, int pg_pool, string nspace) const
 {
-  object_locator_t loc(pg_pool);
+  object_locator_t loc(pg_pool, nspace);
 
   ceph_object_layout ol;
   pg_t pgid = object_locator_to_pg(oid, loc);
index aadbe3c58be12828c4c42dd7020e19dbc738d107..92d1c5bf8a1abd46f978e8ef05349b91e1d7c0f6 100644 (file)
@@ -466,12 +466,13 @@ public:
     return object_locator_t(layout.fl_pg_pool);
   }
 
+  // XXX: not used, mentioned in psim.cc comment
   // oid -> pg
-  ceph_object_layout file_to_object_layout(object_t oid, ceph_file_layout& layout) const {
-    return make_object_layout(oid, layout.fl_pg_pool);
+  ceph_object_layout file_to_object_layout(object_t oid, ceph_file_layout& layout, string nspace) const {
+    return make_object_layout(oid, layout.fl_pg_pool, nspace);
   }
 
-  ceph_object_layout make_object_layout(object_t oid, int pg_pool) const;
+  ceph_object_layout make_object_layout(object_t oid, int pg_pool, string nspace) const;
 
   int get_pg_num(int pg_pool) const
   {
index a8b40b4b5cbd7f5f03d4b91cf6fc92b367a1f430..2fc7918b8102ec78833c1cc546ecce44e91452cf 100644 (file)
@@ -543,6 +543,10 @@ void ReplicatedPG::do_pg_op(OpRequestRef op)
            }
          }
 
+         // skip wrong namespace
+         if (candidate.get_namespace() != m->get_object_locator().nspace)
+           continue;
+
          if (filter && !pgls_filter(filter, candidate, filter_out))
            continue;
 
@@ -654,7 +658,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
 
   hobject_t head(m->get_oid(), m->get_object_locator().key,
                 CEPH_NOSNAP, m->get_pg().ps(),
-                info.pgid.pool());
+                info.pgid.pool(), m->get_object_locator().nspace);
 
   if (op->may_write() && scrubber.write_blocked_by_scrub(head)) {
     dout(20) << __func__ << ": waiting for scrub" << dendl;
@@ -682,7 +686,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
 
   // missing snapdir?
   hobject_t snapdir(m->get_oid(), m->get_object_locator().key,
-                   CEPH_SNAPDIR, m->get_pg().ps(), info.pgid.pool());
+                   CEPH_SNAPDIR, m->get_pg().ps(), info.pgid.pool(),
+                   m->get_object_locator().nspace);
   if (is_missing_object(snapdir)) {
     wait_for_missing_object(snapdir, op);
     return;
@@ -710,7 +715,8 @@ void ReplicatedPG::do_op(OpRequestRef op)
              m->get_object_locator().key,
              m->get_snapid(),
              m->get_pg().ps(),
-             m->get_object_locator().get_pool()),
+             m->get_object_locator().get_pool(),
+             m->get_object_locator().nspace),
     m->get_object_locator(),
     &obc, can_create, &snapid);
   if (r) {
@@ -725,7 +731,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
        assert(!can_create); // only happens on a read
        hobject_t soid(m->get_oid(), m->get_object_locator().key,
                       snapid, m->get_pg().ps(),
-                      info.pgid.pool());
+                      info.pgid.pool(), m->get_object_locator().nspace);
        wait_for_missing_object(soid, op);
        return;
       }
@@ -822,7 +828,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
       object_locator_t src_oloc;
       get_src_oloc(m->get_oid(), m->get_object_locator(), src_oloc);
       hobject_t src_oid(osd_op.soid, src_oloc.key, m->get_pg().ps(),
-                       info.pgid.pool());
+                       info.pgid.pool(), m->get_object_locator().nspace);
       if (!src_obc.count(src_oid)) {
        ObjectContext *sobc;
        snapid_t ssnapid;
@@ -831,7 +837,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
        if (r == -EAGAIN) {
          // missing the specific snap we need; requeue and wait.
          hobject_t wait_oid(osd_op.soid.oid, src_oloc.key, ssnapid, m->get_pg().ps(),
-                            info.pgid.pool());
+                            info.pgid.pool(), m->get_object_locator().nspace);
          wait_for_missing_object(wait_oid, op);
        } else if (r) {
          osd->reply_op_error(op, r);
@@ -887,7 +893,7 @@ void ReplicatedPG::do_op(OpRequestRef op)
        if (r == -EAGAIN) {
          // missing the specific snap we need; requeue and wait.
          hobject_t wait_oid(clone_oid.oid, src_oloc.key, ssnapid, m->get_pg().ps(),
-                            info.pgid.pool());
+                            info.pgid.pool(), clone_oid.get_namespace());
          wait_for_missing_object(wait_oid, op);
        } else if (r) {
          osd->reply_op_error(op, r);
@@ -1414,7 +1420,8 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
       coid.oid,
       coid.get_key(),
       coid.hash,
-      false);
+      false,
+      coid.get_namespace());
 
   assert(obc->ssc);
   SnapSet& snapset = obc->ssc->snapset;
@@ -1541,7 +1548,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid)
   hobject_t snapoid(
     coid.oid, coid.get_key(),
     snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash,
-    info.pgid.pool());
+    info.pgid.pool(), coid.get_namespace());
   ctx->snapset_obc = get_object_context(snapoid, coi.oloc, false);
   assert(ctx->snapset_obc->registered);
 
@@ -1997,10 +2004,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
     ObjectContext *src_obc = 0;
     if (ceph_osd_op_type_multi(op.op)) {
+      MOSDOp *m = static_cast<MOSDOp *>(ctx->op->request);
       object_locator_t src_oloc;
-      get_src_oloc(soid.oid, (static_cast<MOSDOp *>(ctx->op->request))->get_object_locator(), src_oloc);
+      get_src_oloc(soid.oid, m->get_object_locator(), src_oloc);
       hobject_t src_oid(osd_op.soid, src_oloc.key, soid.hash,
-                       info.pgid.pool());
+                       info.pgid.pool(), src_oloc.nspace);
       src_obc = ctx->src_obc[src_oid];
       dout(10) << " src_oid " << src_oid << " obc " << src_obc << dendl;
       assert(src_obc);
@@ -2373,7 +2381,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops)
 
         if (!ssc) {
          ssc = ctx->obc->ssc = get_snapset_context(soid.oid,
-                                                   soid.get_key(), soid.hash, false);
+                   soid.get_key(), soid.hash, false,  soid.get_namespace());
         }
         assert(ssc);
 
@@ -3306,7 +3314,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
 
   ObjectContext *rollback_to;
   int ret = find_object_context(
-    hobject_t(soid.oid, oi.oloc.key, snapid, soid.hash, info.pgid.pool()), 
+    hobject_t(soid.oid, oi.oloc.key, snapid, soid.hash, info.pgid.pool(), soid.get_namespace()),
     oi.oloc, &rollback_to, false, &cloneid);
   if (ret) {
     if (-ENOENT == ret) {
@@ -3326,7 +3334,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
        * with not-yet-restored object. We shouldn't have been able
        * to get here; recovery should have completed first! */
       hobject_t rollback_target(soid.oid, soid.get_key(), cloneid, soid.hash,
-                               info.pgid.pool());
+                               info.pgid.pool(), soid.get_namespace());
       assert(is_missing_object(rollback_target));
       dout(20) << "_rollback_to attempted to roll back to a missing object " 
               << rollback_target << " (requested snapid: ) " << snapid << dendl;
@@ -3683,7 +3691,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
     if (!head_existed) {
       // if we logically recreated the head, remove old _snapdir object
       hobject_t snapoid(soid.oid, soid.get_key(), CEPH_SNAPDIR, soid.hash,
-                       info.pgid.pool());
+                       info.pgid.pool(), soid.get_namespace());
 
       ctx->snapset_obc = get_object_context(snapoid, ctx->new_obs.oi.oloc, false);
       if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
@@ -3701,7 +3709,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
   } else if (ctx->new_snapset.clones.size()) {
     // save snapset on _snap
     hobject_t snapoid(soid.oid, soid.get_key(), CEPH_SNAPDIR, soid.hash,
-                     info.pgid.pool());
+                     info.pgid.pool(), soid.get_namespace());
     dout(10) << " final snapset " << ctx->new_snapset
             << " in " << snapoid << dendl;
     ctx->log.push_back(pg_log_entry_t(pg_log_entry_t::MODIFY, snapoid, ctx->at_version, old_version,
@@ -4445,7 +4453,7 @@ ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
 
       // new object.
       object_info_t oi(soid, oloc);
-      SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true);
+      SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace());
       return create_object_context(oi, ssc);
     }
 
@@ -4458,14 +4466,14 @@ ObjectContext *ReplicatedPG::get_object_context(const hobject_t& soid,
 
     SnapSetContext *ssc = NULL;
     if (can_create)
-      ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true);
+      ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace());
     obc = new ObjectContext(oi, true, ssc);
     obc->obs.exists = true;
 
     register_object_context(obc);
 
     if (can_create && !obc->ssc)
-      obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true);
+      obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true, soid.get_namespace());
 
     populate_obc_watchers(obc);
     dout(10) << "get_object_context " << obc << " " << soid << " 0 -> 1 read " << obc->obs.oi << dendl;
@@ -4504,9 +4512,9 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
                                      snapid_t *psnapid)
 {
   hobject_t head(oid.oid, oid.get_key(), CEPH_NOSNAP, oid.hash,
-                info.pgid.pool());
+                info.pgid.pool(), oid.get_namespace());
   hobject_t snapdir(oid.oid, oid.get_key(), CEPH_SNAPDIR, oid.hash,
-                   info.pgid.pool());
+                   info.pgid.pool(), oid.get_namespace());
 
   // want the snapdir?
   if (oid.snap == CEPH_SNAPDIR) {
@@ -4527,7 +4535,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
 
     // always populate ssc for SNAPDIR...
     if (!obc->ssc)
-      obc->ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, true);
+      obc->ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, true, oid.get_namespace());
     return 0;
   }
 
@@ -4540,13 +4548,13 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
     *pobc = obc;
 
     if (can_create && !obc->ssc)
-      obc->ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, true);
+      obc->ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, true, oid.get_namespace());
 
     return 0;
   }
 
   // we want a snap
-  SnapSetContext *ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, can_create);
+  SnapSetContext *ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, can_create, oid.get_namespace());
   if (!ssc)
     return -ENOENT;
 
@@ -4589,7 +4597,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
     return -ENOENT;
   }
   hobject_t soid(oid.oid, oid.get_key(), ssc->snapset.clones[k], oid.hash,
-                info.pgid.pool());
+                info.pgid.pool(), oid.get_namespace());
 
   put_snapset_context(ssc); // we're done with ssc
   ssc = 0;
@@ -4676,7 +4684,8 @@ void ReplicatedPG::add_object_context_to_pg_stat(ObjectContext *obc, pg_stat_t *
       obc->ssc = get_snapset_context(oi.soid.oid,
                                     oi.soid.get_key(),
                                     oi.soid.hash,
-                                    false);
+                                    false,
+                                    oi.soid.get_namespace());
     assert(obc->ssc);
 
     // subtract off clone overlap
@@ -4708,7 +4717,8 @@ SnapSetContext *ReplicatedPG::create_snapset_context(const object_t& oid)
 SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
                                                  const string& key,
                                                  ps_t seed,
-                                                 bool can_create)
+                                                 bool can_create,
+                                                 const string& nspace)
 {
   SnapSetContext *ssc;
   map<object_t, SnapSetContext*>::iterator p = snapset_contexts.find(oid);
@@ -4717,12 +4727,12 @@ SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
   } else {
     bufferlist bv;
     hobject_t head(oid, key, CEPH_NOSNAP, seed,
-                  info.pgid.pool());
+                  info.pgid.pool(), nspace);
     int r = osd->store->getattr(coll, head, SS_ATTR, bv);
     if (r < 0) {
       // try _snapset
       hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed,
-                       info.pgid.pool());
+                       info.pgid.pool(), nspace);
       r = osd->store->getattr(coll, snapdir, SS_ATTR, bv);
       if (r < 0 && !can_create)
        return NULL;
@@ -5204,7 +5214,7 @@ int ReplicatedPG::pull(
     }
 
     // check snapset
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
     assert(ssc);
     dout(10) << " snapset " << ssc->snapset << dendl;
     calc_clone_subsets(ssc->snapset, soid, pg_log.get_missing(), info.last_backfill,
@@ -5290,7 +5300,7 @@ void ReplicatedPG::push_to_replica(
       return push_start(prio, obc, soid, peer);
     }
     
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
     assert(ssc);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
     calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
@@ -5300,7 +5310,7 @@ void ReplicatedPG::push_to_replica(
   } else if (soid.snap == CEPH_NOSNAP) {
     // pushing head or unversioned object.
     // base this on partially on replica's clones?
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false, soid.get_namespace());
     assert(ssc);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
     calc_head_subsets(obc, ssc->snapset, soid, peer_missing[peer],
@@ -5483,7 +5493,8 @@ ObjectRecoveryInfo ReplicatedPG::recalc_subsets(const ObjectRecoveryInfo& recove
   SnapSetContext *ssc = get_snapset_context(recovery_info.soid.oid,
                                            recovery_info.soid.get_key(),
                                            recovery_info.soid.hash,
-                                           false);
+                                           false,
+                                           recovery_info.soid.get_namespace());
   assert(ssc);
   ObjectRecoveryInfo new_info = recovery_info;
   new_info.copy_subset.clear();
@@ -5591,7 +5602,8 @@ void ReplicatedPG::handle_pull_response(OpRequestRef op)
       ssc = create_snapset_context(hoid.oid);
       ssc->snapset = pi.recovery_info.ss;
     } else {
-      ssc = get_snapset_context(hoid.oid, hoid.get_key(), hoid.hash, false);
+      ssc = get_snapset_context(hoid.oid, hoid.get_key(), hoid.hash, false,
+       hoid.get_namespace());
       assert(ssc);
     }
     ObjectContext *obc = create_object_context(pi.recovery_info.oi, ssc);
@@ -6207,6 +6219,7 @@ ObjectContext *ReplicatedPG::mark_object_lost(ObjectStore::Transaction *t,
   object_locator_t oloc;
   oloc.pool = info.pgid.pool();
   oloc.key = oid.get_key();
+  oloc.nspace = oid.get_namespace();
   ObjectContext *obc = get_object_context(oid, oloc, true);
 
   obc->ondisk_write_lock();
index d38fdcd51f2d6c08c37bb0ce1d5c4d703aec2416..fbcbaa77d2116fae87eda0545e8f50169833c632 100644 (file)
@@ -493,7 +493,7 @@ protected:
 
   SnapSetContext *create_snapset_context(const object_t& oid);
   SnapSetContext *get_snapset_context(const object_t& oid, const string &key,
-                                     ps_t seed, bool can_create);
+                                     ps_t seed, bool can_create, const string &nspace);
   void register_snapset_context(SnapSetContext *ssc) {
     if (!ssc->registered) {
       assert(snapset_contexts.count(ssc->oid) == 0);
index 2f05bb9c24685bd8ebf9f60820646db395c73c54..7628f4ef4e1c525adbd9ae9ca4f7adae31f8240d 100644 (file)
@@ -56,17 +56,18 @@ void osd_reqid_t::generate_test_instances(list<osd_reqid_t*>& o)
 
 void object_locator_t::encode(bufferlist& bl) const
 {
-  ENCODE_START(4, 3, bl);
+  ENCODE_START(5, 3, bl);
   ::encode(pool, bl);
   int32_t preferred = -1;  // tell old code there is no preferred osd (-1).
   ::encode(preferred, bl);
   ::encode(key, bl);
+  ::encode(nspace, bl);
   ENCODE_FINISH(bl);
 }
 
 void object_locator_t::decode(bufferlist::iterator& p)
 {
-  DECODE_START_LEGACY_COMPAT_LEN(4, 3, 3, p);
+  DECODE_START_LEGACY_COMPAT_LEN(5, 3, 3, p);
   if (struct_v < 2) {
     int32_t op;
     ::decode(op, p);
@@ -79,6 +80,8 @@ void object_locator_t::decode(bufferlist::iterator& p)
     ::decode(preferred, p);
   }
   ::decode(key, p);
+  if (struct_v >= 5)
+    ::decode(nspace, p);
   DECODE_FINISH(p);
 }
 
@@ -86,14 +89,16 @@ void object_locator_t::dump(Formatter *f) const
 {
   f->dump_int("pool", pool);
   f->dump_string("key", key);
+  f->dump_string("namespace", nspace);
 }
 
 void object_locator_t::generate_test_instances(list<object_locator_t*>& o)
 {
   o.push_back(new object_locator_t);
   o.push_back(new object_locator_t(123));
-  o.push_back(new object_locator_t(1234, "key"));
-  o.push_back(new object_locator_t(12, "key2"));
+  o.push_back(new object_locator_t(1, "n2"));
+  o.push_back(new object_locator_t(1234, "", "key"));
+  o.push_back(new object_locator_t(12, "n1", "key2"));
 }
 
 
@@ -1537,7 +1542,7 @@ void pg_info_t::generate_test_instances(list<pg_info_t*>& o)
   o.back()->last_update = eversion_t(3, 4);
   o.back()->last_complete = eversion_t(5, 6);
   o.back()->log_tail = eversion_t(7, 8);
-  o.back()->last_backfill = hobject_t(object_t("objname"), "key", 123, 456, -1);
+  o.back()->last_backfill = hobject_t(object_t("objname"), "key", 123, 456, -1, "");
   list<pg_stat_t*> s;
   pg_stat_t::generate_test_instances(s);
   o.back()->stats = *s.back();
@@ -1907,7 +1912,7 @@ void pg_log_entry_t::dump(Formatter *f) const
 void pg_log_entry_t::generate_test_instances(list<pg_log_entry_t*>& o)
 {
   o.push_back(new pg_log_entry_t());
-  hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+  hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   o.push_back(new pg_log_entry_t(MODIFY, oid, eversion_t(1,2), eversion_t(3,4),
                                 osd_reqid_t(entity_name_t::CLIENT(777), 8, 999), utime_t(8,9)));
 }
@@ -2110,7 +2115,7 @@ void pg_missing_t::generate_test_instances(list<pg_missing_t*>& o)
 {
   o.push_back(new pg_missing_t);
   o.push_back(new pg_missing_t);
-  o.back()->add(hobject_t(object_t("foo"), "foo", 123, 456, 0), eversion_t(5, 6), eversion_t(5, 1));
+  o.back()->add(hobject_t(object_t("foo"), "foo", 123, 456, 0, ""), eversion_t(5, 6), eversion_t(5, 1));
 }
 
 ostream& operator<<(ostream& out, const pg_missing_t::item& i) 
@@ -2602,13 +2607,13 @@ void object_info_t::decode(bufferlist::iterator& bl)
     sobject_t obj;
     ::decode(obj, bl);
     ::decode(oloc, bl);
-    soid = hobject_t(obj.oid, oloc.key, obj.snap, 0, 0);
+    soid = hobject_t(obj.oid, oloc.key, obj.snap, 0, 0 , "");
     soid.hash = legacy_object_locator_to_ps(soid.oid, oloc);
   } else if (struct_v >= 6) {
     ::decode(soid, bl);
     ::decode(oloc, bl);
     if (struct_v == 6) {
-      hobject_t hoid(soid.oid, oloc.key, soid.snap, soid.hash, 0);
+      hobject_t hoid(soid.oid, oloc.key, soid.snap, soid.hash, 0 , "");
       soid = hoid;
     }
   }
@@ -2954,9 +2959,9 @@ void ScrubMap::generate_test_instances(list<ScrubMap*>& o)
   o.back()->attrs["bar"] = buffer::copy("barval", 6);
   list<object*> obj;
   object::generate_test_instances(obj);
-  o.back()->objects[hobject_t(object_t("foo"), "fookey", 123, 456, 0)] = *obj.back();
+  o.back()->objects[hobject_t(object_t("foo"), "fookey", 123, 456, 0, "")] = *obj.back();
   obj.pop_back();
-  o.back()->objects[hobject_t(object_t("bar"), string(), 123, 456, 0)] = *obj.back();
+  o.back()->objects[hobject_t(object_t("bar"), string(), 123, 456, 0, "")] = *obj.back();
 }
 
 // -- ScrubMap::object --
index f9625e523819c5ce94b6106ca02029d6f7d09021..9ec87d7b25e820cf206f3e626acec04bfcd959f1 100644 (file)
@@ -106,13 +106,16 @@ namespace __gnu_cxx {
 struct object_locator_t {
   int64_t pool;
   string key;
+  string nspace;
 
   explicit object_locator_t()
     : pool(-1) {}
   explicit object_locator_t(int64_t po)
     : pool(po) {}
-  explicit object_locator_t(int64_t po, string s)
-    : pool(po), key(s) {}
+  explicit object_locator_t(int64_t po, string ns)
+    : pool(po), nspace(ns) {}
+  explicit object_locator_t(int64_t po, string ns, string s)
+    : pool(po), key(s), nspace(ns) {}
 
   int64_t get_pool() const {
     return pool;
@@ -121,6 +124,7 @@ struct object_locator_t {
   void clear() {
     pool = -1;
     key = "";
+    nspace = "";
   }
 
   void encode(bufferlist& bl) const;
@@ -131,7 +135,7 @@ struct object_locator_t {
 WRITE_CLASS_ENCODER(object_locator_t)
 
 inline bool operator==(const object_locator_t& l, const object_locator_t& r) {
-  return l.pool == r.pool && l.key == r.key;
+  return l.pool == r.pool && l.key == r.key && l.nspace == r.nspace;
 }
 inline bool operator!=(const object_locator_t& l, const object_locator_t& r) {
   return !(l == r);
@@ -140,6 +144,8 @@ inline bool operator!=(const object_locator_t& l, const object_locator_t& r) {
 inline ostream& operator<<(ostream& out, const object_locator_t& loc)
 {
   out << "@" << loc.pool;
+  if (loc.nspace.length())
+    out << ";" << loc.nspace;
   if (loc.key.length())
     out << ":" << loc.key;
   return out;
@@ -1583,7 +1589,7 @@ struct pg_ls_response_t {
   static void generate_test_instances(list<pg_ls_response_t*>& o) {
     o.push_back(new pg_ls_response_t);
     o.push_back(new pg_ls_response_t);
-    o.back()->handle = hobject_t(object_t("hi"), "key", 1, 2, -1);
+    o.back()->handle = hobject_t(object_t("hi"), "key", 1, 2, -1, "");
     o.back()->entries.push_back(make_pair(object_t("one"), string()));
     o.back()->entries.push_back(make_pair(object_t("two"), string("twokey")));
   }
index 0481c6c94de7f23f345ee4841aeadd13334e3841..51fad69955512cb14ff935c077fd5691ca9f8445 100644 (file)
@@ -559,6 +559,7 @@ ObjectCacher::Object *ObjectCacher::get_object(sobject_t oid, ObjectSet *oset,
                                               uint64_t truncate_size,
                                               uint64_t truncate_seq)
 {
+  // XXX: Add handling of nspace in object_locator_t in cache
   assert(lock.is_locked());
   // have it?
   if ((uint32_t)l.pool < objects.size()) {
index 4441cfa77cff1ae79ec48eefd5ca0b30f8b4f311..ac2833b9ea68ef2f6be1f6b145505e12d183e205 100644 (file)
@@ -215,6 +215,7 @@ class ObjectCacher {
     object_t get_oid() { return oid.oid; }
     snapid_t get_snap() { return oid.snap; }
     ObjectSet *get_object_set() { return oset; }
+    string get_namespace() { return oloc.nspace; }
     
     object_locator_t& get_oloc() { return oloc; }
     void set_object_locator(object_locator_t& l) { oloc = l; }
index 1dfeb36e0305bf8a3877176f2adeaa5bd80318a0..45b4573bbb1fd5a15a564fd9029a46cefff7781d 100644 (file)
@@ -1680,7 +1680,7 @@ void Objecter::list_objects(ListContext *list_context, Context *onfinish) {
   C_List *onack = new C_List(list_context, onfinish, bl, this);
 
   object_t oid;
-  object_locator_t oloc(list_context->pool_id);
+  object_locator_t oloc(list_context->pool_id, list_context->nspace);
 
   // 
   Op *o = new Op(oid, oloc, op.ops, CEPH_OSD_FLAG_READ, onack, NULL, NULL);
index b4086f6a4138459ba161c98258e9464e53559bd5..319eb5da6eae427505ec1172f48d7dc3af320696 100644 (file)
@@ -880,6 +880,7 @@ public:
     int64_t pool_id;
     int pool_snap_seq;
     int max_entries;
+    string nspace;
     std::list<pair<object_t, string> > list;
 
     bufferlist filter;
index 7c51cee1a058c73770217d99e8125a4f42525017..c1adc7580fb677bb822af07114658912b4ae2ac6 100644 (file)
@@ -34,12 +34,15 @@ int main(int argc, char **argv)
   for (int i=0; i<4; i++)
     size[i] = 0;
 
-  for (int f = 0; f < 50000; f++) {  // files
+  for (int n = 0; n < 10; n++) {   // namespaces
+    char nspace[20];
+    snprintf(nspace, sizeof(nspace), "n%d", n);
+  for (int f = 0; f < 5000; f++) {  // files
     for (int b = 0; b < 4; b++) {   // blocks
       char foo[20];
       snprintf(foo, sizeof(foo), "%d.%d", f, b);
       object_t oid(foo);
-      ceph_object_layout l = osdmap.make_object_layout(oid, 0);
+      ceph_object_layout l = osdmap.make_object_layout(oid, 0, nspace);
        //osdmap.file_to_object_layout(oid, g_default_file_layout);
       vector<int> osds;
       pg_t pgid = pg_t(l.ol_pgid);
@@ -64,6 +67,7 @@ int main(int argc, char **argv)
       }
     }
   }
+  }
 
   uint64_t avg = 0;
   for (int i=0; i<n; i++) {
index abc5c5f1170ef1ee820e419be00ca206b92f9efe..87482ef702d8c3da72f81b81b9c2af3af8511cb6 100644 (file)
@@ -283,7 +283,7 @@ public:
     // hash
     //boost::binomial_distribution<uint32_t> bin(0xFFFFFF, 0.5);
     ++seq;
-    return hobject_t(name, string(), rand() & 2 ? CEPH_NOSNAP : rand(), rand() & 0xFF, 0);
+    return hobject_t(name, string(), rand() & 2 ? CEPH_NOSNAP : rand(), rand() & 0xFF, 0, "");
   }
 };
 
@@ -506,13 +506,16 @@ TEST_F(StoreTest, HashCollisionTest) {
   string base = "";
   for (int i = 0; i < 100; ++i) base.append("aaaaa");
   set<hobject_t> created;
+  for (int n = 0; n < 10; ++n) {
+    char nbuf[100];
+    sprintf(nbuf, "n%d", n);
   for (int i = 0; i < 1000; ++i) {
     char buf[100];
     sprintf(buf, "%d", i);
     if (!(i % 5)) {
-      cerr << "Object << i << std::endl;
+      cerr << "Object n" << n << " "<< i << std::endl;
     }
-    hobject_t hoid(string(buf) + base, string(), CEPH_NOSNAP, 0, 0);
+    hobject_t hoid(string(buf) + base, string(), CEPH_NOSNAP, 0, 0, string(nbuf));
     {
       ObjectStore::Transaction t;
       t.touch(cid, hoid);
@@ -521,6 +524,7 @@ TEST_F(StoreTest, HashCollisionTest) {
     }
     created.insert(hoid);
   }
+  }
   vector<hobject_t> objects;
   r = store->collection_list(cid, objects);
   ASSERT_EQ(r, 0);
@@ -572,7 +576,7 @@ TEST_F(StoreTest, HashCollisionTest) {
 
 TEST_F(StoreTest, OMapTest) {
   coll_t cid("blah");
-  hobject_t hoid("tesomap", "", CEPH_NOSNAP, 0, 0);
+  hobject_t hoid("tesomap", "", CEPH_NOSNAP, 0, 0, "");
   int r;
   {
     ObjectStore::Transaction t;
@@ -668,7 +672,7 @@ TEST_F(StoreTest, OMapTest) {
 
 TEST_F(StoreTest, XattrTest) {
   coll_t cid("blah");
-  hobject_t hoid("tesomap", "", CEPH_NOSNAP, 0, 0);
+  hobject_t hoid("tesomap", "", CEPH_NOSNAP, 0, 0, "");
   bufferlist big;
   for (unsigned i = 0; i < 10000; ++i) {
     big.append('\0');
@@ -770,7 +774,7 @@ void colsplittest(
          "",
          CEPH_NOSNAP,
          i<<common_suffix_size,
-         0));
+         0, ""));
     }
     r = store->apply_transaction(t);
     ASSERT_EQ(r, 0);
index 5d17c22a877b1ff608e498e1655ddf9342743be2..6db4f6c4aa5fb88372c4a8ac3cd22c6b22bd6516 100644 (file)
@@ -49,7 +49,7 @@ TEST(FlatIndex, collection) {
   uint64_t hash = 111;
   uint64_t pool = 222;
   const std::string object_name(10, 'A');
-  hobject_t hoid(object_t(object_name), key, CEPH_NOSNAP, hash, pool);
+  hobject_t hoid(object_t(object_name), key, CEPH_NOSNAP, hash, pool, "");
   vector<hobject_t> ls;
   ASSERT_DEATH(index.collection_list_partial(hoid, 0, 0, 0, &ls, &hoid), "0");
 }
@@ -70,7 +70,7 @@ TEST(FlatIndex, created_unlink) {
     CollectionIndex::IndexedPath indexed_path;
     index->set_ref(index);
     const std::string object_name(10, 'A');
-    hobject_t hoid(object_t(object_name), key, CEPH_NOSNAP, hash, pool);
+    hobject_t hoid(object_t(object_name), key, CEPH_NOSNAP, hash, pool, "");
     int exists;
     EXPECT_EQ(0, index->lookup(hoid, &indexed_path, &exists));
     EXPECT_EQ(0, exists);
@@ -88,7 +88,7 @@ TEST(FlatIndex, created_unlink) {
     CollectionIndex::IndexedPath indexed_path;
     index->set_ref(index);
     const std::string object_name(1024, 'A');
-    hobject_t hoid(object_t(object_name), key, CEPH_NOSNAP, hash, pool);
+    hobject_t hoid(object_t(object_name), key, CEPH_NOSNAP, hash, pool, "");
     int exists;
     EXPECT_EQ(0, index->lookup(hoid, &indexed_path, &exists));
     EXPECT_EQ(0, exists);
index eeacf1ab8fb644e2a34dd1b2d53f7a265e04c8a8..33dbfe532a9cb5a7be7e965c5b81aba7eec8813f 100644 (file)
@@ -101,9 +101,9 @@ TEST_F(TestHASH_INDEX_TAG, generate_and_parse_name) {
   uint64_t hash = 0xABABABAB;
   uint64_t pool = -1;
 
-  test_generate_and_parse(hobject_t(object_t(".A/B_\\C.D"), key, CEPH_NOSNAP, hash, pool),
+  test_generate_and_parse(hobject_t(object_t(".A/B_\\C.D"), key, CEPH_NOSNAP, hash, pool, ""),
                          "\\.A\\sB_\\\\C.D_head_ABABABAB");
-  test_generate_and_parse(hobject_t(object_t("DIR_A"), key, CEPH_NOSNAP, hash, pool),
+  test_generate_and_parse(hobject_t(object_t("DIR_A"), key, CEPH_NOSNAP, hash, pool, ""),
                          "\\dA_head_ABABABAB");
 }
 
@@ -123,11 +123,11 @@ TEST_F(TestHASH_INDEX_TAG_2, generate_and_parse_name) {
   {
     std::string name(".XA/B_\\C.D");
     name[1] = '\0';
-    hobject_t hoid(object_t(name), key, CEPH_NOSNAP, hash, pool);
+    hobject_t hoid(object_t(name), key, CEPH_NOSNAP, hash, pool, "");
 
     test_generate_and_parse(hoid, "\\.\\nA\\sB\\u\\\\C.D_KEY_head_ABABABAB");
   }
-  test_generate_and_parse(hobject_t(object_t("DIR_A"), key, CEPH_NOSNAP, hash, pool),
+  test_generate_and_parse(hobject_t(object_t("DIR_A"), key, CEPH_NOSNAP, hash, pool, ""),
                          "\\dA_KEY_head_ABABABAB");
 }
 
@@ -147,13 +147,13 @@ TEST_F(TestHOBJECT_WITH_POOL, generate_and_parse_name) {
   {
     std::string name(".XA/B_\\C.D");
     name[1] = '\0';
-    hobject_t hoid(object_t(name), key, CEPH_NOSNAP, hash, pool);
+    hobject_t hoid(object_t(name), key, CEPH_NOSNAP, hash, pool, "");
     hoid.nspace = "NSPACE";
 
     test_generate_and_parse(hoid, "\\.\\nA\\sB\\u\\\\C.D_KEY_head_ABABABAB_NSPACE_cdcdcdcd");
   }
   {
-    hobject_t hoid(object_t("DIR_A"), key, CEPH_NOSNAP, hash, pool);
+    hobject_t hoid(object_t("DIR_A"), key, CEPH_NOSNAP, hash, pool, "");
     hoid.nspace = "NSPACE";
 
     test_generate_and_parse(hoid, "\\dA_KEY_head_ABABABAB_NSPACE_cdcdcdcd");
index 9f5e68f526ebd53575ef1546f7451448404b5ad8..91f185d086f71c6b782bd4316e68e5f4618055d1 100644 (file)
@@ -668,7 +668,7 @@ TEST_F(PGLogTest, merge_log) {
     bool dirty_info = false;
     bool dirty_big_info = false;
 
-    hobject_t last_backfill(object_t("oname"), string("key"), 1, 234, 1);
+    hobject_t last_backfill(object_t("oname"), string("key"), 1, 234, 1, "");
     info.last_backfill = last_backfill;
     eversion_t stat_version(10, 1);
     info.stats.version = stat_version;
@@ -823,7 +823,7 @@ TEST_F(PGLogTest, merge_log) {
       olog.head = e.version;
     }
 
-    hobject_t last_backfill(object_t("oname"), string("key"), 1, 234, 1);
+    hobject_t last_backfill(object_t("oname"), string("key"), 1, 234, 1, "");
     info.last_backfill = last_backfill;
     eversion_t stat_version(10, 1);
     info.stats.version = stat_version;
@@ -1032,7 +1032,7 @@ TEST_F(PGLogTest, merge_log) {
       olog.head = e.version;
     }
 
-    hobject_t last_backfill(object_t("oname"), string("key"), 1, 234, 1);
+    hobject_t last_backfill(object_t("oname"), string("key"), 1, 234, 1, "");
     info.last_backfill = last_backfill;
     eversion_t stat_version(10, 1);
     info.stats.version = stat_version;
index db30de4f4e8e162a7f40e1637a1d944cdd6cb5ff..ee71b0e898cf8e47f4ace29f8f8ff1133dd19631 100644 (file)
@@ -22,6 +22,7 @@
 
 #include "FakeWriteback.h"
 
+// XXX: Only tests default namespace
 struct op_data {
   op_data(std::string oid, uint64_t offset, uint64_t len, bool read)
     : extent(oid, 0, offset, len, 0), is_read(read)
index c0a9a95b11a7e5bc8e0b642ad2719bbed698f5c8..7ec5fa121bf192144d11210df6a5304bd7a74a5b 100644 (file)
@@ -229,7 +229,7 @@ TEST(pg_missing_t, constructor)
 
 TEST(pg_missing_t, have_missing)
 {
-  hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+  hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   EXPECT_FALSE(missing.have_missing());
   missing.add(oid, eversion_t(), eversion_t());
@@ -238,7 +238,7 @@ TEST(pg_missing_t, have_missing)
 
 TEST(pg_missing_t, swap)
 {
-  hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+  hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   EXPECT_FALSE(missing.have_missing());
   missing.add(oid, eversion_t(), eversion_t());
@@ -256,7 +256,7 @@ TEST(pg_missing_t, is_missing)
 {
   // pg_missing_t::is_missing(const hobject_t& oid) const
   {
-    hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+    hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     EXPECT_FALSE(missing.is_missing(oid));
     missing.add(oid, eversion_t(), eversion_t());
@@ -265,7 +265,7 @@ TEST(pg_missing_t, is_missing)
 
   // bool pg_missing_t::is_missing(const hobject_t& oid, eversion_t v) const
   {
-    hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+    hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     eversion_t need(10,5);
     EXPECT_FALSE(missing.is_missing(oid, eversion_t()));
@@ -278,7 +278,7 @@ TEST(pg_missing_t, is_missing)
 
 TEST(pg_missing_t, have_old)
 {
-  hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+  hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   EXPECT_EQ(eversion_t(), missing.have_old(oid));
   missing.add(oid, eversion_t(), eversion_t());
@@ -290,8 +290,8 @@ TEST(pg_missing_t, have_old)
 
 TEST(pg_missing_t, add_next_event)
 {
-  hobject_t oid(object_t("objname"), "key", 123, 456, 0);
-  hobject_t oid_other(object_t("other"), "key", 9123, 9456, 0);
+  hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
+  hobject_t oid_other(object_t("other"), "key", 9123, 9456, 0, "");
   eversion_t version(10,5);
   eversion_t prior_version(3,4);
   pg_log_entry_t sample_e(pg_log_entry_t::DELETE, oid, version, prior_version,
@@ -416,7 +416,7 @@ TEST(pg_missing_t, add_next_event)
 
 TEST(pg_missing_t, revise_need)
 {
-  hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+  hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   // create a new entry
   EXPECT_FALSE(missing.is_missing(oid));
@@ -437,7 +437,7 @@ TEST(pg_missing_t, revise_need)
 
 TEST(pg_missing_t, revise_have)
 {
-  hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+  hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   // a non existing entry means noop
   EXPECT_FALSE(missing.is_missing(oid));
@@ -457,7 +457,7 @@ TEST(pg_missing_t, revise_have)
 
 TEST(pg_missing_t, add)
 {
-  hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+  hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
   pg_missing_t missing;
   EXPECT_FALSE(missing.is_missing(oid));
   eversion_t have(1,1);
@@ -472,7 +472,7 @@ TEST(pg_missing_t, rm)
 {
   // void pg_missing_t::rm(const hobject_t& oid, eversion_t v)
   {
-    hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+    hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     EXPECT_FALSE(missing.is_missing(oid));
     epoch_t epoch = 10;
@@ -488,7 +488,7 @@ TEST(pg_missing_t, rm)
   }
   // void pg_missing_t::rm(const std::map<hobject_t, pg_missing_t::item>::iterator &m)
   {
-    hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+    hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     EXPECT_FALSE(missing.is_missing(oid));
     missing.add(oid, eversion_t(), eversion_t());
@@ -503,7 +503,7 @@ TEST(pg_missing_t, got)
 {
   // void pg_missing_t::got(const hobject_t& oid, eversion_t v)
   {
-    hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+    hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     // assert if the oid does not exist
     EXPECT_THROW(missing.got(oid, eversion_t()), FailedAssertion);
@@ -520,7 +520,7 @@ TEST(pg_missing_t, got)
   }
   // void pg_missing_t::got(const std::map<hobject_t, pg_missing_t::item>::iterator &m)
   {
-    hobject_t oid(object_t("objname"), "key", 123, 456, 0);
+    hobject_t oid(object_t("objname"), "key", 123, 456, 0, "");
     pg_missing_t missing;
     EXPECT_FALSE(missing.is_missing(oid));
     missing.add(oid, eversion_t(), eversion_t());
@@ -534,9 +534,9 @@ TEST(pg_missing_t, got)
 TEST(pg_missing_t, split_into)
 {
   uint32_t hash1 = 1;
-  hobject_t oid1(object_t("objname"), "key1", 123, hash1, 0);
+  hobject_t oid1(object_t("objname"), "key1", 123, hash1, 0, "");
   uint32_t hash2 = 2;
-  hobject_t oid2(object_t("objname"), "key2", 123, hash2, 0);
+  hobject_t oid2(object_t("objname"), "key2", 123, hash2, 0, "");
   pg_missing_t missing;
   missing.add(oid1, eversion_t(), eversion_t());
   missing.add(oid2, eversion_t(), eversion_t());  
index 9e44ba32d97775fd7e95656c3d97056f81962e4e..41ba0446961e7f72dd0f2a1759a570ec5671f666 100644 (file)
@@ -462,7 +462,7 @@ public:
       random_string(1+(rand() % 16)),
       snapid_t(rand() % 1000),
       (rand() & ((~0)<<bits)) | (mask & ~((~0)<<bits)),
-      0);
+      0, random_string(rand() % 16));
   }
 
   void choose_random_snaps(int num, set<snapid_t> *snaps) {