From bf1963c2ef6e33e6a7e0570b55810c1e9377178a Mon Sep 17 00:00:00 2001 From: Samuel Just Date: Wed, 7 Sep 2011 14:25:02 -0700 Subject: [PATCH] object.h: make hobject key private, prevent objects where key == oid There should be no difference between an object with a key identical to its name and an object with the same name but no key. Signed-off-by: Samuel Just --- src/include/object.h | 16 +++++++++++++--- src/os/LFNIndex.cc | 33 ++++++++++++++++++++------------- src/osd/PG.cc | 3 +-- src/osd/ReplicatedPG.cc | 36 ++++++++++++++++++------------------ src/osd/osd_types.cc | 6 ++++-- 5 files changed, 56 insertions(+), 38 deletions(-) diff --git a/src/include/object.h b/src/include/object.h index 706caa6bffcc2..f24587a55c62f 100644 --- a/src/include/object.h +++ b/src/include/object.h @@ -261,15 +261,25 @@ namespace __gnu_cxx { struct hobject_t { object_t oid; - string key; snapid_t snap; uint32_t hash; +private: + string key; + +public: + const string &get_key() const { + return key; + } + hobject_t() : snap(0), hash(0) {} hobject_t(object_t oid, const string &key, snapid_t snap, uint32_t hash) : - oid(oid), key(key), snap(snap), hash(hash) {} + oid(oid), snap(snap), hash(hash), + key(oid.name == key ? string() : key) {} + hobject_t(const sobject_t &soid, const string &key, uint32_t hash) : - oid(soid.oid), key(key), snap(soid.snap), hash(hash) {} + oid(soid.oid), snap(soid.snap), hash(hash), + key(soid.oid.name == key ? string() : soid.oid.name) {} /* Do not use when a particular hash function is needed */ explicit hobject_t(const sobject_t &o) : diff --git a/src/os/LFNIndex.cc b/src/os/LFNIndex.cc index 557b69a564dcf..210244efb5eee 100644 --- a/src/os/LFNIndex.cc +++ b/src/os/LFNIndex.cc @@ -502,7 +502,7 @@ string LFNIndex::lfn_generate_object_name(const hobject_t &hoid) { } append_escaped(i, hoid.oid.name.end(), &full_name); full_name.append("_"); - append_escaped(hoid.key.begin(), hoid.key.end(), &full_name); + append_escaped(hoid.get_key().begin(), hoid.get_key().end(), &full_name); full_name.append("_"); char snap_with_hash[PATH_MAX]; @@ -763,6 +763,11 @@ static bool append_unescaped(string::const_iterator begin, } bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) { + string name; + string key; + uint32_t hash; + snapid_t snap; + if (index_version == HASH_INDEX_TAG) return lfn_parse_object_name_keyless(long_name, out); @@ -772,10 +777,10 @@ bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) { if (current == long_name.end()) { return false; } else if (*current == 'd') { - out->oid.name.append("DIR_"); + name.append("DIR_"); ++current; } else if (*current == '.') { - out->oid.name.append("."); + name.append("."); ++current; } else { --current; @@ -786,35 +791,37 @@ bool LFNIndex::lfn_parse_object_name(const string &long_name, hobject_t *out) { for ( ; end != long_name.end() && *end != '_'; ++end) ; if (end == long_name.end()) return false; - if (!append_unescaped(current, end, &(out->oid.name))) + if (!append_unescaped(current, end, &name)) return false; current = ++end; for ( ; end != long_name.end() && *end != '_'; ++end) ; if (end == long_name.end()) return false; - if (!append_unescaped(current, end, &(out->key))) + if (!append_unescaped(current, end, &key)) return false; current = ++end; for ( ; end != long_name.end() && *end != '_'; ++end) ; if (end == long_name.end()) return false; - string snap(current, end); + string snap_str(current, end); current = ++end; for ( ; end != long_name.end() && *end != '_'; ++end) ; if (end != long_name.end()) return false; - string hash(current, end); + string hash_str(current, end); - if (snap == "head") - out->snap = CEPH_NOSNAP; - else if (snap == "snapdir") - out->snap = CEPH_SNAPDIR; + if (snap_str == "head") + snap = CEPH_NOSNAP; + else if (snap_str == "snapdir") + snap = CEPH_SNAPDIR; else - out->snap = strtoull(snap.c_str(), NULL, 16); - sscanf(hash.c_str(), "%X", &out->hash); + snap = strtoull(snap_str.c_str(), NULL, 16); + sscanf(hash_str.c_str(), "%X", &hash); + + (*out) = hobject_t(name, key, snap, hash); return true; } diff --git a/src/osd/PG.cc b/src/osd/PG.cc index d77639c2a3b70..42d98bef16133 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -2249,8 +2249,7 @@ void PG::read_log(ObjectStore *store) i != ls.end(); ++i) { if (i->oid == e.soid.oid && i->snap == e.soid.snap) { - e.soid.hash = i->hash; - e.soid.key = i->key; + e.soid = *i; found = true; break; } diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 700b02c01c332..81850f4bcb493 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -821,7 +821,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, // load clone info bufferlist bl; ObjectContext *obc = 0; - int r = find_object_context(hobject_t(coid.oid, coid.key, sn, coid.hash), + int r = find_object_context(hobject_t(coid.oid, coid.get_key(), sn, coid.hash), OLOC_BLANK, &obc, false, NULL); if (r == -ENOENT || coid.snap != obc->obs.oi.soid.snap) { if (obc) put_object_context(obc); @@ -834,7 +834,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, // get snap set context if (!obc->ssc) - obc->ssc = get_snapset_context(coid.oid, coid.key, coid.hash, false); + obc->ssc = get_snapset_context(coid.oid, coid.get_key(), coid.hash, false); SnapSetContext *ssc = obc->ssc; assert(ssc); SnapSet& snapset = ssc->snapset; @@ -931,7 +931,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid, // save head snapset dout(10) << coid << " new snapset " << snapset << dendl; - hobject_t snapoid(coid.oid, coid.key, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash); + hobject_t snapoid(coid.oid, coid.get_key(), snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash); ctx->snapset_obc = get_object_context(snapoid, coi.oloc, false); assert(ctx->snapset_obc->registered); if (snapset.clones.empty() && !snapset.head_exists) { @@ -1162,7 +1162,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, ObjectContext *src_obc = 0; if (ceph_osd_op_type_multi(op.op)) { - src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.key, soid.hash)]; + src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.get_key(), soid.hash)]; assert(src_obc); } @@ -1683,7 +1683,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector& ops, result = -EINVAL; break; } - t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.key, + t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.get_key(), obs.oi.soid.hash), obs.oi.soid, op.clonerange.src_offset, op.clonerange.length, op.clonerange.offset); @@ -2117,7 +2117,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op) /* a different problem, like degraded pool * with not-yet-restored object. We shouldn't have been able * to get here; recovery should have completed first! */ - hobject_t rollback_target(soid.oid, soid.key, cloneid, soid.hash); + hobject_t rollback_target(soid.oid, soid.get_key(), cloneid, soid.hash); assert(is_missing_object(rollback_target)); dout(20) << "_rollback_to attempted to roll back to a missing object " << rollback_target << " (requested snapid: ) " << snapid << dendl; @@ -2554,7 +2554,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) ctx->op_t.setattr(coll, soid, SS_ATTR, bss); if (!head_existed) { // if we logically recreated the head, remove old _snapdir object - hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash); + hobject_t snapoid(soid.oid, soid.get_key(), CEPH_SNAPDIR, soid.hash); ctx->snapset_obc = get_object_context(snapoid, ctx->new_obs.oi.oloc, false); if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) { @@ -2571,7 +2571,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx) } } else if (ctx->new_snapset.clones.size()) { // save snapset on _snap - hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash); + hobject_t snapoid(soid.oid, soid.get_key(), CEPH_SNAPDIR, soid.hash); dout(10) << " final snapset " << ctx->new_snapset << " in " << snapoid << dendl; ctx->at_version.version++; @@ -3026,13 +3026,13 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s SnapSetContext *ssc = NULL; if (can_create) - ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true); + ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true); obc = new ObjectContext(oi, true, ssc); } register_object_context(obc); if (can_create && !obc->ssc) - obc->ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true); + obc->ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, true); if (r >= 0) { obc->obs.oi.decode(bv); @@ -3067,7 +3067,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, snapid_t *psnapid) { // want the head? - hobject_t head(oid.oid, oid.key, CEPH_NOSNAP, oid.hash); + hobject_t head(oid.oid, oid.get_key(), CEPH_NOSNAP, oid.hash); if (oid.snap == CEPH_NOSNAP) { ObjectContext *obc = get_object_context(head, oloc, can_create); if (!obc) @@ -3076,13 +3076,13 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, *pobc = obc; if (can_create && !obc->ssc) - obc->ssc = get_snapset_context(oid.oid, oid.key, oid.hash, true); + obc->ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, true); return 0; } // we want a snap - SnapSetContext *ssc = get_snapset_context(oid.oid, oid.key, oid.hash, can_create); + SnapSetContext *ssc = get_snapset_context(oid.oid, oid.get_key(), oid.hash, can_create); if (!ssc) return -ENOENT; @@ -3124,7 +3124,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid, put_snapset_context(ssc); return -ENOENT; } - hobject_t soid(oid.oid, oid.key, ssc->snapset.clones[k], oid.hash); + hobject_t soid(oid.oid, oid.get_key(), ssc->snapset.clones[k], oid.hash); put_snapset_context(ssc); // we're done with ssc ssc = 0; @@ -3613,7 +3613,7 @@ int ReplicatedPG::pull(const hobject_t& soid) } // check snapset - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false); + SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false); dout(10) << " snapset " << ssc->snapset << dendl; calc_clone_subsets(ssc->snapset, soid, missing, data_subset, clone_subsets); @@ -3721,7 +3721,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in return push_start(snapdir, peer); } - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false); + SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false); dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; calc_clone_subsets(ssc->snapset, soid, peer_missing[peer], data_subset, clone_subsets); @@ -3729,7 +3729,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in } else if (soid.snap == CEPH_NOSNAP) { // pushing head or unversioned object. // base this on partially on replica's clones? - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false); + SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false); dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl; calc_head_subsets(ssc->snapset, soid, peer_missing[peer], data_subset, clone_subsets); put_snapset_context(ssc); @@ -4073,7 +4073,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op) if (soid.snap && soid.snap < CEPH_NOSNAP) { // clone. make sure we have enough data. - SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false); + SnapSetContext *ssc = get_snapset_context(soid.oid, soid.get_key(), soid.hash, false); assert(ssc); clone_subsets.clear(); // forget what pusher said; recalculate cloning. diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 2bdf099d0cf63..292836e6fb3ca 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -608,8 +608,10 @@ void object_info_t::decode(bufferlist::iterator& bl) } else if (v >= 6) { ::decode(soid, bl); ::decode(oloc, bl); - if (v == 6) - soid.key = oloc.key; + if (v == 6) { + hobject_t hoid(soid.oid, oloc.key, soid.snap, hoid.hash); + soid = hoid; + } } if (v >= 5) -- 2.39.5