]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
mds: locking. Keep trap of neighbors and coalesce them
authorGreg Farnum <gregf@hq.newdream.net>
Tue, 11 May 2010 20:01:32 +0000 (13:01 -0700)
committerGreg Farnum <gregf@hq.newdream.net>
Mon, 2 Aug 2010 17:39:56 +0000 (10:39 -0700)
src/mds/mdstypes.h

index 4ab9532fb0b11466d48e9b6f4fb4216739384331..8ff11d2852c56ba57d391497dc8da2b5631fd5bf 100644 (file)
@@ -358,9 +358,10 @@ struct ceph_lock_state_t {
    */
   bool add_lock(ceph_filelock& new_lock, bool wait_on_fail) {
     bool ret = false;
-    list<ceph_filelock*> overlapping_locks, self_overlapping_locks;
+    list<ceph_filelock*> overlapping_locks, self_overlapping_locks,
+      neighbor_locks;
     // first, get any overlapping locks and split them into owned-by-us and not
-    if(get_overlapping_locks(new_lock, overlapping_locks)) {
+    if(get_overlapping_locks(new_lock, overlapping_locks, &neighbor_locks)) {
       split_by_owner(new_lock, overlapping_locks, self_overlapping_locks);
     }
     if (!overlapping_locks.empty()) { //overlapping locks owned by others :(
@@ -380,14 +381,14 @@ struct ceph_lock_state_t {
          ret = false;
        } else {
          //yay, we can insert a shared lock
-         adjust_locks(self_overlapping_locks, new_lock);
+         adjust_locks(self_overlapping_locks, new_lock, neighbor_locks);
          held_locks.
            insert(pair<__u64, ceph_filelock>(new_lock.start, new_lock));
          ret = true;
        }
       }
     } else { //no overlapping locks except our own
-      adjust_locks(self_overlapping_locks, new_lock);
+      adjust_locks(self_overlapping_locks, new_lock, neighbor_locks);
       held_locks.insert(pair<__u64, ceph_filelock>(new_lock.start, new_lock));
       ret = true;
     }
@@ -539,7 +540,8 @@ private:
    * old_locks: list of all locks currently held by same
    *    client/process that overlap new_lock.
    */
-  void adjust_locks(list<ceph_filelock*> old_locks, ceph_filelock& new_lock) {
+  void adjust_locks(list<ceph_filelock*> old_locks, ceph_filelock& new_lock,
+                   list<ceph_filelock*> neighbor_locks) {
     bool new_lock_to_end = (0 == new_lock.length);
     bool old_lock_to_end;
     __u64 new_lock_start = new_lock.start;
@@ -617,6 +619,35 @@ private:
        client_held_lock_counts.erase(old_lock->client);
       }
     }
+
+    //make sure to coalesce neighboring locks
+    for (list<ceph_filelock*>::iterator iter = neighbor_locks.begin();
+        iter != neighbor_locks.end();
+        ++iter) {
+      old_lock = *iter;
+      /* because if it's a neibhoring lock there can't be any self-overlapping
+        locks that covered it */
+      if (old_lock->type == new_lock.type) { //merge them
+       if (0 == new_lock.length) {
+         if (old_lock->start + old_lock->length == new_lock.start) {
+           new_lock.start = old_lock->start;
+         } else assert(0); /* if there's no end to new_lock, the neighbor
+                              HAS TO be to left side */
+       } else if (0 == old_lock->length) {
+         if (new_lock.start + new_lock.length == old_lock->start) {
+           new_lock.length = 0;
+         } else assert(0); //same as before, but reversed
+       } else {
+         if (old_lock->start + old_lock->length == new_lock.start) {
+           new_lock.start = old_lock->start;
+           new_lock.length += old_lock->length;
+         } else if (new_lock.start + new_lock.length == old_lock->start) {
+           new_lock.length += old_lock->length;
+         }
+       }
+       held_locks.erase(find_specific_elem(old_lock, held_locks));
+      }
+    }
   }
 
   //this won't reset the counter map value, do that yourself
@@ -698,14 +729,26 @@ private:
    * Returns: true if at least one lock overlaps.
    */
   bool get_overlapping_locks(ceph_filelock& lock,
-                            list<ceph_filelock*>& overlaps) {
+                            list<ceph_filelock*>& overlaps,
+                            list<ceph_filelock*>* self_neighbors) {
     dout(0) << "get_overlapping_locks" << dendl;
+    // create a lock starting one earlier and ending one later
+    // to check for neighbors
+    ceph_filelock neighbor_check_lock = lock;
+    --neighbor_check_lock.start;
+    if (neighbor_check_lock.length) neighbor_check_lock.length += 2;
+    //find the last held lock starting at the point after lock
     multimap<__u64, ceph_filelock>::iterator iter =
-      get_last_before(lock.start + lock.length - 1, held_locks);
+      get_last_before(lock.start + lock.length, held_locks);
     bool cont = iter != held_locks.end();
     while(cont) {
       if (share_space(iter, lock)) {
        overlaps.push_front(&iter->second);
+      } else if (self_neighbors &&
+                (neighbor_check_lock.client == iter->second.client) &&
+                (neighbor_check_lock.pid == iter->second.pid) &&
+                share_space(iter, neighbor_check_lock)) {
+       self_neighbors->push_front(&iter->second);
       }
       if ((iter->first < lock.start) && (CEPH_LOCK_EXCL == iter->second.type)) {
        //can't be any more overlapping locks or they'd interfere with this one
@@ -716,6 +759,11 @@ private:
     return !overlaps.empty();
   }
 
+  bool get_overlapping_locks(ceph_filelock& lock,
+                            list<ceph_filelock*>& overlaps) {
+    return get_overlapping_locks(lock, overlaps, NULL);
+  }
+
   /**
    * Get a list of all waiting locks that overlap with the given lock's range.
    * lock: specifies the range to compare with