]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: push wrlocks into object_info_t
authorSage Weil <sage@newdream.net>
Wed, 28 Jan 2009 21:06:05 +0000 (13:06 -0800)
committerSage Weil <sage@newdream.net>
Thu, 29 Jan 2009 18:17:45 +0000 (10:17 -0800)
src/osd/PG.cc
src/osd/PG.h
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h
src/osd/osd_types.h

index 71762bfd71c7ac492b98fc151dda76c4c62e4be9..29b744a8f628802a99e9e0b70d4b9be1ce3cfe1d 100644 (file)
@@ -1855,23 +1855,16 @@ coll_t PG::make_snap_collection(ObjectStore::Transaction& t, snapid_t s)
 // If the target object of the operation op is locked for writing by another client, the function puts op to the waiting queue waiting_for_wr_unlock
 // returns true if object was locked, otherwise returns false
 // 
-bool PG::block_if_wrlocked(MOSDOp* op)
+bool PG::block_if_wrlocked(MOSDOp* op, object_info_t& oi)
 {
   pobject_t poid(info.pgid.pool(), 0, op->get_oid());
 
-  bufferlist bs;
-  int len = osd->store->getattr(info.pgid.to_coll(), poid, "wrlock", bs);
-  if (len > 0) {
-    entity_name_t source;
-    bufferlist::iterator bp = bs.begin();
-    ::decode(source, bp);
-    //dout(0) << "getattr returns " << len << " on " << oid << dendl;
-  
-    if (source != op->get_orig_source()) {
-      //the object is locked for writing by someone else -- add the op to the waiting queue      
-      waiting_for_wr_unlock[poid.oid].push_back(op);
-      return true;
-    }
+  if (oi.wrlock_by.tid &&
+      oi.wrlock_by.name != op->get_orig_source()) {
+    //the object is locked for writing by someone else -- add the op to the waiting queue      
+    dout(10) << "blocked on wrlock on " << oi << dendl;
+    waiting_for_wr_unlock[poid.oid].push_back(op);
+    return true;
   }
   
   return false; //the object wasn't locked, so the operation can be handled right away
index 4c5e0f5c5c5c54250e7e579473c7b1f0d56fe286..0f0ea530f12e51e63f162fbbb4f56b528321a408 100644 (file)
@@ -654,7 +654,7 @@ protected:
   
   hash_map<object_t, list<Message*> > waiting_for_wr_unlock; 
 
-  bool block_if_wrlocked(MOSDOp* op);
+  bool block_if_wrlocked(MOSDOp* op, object_info_t& oi);
 
 
   // stats
index a5480f0aabf657bd52c2bebec34072130dc176cd..3d2ff1d830517ec2cfc28d78ca2b3599682a8e3b 100644 (file)
@@ -573,7 +573,7 @@ bool ReplicatedPG::snap_trimmer()
 /*
  * return false if object doesn't (logically) exist
  */
-int ReplicatedPG::pick_read_snap(pobject_t& poid)
+int ReplicatedPG::pick_read_snap(pobject_t& poid, object_info_t& coi)
 {
   pobject_t head = poid;
   head.oid.snap = CEPH_NOSNAP;
@@ -621,15 +621,6 @@ int ReplicatedPG::pick_read_snap(pobject_t& poid)
   }
 
   // clone
-  bl.clear();
-  r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bl);
-  if (r < 0) {
-    dout(20) << "pick_read_snap  " << poid << " dne, wtf" << dendl;
-    assert(0);
-    return -ENOENT;
-  }
-  object_info_t coi(bl);
-
   dout(20) << "pick_read_snap  " << poid << " snaps " << coi.snaps << dendl;
   snapid_t first = coi.snaps[coi.snaps.size()-1];
   snapid_t last = coi.snaps[0];
@@ -650,8 +641,16 @@ void ReplicatedPG::op_read(MOSDOp *op)
 
   dout(10) << "op_read " << oid << " " << op->ops << dendl;
   
+  object_info_t oi(poid);
+
+  bufferlist bv;
+  osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+  if (bv.length())
+    oi.decode(bv);
+
   // wrlocked?
-  if (block_if_wrlocked(op)) 
+  if (poid.oid.snap == CEPH_NOSNAP &&
+      block_if_wrlocked(op, oi)) 
     return;
 
 
@@ -707,7 +706,7 @@ void ReplicatedPG::op_read(MOSDOp *op)
 
   // do it.
   if (poid.oid.snap) {
-    result = pick_read_snap(poid);
+    result = pick_read_snap(poid, oi);
     if (result == -EAGAIN) {
       wait_for_missing_object(poid.oid, op);
       return;
@@ -891,15 +890,15 @@ void ReplicatedPG::add_interval_usage(interval_set<__u64>& s, pg_stat_t& stats)
 
 // low level object operations
 int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
-                                   pobject_t poid, __u64& old_size, bool& exists,
+                                   pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
                                    ceph_osd_op& op, bufferlist::iterator& bp,
-                                   SnapSet& snapset, SnapContext& snapc)
+                                   SnapContext& snapc)
 {
   int eop = op.op;
 
   // munge ZERO -> DELETE or TRUNCATE?
   if (eop == CEPH_OSD_OP_ZERO &&
-      snapset.head_exists &&
+      oi.snapset.head_exists &&
       op.offset + op.length >= old_size) {
     if (op.offset == 0) {
       // FIXME: no, this will zap object attributes... do we really want
@@ -911,32 +910,27 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
       dout(10) << " munging ZERO " << op.offset << "~" << op.length
               << " -> TRUNCATE (size is " << old_size << ")" << dendl;
       eop = CEPH_OSD_OP_TRUNCATE;
-      snapset.head_exists = true;
+      oi.snapset.head_exists = true;
     }
   }
   // munge DELETE -> TRUNCATE?
   if (eop == CEPH_OSD_OP_DELETE &&
-      snapset.clones.size()) {
-    dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << snapset.clones << dendl;
+      oi.snapset.clones.size()) {
+    dout(10) << " munging DELETE -> TRUNCATE(0) bc of clones " << oi.snapset.clones << dendl;
     eop = CEPH_OSD_OP_TRUNCATE;
     op.length = 0;
-    snapset.head_exists = false;
+    oi.snapset.head_exists = false;
   }
 
   switch (eop) {
 
     // -- locking --
   case CEPH_OSD_OP_WRLOCK:
-    { // lock object
-      bufferlist b(sizeof(entity_name_t));
-      ::encode(reqid.name, b);
-      t.setattr(info.pgid.to_coll(), poid, "wrlock", b);
-    }
-    break;  
+    oi.wrlock_by = reqid;
+    break;
+
   case CEPH_OSD_OP_WRUNLOCK:
-    { // unlock objects
-      t.rmattr(info.pgid.to_coll(), poid, "wrlock");
-    }
+    oi.wrlock_by = osd_reqid_t();
     break;
 
   case CEPH_OSD_OP_BALANCEREADS:
@@ -960,12 +954,12 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
       bufferlist nbl;
       bp.copy(op.length, nbl);
       t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
-      if (snapset.clones.size()) {
-       snapid_t newest = *snapset.clones.rbegin();
+      if (oi.snapset.clones.size()) {
+       snapid_t newest = *oi.snapset.clones.rbegin();
        interval_set<__u64> ch;
        ch.insert(op.offset, op.length);
-       ch.intersection_of(snapset.clone_overlap[newest]);
-       snapset.clone_overlap[newest].subtract(ch);
+       ch.intersection_of(oi.snapset.clone_overlap[newest]);
+       oi.snapset.clone_overlap[newest].subtract(ch);
        add_interval_usage(ch, st);
       }
       if (op.offset + op.length > old_size) {
@@ -974,7 +968,7 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
        st.num_kb += SHIFT_ROUND_UP(new_size, 10) - SHIFT_ROUND_UP(old_size, 10);
        old_size = new_size;
       }
-      snapset.head_exists = true;
+      oi.snapset.head_exists = true;
     }
     break;
     
@@ -984,9 +978,9 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
       bp.copy(op.length, nbl);
       t.truncate(info.pgid.to_coll(), poid, 0);
       t.write(info.pgid.to_coll(), poid, op.offset, op.length, nbl);
-      if (snapset.clones.size()) {
-       snapid_t newest = *snapset.clones.rbegin();
-       snapset.clone_overlap.erase(newest);
+      if (oi.snapset.clones.size()) {
+       snapid_t newest = *oi.snapset.clones.rbegin();
+       oi.snapset.clone_overlap.erase(newest);
        old_size = 0;
       }
       if (op.length != old_size) {
@@ -996,7 +990,7 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
        st.num_kb += SHIFT_ROUND_UP(op.length, 10);
        old_size = op.length;
       }
-      snapset.head_exists = true;
+      oi.snapset.head_exists = true;
     }
     break;
     
@@ -1006,15 +1000,15 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
       if (!exists)
        t.touch(info.pgid.to_coll(), poid);
       t.zero(info.pgid.to_coll(), poid, op.offset, op.length);
-      if (snapset.clones.size()) {
-       snapid_t newest = *snapset.clones.rbegin();
+      if (oi.snapset.clones.size()) {
+       snapid_t newest = *oi.snapset.clones.rbegin();
        interval_set<__u64> ch;
        ch.insert(op.offset, op.length);
-       ch.intersection_of(snapset.clone_overlap[newest]);
-       snapset.clone_overlap[newest].subtract(ch);
+       ch.intersection_of(oi.snapset.clone_overlap[newest]);
+       oi.snapset.clone_overlap[newest].subtract(ch);
        add_interval_usage(ch, st);
       }
-      snapset.head_exists = true;
+      oi.snapset.head_exists = true;
     }
     break;
 
@@ -1023,18 +1017,18 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
       if (!exists)
        t.touch(info.pgid.to_coll(), poid);
       t.truncate(info.pgid.to_coll(), poid, op.length);
-      if (snapset.clones.size()) {
-       snapid_t newest = *snapset.clones.rbegin();
+      if (oi.snapset.clones.size()) {
+       snapid_t newest = *oi.snapset.clones.rbegin();
        interval_set<__u64> trim;
        if (old_size > op.length) {
          trim.insert(op.length, old_size-op.length);
-         trim.intersection_of(snapset.clone_overlap[newest]);
+         trim.intersection_of(oi.snapset.clone_overlap[newest]);
          add_interval_usage(trim, st);
        }
        interval_set<__u64> keep;
        if (op.length)
          keep.insert(0, op.length);
-       snapset.clone_overlap[newest].intersection_of(keep);
+       oi.snapset.clone_overlap[newest].intersection_of(keep);
       }
       if (op.length != old_size) {
        st.num_bytes -= old_size;
@@ -1050,10 +1044,10 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
   case CEPH_OSD_OP_DELETE:
     { // delete
       t.remove(info.pgid.to_coll(), poid);
-      if (snapset.clones.size()) {
-       snapid_t newest = *snapset.clones.rbegin();
-       add_interval_usage(snapset.clone_overlap[newest], st);
-       snapset.clone_overlap.erase(newest);  // ok, redundant.
+      if (oi.snapset.clones.size()) {
+       snapid_t newest = *oi.snapset.clones.rbegin();
+       add_interval_usage(oi.snapset.clone_overlap[newest], st);
+       oi.snapset.clone_overlap.erase(newest);  // ok, redundant.
       }
       if (exists) {
        st.num_objects--;
@@ -1061,7 +1055,7 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
        st.num_kb -= SHIFT_ROUND_UP(old_size, 10);
        old_size = 0;
        exists = false;
-       snapset.head_exists = false;
+       oi.snapset.head_exists = false;
       }      
     }
     break;
@@ -1078,10 +1072,10 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
       bp.copy(op.name_len, name.data()+1);
       bufferlist bl;
       bp.copy(op.value_len, bl);
-      if (!snapset.head_exists)  // create object if it doesn't yet exist.
+      if (!oi.snapset.head_exists)  // create object if it doesn't yet exist.
        t.touch(info.pgid.to_coll(), poid);
       t.setattr(info.pgid.to_coll(), poid, name, bl);
-      snapset.head_exists = true;
+      oi.snapset.head_exists = true;
     }
     break;
 
@@ -1104,7 +1098,7 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
       newop.op = CEPH_OSD_OP_WRITE;
       newop.offset = old_size;
       newop.length = op.length;
-      prepare_simple_op(t, reqid, st, poid, old_size, exists, newop, bp, snapset, snapc);
+      prepare_simple_op(t, reqid, st, poid, old_size, exists, oi, newop, bp, snapc);
     }
     break;
 
@@ -1116,7 +1110,7 @@ int ReplicatedPG::prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t req
     return -EINVAL;
   }
 
-  if (!exists && snapset.head_exists) {
+  if (!exists && oi.snapset.head_exists) {
     st.num_objects++;
     exists = true;
   }
@@ -1148,9 +1142,8 @@ void ReplicatedPG::prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t
                    at_version, snapc);
       did_snap = true;
     }
-    prepare_simple_op(t, reqid, info.stats, poid, size, exists,
-                     ops[i], bp,
-                     oi.snapset, snapc);
+    prepare_simple_op(t, reqid, info.stats, poid, size, exists, oi,
+                     ops[i], bp, snapc);
   }
 
   // finish.
@@ -1506,13 +1499,6 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   int whoami = osd->get_nodeid();
   pobject_t poid(info.pgid.pool(), 0, op->get_oid());
 
-  // --- locking ---
-
-  // wrlock?
-  if (!op->ops.empty() &&  // except WRNOOP; we just want to flush
-      block_if_wrlocked(op)) 
-    return; // op will be handled later, after the object unlocks
-  
   // balance-reads set?
 #if 0
   char v;
@@ -1543,6 +1529,18 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   }
 #endif
 
+  // get existing object info
+  ProjectedObjectInfo *pinfo = get_projected_object(poid);
+
+  // --- locking ---
+
+  // wrlock?
+  if (!op->ops.empty() &&  // except noop; we just want to flush
+      block_if_wrlocked(op, pinfo->oi)) {
+    put_projected_object(pinfo);
+    return; // op will be handled later, after the object unlocks
+  }
+
   // dup op?
   bool noop = false;
   const char *opname;
@@ -1572,9 +1570,6 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   snapc.seq = op->get_snap_seq();
   snapc.snaps = op->get_snaps();
 
-  // get existing object info
-  ProjectedObjectInfo *pinfo = get_projected_object(poid);
-
   // set version in op, for benefit of client and our eventual reply
   op->set_version(at_version);
 
index 865db920717e2a1a4c534248f1ea24fa49f2c3c4..62f746d695d21845ab89f954574a0905d033db11 100644 (file)
@@ -184,9 +184,8 @@ protected:
                     eversion_t& at_version, SnapContext& snapc);
   void add_interval_usage(interval_set<__u64>& s, pg_stat_t& st);  
   int prepare_simple_op(ObjectStore::Transaction& t, osd_reqid_t reqid, pg_stat_t& st,
-                       pobject_t poid, __u64& old_size, bool& exists,
-                       ceph_osd_op& op, bufferlist::iterator& bp,
-                       SnapSet& snapset, SnapContext& snapc); 
+                       pobject_t poid, __u64& old_size, bool& exists, object_info_t& oi,
+                       ceph_osd_op& op, bufferlist::iterator& bp, SnapContext& snapc); 
   void prepare_transaction(ObjectStore::Transaction& t, osd_reqid_t reqid,
                           pobject_t poid, 
                           vector<ceph_osd_op>& ops, bufferlist& bl,
@@ -208,7 +207,7 @@ protected:
   int recover_primary(int max);
   int recover_replicas(int max);
 
-  int pick_read_snap(pobject_t& poid);
+  int pick_read_snap(pobject_t& poid, object_info_t& coi);
   void op_read(MOSDOp *op);
   void op_modify(MOSDOp *op);
 
index aa68f54e39909c4d0c2af8a044484ad3cf411c35..d98acfde2496af1c1ef065ed48df4de39ca4be8d 100644 (file)
@@ -724,6 +724,7 @@ struct object_info_t {
   osd_reqid_t last_reqid;
   utime_t mtime;
 
+  osd_reqid_t wrlock_by;   // [head]
   SnapSet snapset;         // [head]
   vector<snapid_t> snaps;  // [clone]
 
@@ -733,9 +734,10 @@ struct object_info_t {
     ::encode(prior_version, bl);
     ::encode(last_reqid, bl);
     ::encode(mtime, bl);
-    if (poid.oid.snap == CEPH_NOSNAP)
+    if (poid.oid.snap == CEPH_NOSNAP) {
       ::encode(snapset, bl);
-    else
+      ::encode(wrlock_by, bl);
+    } else
       ::encode(snaps, bl);
   }
   void decode(bufferlist::iterator& bl) {
@@ -744,9 +746,10 @@ struct object_info_t {
     ::decode(prior_version, bl);
     ::decode(last_reqid, bl);
     ::decode(mtime, bl);
-    if (poid.oid.snap == CEPH_NOSNAP)
+    if (poid.oid.snap == CEPH_NOSNAP) {
       ::decode(snapset, bl);
-    else
+      ::decode(wrlock_by, bl);
+    } else
       ::decode(snaps, bl);
   }
   void decode(bufferlist& bl) {
@@ -765,6 +768,8 @@ WRITE_CLASS_ENCODER(object_info_t)
 inline ostream& operator<<(ostream& out, const object_info_t& oi) {
   out << oi.poid << "(" << oi.version
       << " " << oi.last_reqid;
+  if (oi.wrlock_by.tid)
+    out << " wrlock_by=" << oi.wrlock_by;
   if (oi.poid.oid.snap == CEPH_NOSNAP)
     out << " " << oi.snapset << ")";
   else