]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
ReplicatedPG: Update hobject_t construction
authorSamuel Just <samuel.just@dreamhost.com>
Thu, 1 Sep 2011 23:08:50 +0000 (16:08 -0700)
committerSamuel Just <samuel.just@dreamhost.com>
Fri, 2 Sep 2011 22:06:36 +0000 (15:06 -0700)
hobject_t now must be supplied with a key during construction.
Also, get_snapset_context must be supplied with the locator key.

Signed-off-by: Samuel Just <samuel.just@dreamhost.com>
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index b9a50f959363431a77533ea5c288ca0d09025873..73b42acf13364d1ffa2901d2de8925e34fafdaaf 100644 (file)
@@ -404,7 +404,9 @@ void ReplicatedPG::do_op(MOSDOp *op)
   ObjectContext *obc;
   bool can_create = op->may_write();
   snapid_t snapid;
-  int r = find_object_context(hobject_t(op->get_oid(), op->get_snapid(), op->get_pg().ps()),
+  int r = find_object_context(hobject_t(op->get_oid(), 
+                                       op->get_object_locator().key,
+                                       op->get_snapid(), op->get_pg().ps()),
                              op->get_object_locator(),
                              &obc, can_create, &snapid);
 
@@ -416,7 +418,8 @@ void ReplicatedPG::do_op(MOSDOp *op)
       if (is_primary() || (!(op->get_rmw_flags() & CEPH_OSD_FLAG_LOCALIZE_READS))) {
        // missing the specific snap we need; requeue and wait.
        assert(!can_create); // only happens on a read
-       hobject_t soid(op->get_oid(), snapid, op->get_pg().ps());
+       hobject_t soid(op->get_oid(), op->get_object_locator().key,
+                      snapid, op->get_pg().ps());
        wait_for_missing_object(soid, op);
        return;
       }
@@ -476,17 +479,19 @@ void ReplicatedPG::do_op(MOSDOp *op)
   map<hobject_t,ObjectContext*> src_obc;
   for (vector<OSDOp>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
     OSDOp& osd_op = *p;
-    hobject_t toid(osd_op.soid, op->get_pg().ps());
+    hobject_t toid(osd_op.soid, op->get_object_locator().key, op->get_pg().ps());
     if (osd_op.soid.oid.name.length()) {
       if (!src_obc.count(toid)) {
        ObjectContext *sobc;
        snapid_t ssnapid;
-       int r = find_object_context(hobject_t(toid.oid, toid.snap, op->get_pg().ps()),
+       int r = find_object_context(hobject_t(toid.oid, op->get_object_locator().key,
+                                             toid.snap, op->get_pg().ps()),
                                    op->get_object_locator(),
                                    &sobc, false, &ssnapid);
        if (r == -EAGAIN) {
          // missing the specific snap we need; requeue and wait.
-         hobject_t wait_oid(osd_op.soid.oid, ssnapid, op->get_pg().ps());
+         hobject_t wait_oid(osd_op.soid.oid, op->get_object_locator().key, 
+                            ssnapid, op->get_pg().ps());
          wait_for_missing_object(wait_oid, op);
        } else if (r) {
          osd->reply_op_error(op, r);
@@ -816,7 +821,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
   // load clone info
   bufferlist bl;
   ObjectContext *obc = 0;
-  int r = find_object_context(hobject_t(coid.oid, sn, coid.hash),
+  int r = find_object_context(hobject_t(coid.oid, coid.key, sn, coid.hash),
                              OLOC_BLANK, &obc, false, NULL);
   if (r == -ENOENT || coid.snap != obc->obs.oi.soid.snap) {
     if (obc) put_object_context(obc);
@@ -829,7 +834,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
 
   // get snap set context
   if (!obc->ssc)
-    obc->ssc = get_snapset_context(coid.oid, coid.hash, false);
+    obc->ssc = get_snapset_context(coid.oid, coid.key, coid.hash, false);
   SnapSetContext *ssc = obc->ssc;
   assert(ssc);
   SnapSet& snapset = ssc->snapset;
@@ -926,7 +931,7 @@ ReplicatedPG::RepGather *ReplicatedPG::trim_object(const hobject_t &coid,
   // save head snapset
   dout(10) << coid << " new snapset " << snapset << dendl;
 
-  hobject_t snapoid(coid.oid, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash);
+  hobject_t snapoid(coid.oid, coid.key, snapset.head_exists ? CEPH_NOSNAP:CEPH_SNAPDIR, coid.hash);
   ctx->snapset_obc = get_object_context(snapoid, coi.oloc, false);
   assert(ctx->snapset_obc->registered);
   if (snapset.clones.empty() && !snapset.head_exists) {
@@ -1157,7 +1162,7 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
 
     ObjectContext *src_obc = 0;
     if (ceph_osd_op_type_multi(op.op)) {
-      src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.hash)];
+      src_obc = ctx->src_obc[hobject_t(osd_op.soid, soid.key, soid.hash)];
       assert(src_obc);
     }
 
@@ -1678,8 +1683,11 @@ int ReplicatedPG::do_osd_ops(OpContext *ctx, vector<OSDOp>& ops,
          result = -EINVAL;
          break;
        }
-       t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.hash), obs.oi.soid,
-                     op.clonerange.src_offset, op.clonerange.length, op.clonerange.offset);
+       t.clone_range(coll, hobject_t(osd_op.soid, obs.oi.soid.key, 
+                                     obs.oi.soid.hash),
+                     obs.oi.soid, op.clonerange.src_offset,
+                     op.clonerange.length, op.clonerange.offset);
+                     
 
        write_update_size_and_usage(ctx->delta_stats, oi, ssc->snapset, ctx->modified_ranges,
                                    op.clonerange.offset, op.clonerange.length, false);
@@ -2096,7 +2104,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
   dout(10) << "_rollback_to " << soid << " snapid " << snapid << dendl;
 
   ObjectContext *rollback_to;
-  int ret = find_object_context(hobject_t(soid.oid, snapid, soid.hash), 
+  int ret = find_object_context(hobject_t(soid.oid, oi.oloc.key, snapid, soid.hash), 
                                oi.oloc, &rollback_to, false, &cloneid);
   if (ret) {
     if (-ENOENT == ret) {
@@ -2109,7 +2117,7 @@ int ReplicatedPG::_rollback_to(OpContext *ctx, ceph_osd_op& op)
       /* a different problem, like degraded pool
        * with not-yet-restored object. We shouldn't have been able
        * to get here; recovery should have completed first! */
-      hobject_t rollback_target(soid.oid, cloneid, soid.hash);
+      hobject_t rollback_target(soid.oid, soid.key, cloneid, soid.hash);
       assert(is_missing_object(rollback_target));
       dout(20) << "_rollback_to attempted to roll back to a missing object " 
               << rollback_target << " (requested snapid: ) " << snapid << dendl;
@@ -2546,7 +2554,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
     ctx->op_t.setattr(coll, soid, SS_ATTR, bss);   
     if (!head_existed) {
       // if we logically recreated the head, remove old _snapdir object
-      hobject_t snapoid(soid.oid, CEPH_SNAPDIR, soid.hash);
+      hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash);
 
       ctx->snapset_obc = get_object_context(snapoid, ctx->new_obs.oi.oloc, false);
       if (ctx->snapset_obc && ctx->snapset_obc->obs.exists) {
@@ -2563,7 +2571,7 @@ int ReplicatedPG::prepare_transaction(OpContext *ctx)
     }
   } else if (ctx->new_snapset.clones.size()) {
     // save snapset on _snap
-    hobject_t snapoid(soid.oid, CEPH_SNAPDIR, soid.hash);
+    hobject_t snapoid(soid.oid, soid.key, CEPH_SNAPDIR, soid.hash);
     dout(10) << " final snapset " << ctx->new_snapset
             << " in " << snapoid << dendl;
     ctx->at_version.version++;
@@ -3017,13 +3025,13 @@ ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(const hobject_t& s
 
       SnapSetContext *ssc = NULL;
       if (can_create)
-       ssc = get_snapset_context(soid.oid, soid.hash, true);
+       ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true);
       obc = new ObjectContext(oi, true, ssc);
     }
     register_object_context(obc);
 
     if (can_create && !obc->ssc)
-      obc->ssc = get_snapset_context(soid.oid, soid.hash, true);
+      obc->ssc = get_snapset_context(soid.oid, soid.key, soid.hash, true);
 
     if (r >= 0) {
       obc->obs.oi.decode(bv);
@@ -3058,7 +3066,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
                                      snapid_t *psnapid)
 {
   // want the head?
-  hobject_t head(oid.oid, CEPH_NOSNAP, oid.hash);
+  hobject_t head(oid.oid, oid.key, CEPH_NOSNAP, oid.hash);
   if (oid.snap == CEPH_NOSNAP) {
     ObjectContext *obc = get_object_context(head, oloc, can_create);
     if (!obc)
@@ -3067,13 +3075,13 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
     *pobc = obc;
 
     if (can_create && !obc->ssc)
-      obc->ssc = get_snapset_context(oid.oid, oid.hash, true);
+      obc->ssc = get_snapset_context(oid.oid, oid.key, oid.hash, true);
 
     return 0;
   }
 
   // we want a snap
-  SnapSetContext *ssc = get_snapset_context(oid.oid, oid.hash, can_create);
+  SnapSetContext *ssc = get_snapset_context(oid.oid, oid.key, oid.hash, can_create);
   if (!ssc)
     return -ENOENT;
 
@@ -3115,7 +3123,7 @@ int ReplicatedPG::find_object_context(const hobject_t& oid,
     put_snapset_context(ssc);
     return -ENOENT;
   }
-  hobject_t soid(oid.oid, ssc->snapset.clones[k], oid.hash);
+  hobject_t soid(oid.oid, oid.key, ssc->snapset.clones[k], oid.hash);
 
   put_snapset_context(ssc); // we're done with ssc
   ssc = 0;
@@ -3179,7 +3187,9 @@ void ReplicatedPG::put_object_contexts(map<hobject_t,ObjectContext*>& obcv)
   obcv.clear();
 }
 
-ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid, ps_t seed,
+ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t& oid,
+                                                               const string& key,
+                                                               ps_t seed,
                                                                bool can_create)
 {
   SnapSetContext *ssc;
@@ -3188,11 +3198,11 @@ ReplicatedPG::SnapSetContext *ReplicatedPG::get_snapset_context(const object_t&
     ssc = p->second;
   } else {
     bufferlist bv;
-    hobject_t head(oid, CEPH_NOSNAP, seed);
+    hobject_t head(oid, key, CEPH_NOSNAP, seed);
     int r = osd->store->getattr(coll, head, SS_ATTR, bv);
     if (r < 0) {
       // try _snapset
-      hobject_t snapdir(oid, CEPH_SNAPDIR, seed);
+      hobject_t snapdir(oid, key, CEPH_SNAPDIR, seed);
       r = osd->store->getattr(coll, snapdir, SS_ATTR, bv);
       if (r < 0 && !can_create)
        return NULL;
@@ -3602,7 +3612,7 @@ int ReplicatedPG::pull(const hobject_t& soid)
     }
 
     // check snapset
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
     dout(10) << " snapset " << ssc->snapset << dendl;
     calc_clone_subsets(ssc->snapset, soid, missing,
                       data_subset, clone_subsets);
@@ -3710,7 +3720,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
       return push_start(snapdir, peer);
     }
     
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
     calc_clone_subsets(ssc->snapset, soid, peer_missing[peer],
                       data_subset, clone_subsets);
@@ -3718,7 +3728,7 @@ void ReplicatedPG::push_to_replica(ObjectContext *obc, const hobject_t& soid, in
   } else if (soid.snap == CEPH_NOSNAP) {
     // pushing head or unversioned object.
     // base this on partially on replica's clones?
-    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false);
+    SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
     dout(15) << "push_to_replica snapset is " << ssc->snapset << dendl;
     calc_head_subsets(ssc->snapset, soid, peer_missing[peer], data_subset, clone_subsets);
     put_snapset_context(ssc);
@@ -4062,7 +4072,7 @@ void ReplicatedPG::sub_op_push(MOSDSubOp *op)
 
     if (soid.snap && soid.snap < CEPH_NOSNAP) {
       // clone.  make sure we have enough data.
-      SnapSetContext *ssc = get_snapset_context(soid.oid, soid.hash, false);
+      SnapSetContext *ssc = get_snapset_context(soid.oid, soid.key, soid.hash, false);
       assert(ssc);
 
       clone_subsets.clear();   // forget what pusher said; recalculate cloning.
index faadd3f03bf8891a74b6882039a4337f1885aeab..f9295e4e13c13a3072f7cb95175d115f9b340094 100644 (file)
@@ -510,7 +510,8 @@ protected:
                          ObjectContext **pobc,
                          bool can_create, snapid_t *psnapid=NULL);
 
-  SnapSetContext *get_snapset_context(const object_t& oid, ps_t seed, bool can_create);
+  SnapSetContext *get_snapset_context(const object_t& oid, const string &key,
+                                     ps_t seed, bool can_create);
   void register_snapset_context(SnapSetContext *ssc) {
     if (!ssc->registered) {
       ssc->registered = true;