]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
object.h: make hobject key private, prevent objects where key == oid
authorSamuel Just <samuel.just@dreamhost.com>
Wed, 7 Sep 2011 21:25:02 +0000 (14:25 -0700)
committerSamuel Just <samuel.just@dreamhost.com>
Wed, 7 Sep 2011 22:10:33 +0000 (15:10 -0700)
There should be no difference between an object with a key identical to
its name and an object with the same name but no key.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/include/object.h
src/os/LFNIndex.cc
src/osd/PG.cc
src/osd/ReplicatedPG.cc
src/osd/osd_types.cc

index 706caa6bffcc2061bb79b1bdd413ccac81daa022..f24587a55c62fda20ce22a9894bad9f10ae62ff6 100644 (file)
@@ -261,15 +261,25 @@ namespace __gnu_cxx {
 
 struct hobject_t {
   object_t oid;
-  string key;
   snapid_t snap;
   uint32_t hash;
 
+private:
+  string key;
+
+public:
+  const string &get_key() const {
+    return key;
+  }
+
   hobject_t() : snap(0), hash(0) {}
   hobject_t(object_t oid, const string &key, snapid_t snap, uint32_t hash) : 
-    oid(oid), key(key), snap(snap), hash(hash) {}
+    oid(oid), snap(snap), hash(hash), 
+    key(oid.name == key ? string() : key) {}
+
   hobject_t(const sobject_t &soid, const string &key, uint32_t hash) : 
-    oid(soid.oid), key(key), snap(soid.snap), hash(hash) {}
+    oid(soid.oid), snap(soid.snap), hash(hash),
+    key(soid.oid.name == key ? string() : soid.oid.name) {}
 
   /* Do not use when a particular hash function is needed */
   explicit hobject_t(const sobject_t &o) :
index 557b69a564dcf410d3eab0eff57ab122c90e9164..210244efb5eee658965dcb63b8a8bde587c4e685 100644 (file)
@@ -502,7 +502,7 @@ string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) {
   }
   append_escaped(i, hoid.oid.name.end(), &full_name);
   full_name.append("_");
-  append_escaped(hoid.key.begin(), hoid.key.end(), &full_name);
+  append_escaped(hoid.get_key().begin(), hoid.get_key().end(), &full_name);
   full_name.append("_");
 
   char snap_with_hash[PATH_MAX];
@@ -763,6 +763,11 @@ static bool append_unescaped(string::const_iterator begin,
 }
 
 bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) {
+  string name;
+  string key;
+  uint32_t hash;
+  snapid_t snap;
+
   if (index_version == HASH_INDEX_TAG)
     return lfn_parse_object_name_keyless(long_name, out);
 
@@ -772,10 +777,10 @@ bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) {
     if (current == long_name.end()) {
       return false;
     } else if (*current == 'd') {
-      out->oid.name.append("DIR_");
+      name.append("DIR_");
       ++current;
     } else if (*current == '.') {
-      out->oid.name.append(".");
+      name.append(".");
       ++current;
     } else {
       --current;
@@ -786,35 +791,37 @@ bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) {
   for ( ; end != long_name.end() && *end != '_'; ++end) ;
   if (end == long_name.end())
     return false;
-  if (!append_unescaped(current, end, &(out->oid.name)))
+  if (!append_unescaped(current, end, &name))
     return false;
 
   current = ++end;
   for ( ; end != long_name.end() && *end != '_'; ++end) ;
   if (end == long_name.end())
     return false;
-  if (!append_unescaped(current, end, &(out->key)))
+  if (!append_unescaped(current, end, &key))
     return false;
 
   current = ++end;
   for ( ; end != long_name.end() && *end != '_'; ++end) ;
   if (end == long_name.end())
     return false;
-  string snap(current, end);
+  string snap_str(current, end);
 
   current = ++end;
   for ( ; end != long_name.end() && *end != '_'; ++end) ;
   if (end != long_name.end())
     return false;
-  string hash(current, end);
+  string hash_str(current, end);
 
-  if (snap == "head")
-    out->snap = CEPH_NOSNAP;
-  else if (snap == "snapdir")
-    out->snap = CEPH_SNAPDIR;
+  if (snap_str == "head")
+    snap = CEPH_NOSNAP;
+  else if (snap_str == "snapdir")
+    snap = CEPH_SNAPDIR;
   else
-    out->snap = strtoull(snap.c_str(), NULL, 16);
-  sscanf(hash.c_str(), "%X", &out->hash);
+    snap = strtoull(snap_str.c_str(), NULL, 16);
+  sscanf(hash_str.c_str(), "%X", &hash);
+
+  (*out) = hobject_t(name, key, snap, hash);
   return true;
 }
 
index d77639c2a3b706eddc448b6b2faa9c5b4702f7fe..42d98bef16133572500f99a0096c9cdcb913bc57 100644 (file)
@@ -2249,8 +2249,7 @@ void PG::read_log(ObjectStore *store)
             i != ls.end();
             ++i) {
          if (i->oid == e.soid.oid && i->snap == e.soid.snap) {
-           e.soid.hash = i->hash;
-           e.soid.key = i->key;
+           e.soid = *i;
            found = true;
            break;
          }
index 700b02c01c3327099a66cd819aafb63a14534197..81850f4bcb493d6c4ddb1596eba8f5666c1467b7 100644 (file)
@@ -821,7 +821,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
   // load clone info
   bufferlist bl;
   ObjectContext *obc = 0;
-  int r = find_object_context(hobject_t(coid.oid, coid.key, sn, coid.hash),
+  int r = find_object_context(hobject_t(coid.oid, coid.get_key(), sn, coid.hash),
                              OLOC_BLANK, &obc, false, NULL);
   if (r == -ENOENT || coid.snap != obc->obs.oi.soid.snap) {
     if (obc) put_object_context(obc);
@@ -834,7 +834,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
 
   // get snap set context
   if (!obc->ssc)
-    obc->ssc = get_snapset_context(coid.oid, coid.key, coid.hash, false);
+    obc->ssc = get_snapset_context(coid.oid, coid.get_key(), coid.hash, false);
   SnapSetContext *ssc = obc->ssc;
   assert(ssc);
   SnapSet& snapset = ssc->snapset;
@@ -931,7 +931,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
   // save head snapset
   dout(10) << coid << " new snapset " << snapset << dendl;
 
-  hobject_t snapoid(coid.oid, coid.key, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash);
+  hobject_t snapoid(coid.oid, coid.get_key(), snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash);
   ctx->snapset_obc = get_object_context(snapoid, coi.oloc, false);
   assert(ctx->snapset_obc->registered);
   if (snapset.clones.empty() && !snapset.head_exists) {
@@ -1162,7 +1162,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
 
     ObjectContext *src_obc = 0;
     if (ceph_osd_op_type_multi(op.op)) {
-      src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.key, soid.hash)];
+      src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.get_key(), soid.hash)];
       assert(src_obc);
     }
 
@@ -1683,7 +1683,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
          result = -EINVAL;
          break;
        }
-       t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.key
+       t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.get_key()
                                      obs.oi.soid.hash),
                      obs.oi.soid, op.clonerange.src_offset,
                      op.clonerange.length, op.clonerange.offset);
@@ -2117,7 +2117,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
       /* a different problem, like degraded pool
        * with not-yet-restored object. We shouldn't have been able
        * to get here; recovery should have completed first! */
-      hobject_t rollback_target(soid.oid, soid.key, cloneid, soid.hash);
+      hobject_t rollback_target(soid.oid, soid.get_key(), cloneid, soid.hash);
       assert(is_missing_object(rollback_target));
       dout(20) << "_rollback_to attempted to roll back to a missing object " 
               << rollback_target << " (requested snapid: ) " << snapid << dendl;
@@ -2554,7 +2554,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
     ctx->op_t.setattr(coll, soid, SS_ATTR, bss);   
     if (!head_existed) {
       // if we logically recreated the head, remove old _snapdir object
-      hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash);
+      hobject_t snapoid(soid.oid, soid.get_key(), CEPH_SNAPDIR, soid.hash);
 
       ctx->snapset_obc = get_object_context(snapoid, ctx->new_obs.oi.oloc, false);
       if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
@@ -2571,7 +2571,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
     }
   } else if (ctx->new_snapset.clones.size()) {
     // save snapset on _snap
-    hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash);
+    hobject_t snapoid(soid.oid, soid.get_key(), CEPH_SNAPDIR, soid.hash);
     dout(10) << " final snapset " << ctx->new_snapset
             << " in " << snapoid << dendl;
     ctx->at_version.version++;
@@ -3026,13 +3026,13 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s
 
       SnapSetContext *ssc = NULL;
       if (can_create)
-       ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true);
+       ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true);
       obc = new ObjectContext(oi, true, ssc);
     }
     register_object_context(obc);
 
     if (can_create && !obc->ssc)
-      obc->ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true);
+      obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true);
 
     if (r >= 0) {
       obc->obs.oi.decode(bv);
@@ -3067,7 +3067,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
                                      snapid_t *psnapid)
 {
   // want the head?
-  hobject_t head(oid.oid, oid.key, CEPH_NOSNAP, oid.hash);
+  hobject_t head(oid.oid, oid.get_key(), CEPH_NOSNAP, oid.hash);
   if (oid.snap == CEPH_NOSNAP) {
     ObjectContext *obc = get_object_context(head, oloc, can_create);
     if (!obc)
@@ -3076,13 +3076,13 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
     *pobc = obc;
 
     if (can_create && !obc->ssc)
-      obc->ssc = get_snapset_context(oid.oid, oid.key, oid.hash, true);
+      obc->ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, true);
 
     return 0;
   }
 
   // we want a snap
-  SnapSetContext *ssc = get_snapset_context(oid.oid, oid.key, oid.hash, can_create);
+  SnapSetContext *ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, can_create);
   if (!ssc)
     return -ENOENT;
 
@@ -3124,7 +3124,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
     put_snapset_context(ssc);
     return -ENOENT;
   }
-  hobject_t soid(oid.oid, oid.key, ssc->snapset.clones[k], oid.hash);
+  hobject_t soid(oid.oid, oid.get_key(), ssc->snapset.clones[k], oid.hash);
 
   put_snapset_context(ssc); // we're done with ssc
   ssc = 0;
@@ -3613,7 +3613,7 @@ int ReplicatedPG::pull(const hobject_t& soid)
     }
 
     // check snapset
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
     dout(10) << " snapset " << ssc->snapset << dendl;
     calc_clone_subsets(ssc->snapset, soid, missing,
                       data_subset, clone_subsets);
@@ -3721,7 +3721,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
       return push_start(snapdir, peer);
     }
     
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
     calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
                       data_subset, clone_subsets);
@@ -3729,7 +3729,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
   } else if (soid.snap == CEPH_NOSNAP) {
     // pushing head or unversioned object.
     // base this on partially on replica's clones?
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
     calc_head_subsets(ssc->snapset, soid, peer_missing[peer], data_subset, clone_subsets);
     put_snapset_context(ssc);
@@ -4073,7 +4073,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
 
     if (soid.snap && soid.snap < CEPH_NOSNAP) {
       // clone.  make sure we have enough data.
-      SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
+      SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false);
       assert(ssc);
 
       clone_subsets.clear();   // forget what pusher said; recalculate cloning.
index 2bdf099d0cf6319f1ed023a44924c95a5251b3f3..292836e6fb3ca4409dca43ef06ae91499842e07b 100644 (file)
@@ -608,8 +608,10 @@ void object_info_t::decode(bufferlist::iterator& bl)
   } else if (v >= 6) {
     ::decode(soid, bl);
     ::decode(oloc, bl);
-    if (v == 6)
-      soid.key = oloc.key;
+    if (v == 6) {
+      hobject_t hoid(soid.oid, oloc.key, soid.snap, hoid.hash);
+      soid = hoid;
+    }
   }
     
   if (v >= 5)