git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objectcacher: coalesce dirty buffers in one object into one IO
author Yan, Zheng <zyan@redhat.com>
Mon, 7 Dec 2015 08:14:43 +0000 (16:14 +0800)
committer Greg Farnum <gfarnum@redhat.com>
Wed, 13 Jan 2016 02:35:47 +0000 (18:35 -0800)
Signed-off-by: Yan, Zheng <zyan@redhat.com>
Update for ceph::real_time
Signed-off-by: Greg Farnum <gfarnum@redhat.com>
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/osdc/WritebackHandler.h

index 0324fcc0066311b834046084afd7aca180cd2242..662208d025c348e8a9f2a9fbf908d0cf9c834815 100644 (file)
@@ -582,6 +582,7 @@ ObjectCacher::ObjectCacher(CephContext *cct_, string name,
 {
   perf_start();
   finisher.start();
+  scattered_write = writeback_handler.can_scattered_write();
 }
 
 ObjectCacher::~ObjectCacher()
@@ -891,6 +892,105 @@ void ObjectCacher::bh_read_finish(int64_t poolid, sobject_t oid,
   read_cond.Signal();
 }
 
+// Collect the dirty buffers that live in the same Object as *bh and are
+// old enough to flush (last_write < cutoff), then issue them as one
+// scattered write.  dirty_or_tx_bh is sorted in ObjectSet/Object/offset
+// order, so every buffer of bh's object sits in one contiguous run of the
+// set: scan forward from bh, then backward, stopping at the object edge.
+// Buffers that are tx (or too new) are skipped but do NOT stop the scan,
+// so the collected buffers need not be byte-adjacent.
+// @param bh          seed buffer; must be present in dirty_or_tx_bh
+// @param cutoff      only buffers written before this time are collected
+// @param max_amount  if non-NULL, byte budget; decremented by what was taken
+// @param max_count   if non-NULL, buffer-count budget; likewise decremented
+// NOTE(review): the budget checks run after push_back, so the final buffer
+// may overshoot *max_amount / *max_count; budgets can go negative.
+void ObjectCacher::bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
+                                       int64_t *max_amount, int *max_count)
+{
+  list<BufferHead*> blist;
+
+  int count = 0;
+  int64_t total_len = 0;
+  set<BufferHead*, BufferHead::ptr_lt>::iterator it = dirty_or_tx_bh.find(bh);
+  assert(it != dirty_or_tx_bh.end());
+  // forward scan: bh itself is visited first, so blist is never empty below
+  for (set<BufferHead*, BufferHead::ptr_lt>::iterator p = it;
+       p != dirty_or_tx_bh.end();
+       ++p) {
+    BufferHead *obh = *p;
+    if (obh->ob != bh->ob)
+      break;
+    if (obh->is_dirty() && obh->last_write < cutoff) {
+      blist.push_back(obh);
+      ++count;
+      total_len += obh->length();
+      if ((max_count && count > *max_count) ||
+         (max_amount && total_len > *max_amount))
+       break;
+    }
+  }
+
+  // backward scan: prepend so blist stays in ascending offset order
+  while (it != dirty_or_tx_bh.begin()) {
+    --it;
+    BufferHead *obh = *it;
+    if (obh->ob != bh->ob)
+      break;
+    if (obh->is_dirty() && obh->last_write < cutoff) {
+      blist.push_front(obh);
+      ++count;
+      total_len += obh->length();
+      if ((max_count && count > *max_count) ||
+         (max_amount && total_len > *max_amount))
+       break;
+    }
+  }
+  // report consumption back to the caller's budgets
+  if (max_count)
+    *max_count -= count;
+  if (max_amount)
+    *max_amount -= total_len;
+
+  bh_write_scattered(blist);
+}
+
+void ObjectCacher::bh_write_scattered(list<BufferHead*>& blist)
+{
+  assert(lock.is_locked());
+
+  Object *ob = blist.front()->ob;
+  ob->get();
+
+  ceph::real_time last_write;
+  SnapContext snapc;
+  vector<pair<loff_t, uint64_t> > ranges;
+  vector<pair<uint64_t, bufferlist> > io_vec;
+
+  ranges.reserve(blist.size());
+  io_vec.reserve(blist.size());
+
+  uint64_t total_len = 0;
+  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
+    BufferHead *bh = *p;
+    ldout(cct, 7) << "bh_write_scattered " << *bh << dendl;
+    assert(bh->ob == ob);
+    assert(bh->bl.length() == bh->length());
+    ranges.push_back(pair<loff_t, uint64_t>(bh->start(), bh->length()));
+
+    int n = io_vec.size();
+    io_vec.resize(n + 1);
+    io_vec[n].first = bh->start();
+    io_vec[n].second = bh->bl;
+
+    total_len += bh->length();
+    if (bh->snapc.seq > snapc.seq)
+      snapc = bh->snapc;
+    if (bh->last_write > last_write)
+      bh->last_write = bh->last_write;
+  }
+
+  C_WriteCommit *oncommit = new C_WriteCommit(this, ob->oloc.pool, ob->get_soid(), ranges);
+
+  ceph_tid_t tid = writeback_handler.write(ob->get_oid(), ob->get_oloc(),
+                                          io_vec, snapc, last_write,
+                                          ob->truncate_size, ob->truncate_seq,
+                                          oncommit);
+  oncommit->tid = tid;
+  ob->last_write_tid = tid;
+  for (list<BufferHead*>::iterator p = blist.begin(); p != blist.end(); ++p) {
+    BufferHead *bh = *p;
+    bh->last_write_tid = tid;
+    mark_tx(bh);
+  }
+
+  if (perfcounter)
+    perfcounter->inc(l_objectcacher_data_flushed, total_len);
+}
 
 void ObjectCacher::bh_write(BufferHead *bh)
 {
@@ -925,20 +1025,27 @@ void ObjectCacher::bh_write(BufferHead *bh)
   mark_tx(bh);
 }
 
-void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
-                                  uint64_t length, ceph_tid_t tid, int r)
+void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid,
+                                  vector<pair<loff_t, uint64_t> >& ranges,
+                                  ceph_tid_t tid, int r)
 {
   assert(lock.is_locked());
   ldout(cct, 7) << "bh_write_commit " << oid << " tid " << tid
-               << " " << start << "~" << length << " returned " << r
-               << dendl;
+               << " ranges " << ranges << " returned " << r << dendl;
 
   if (objects[poolid].count(oid) == 0) {
     ldout(cct, 7) << "bh_write_commit no object cache" << dendl;
-  } else {
-    Object *ob = objects[poolid][oid];
-    int was_dirty_or_tx = ob->oset->dirty_or_tx;
+    return;
+  }
+
+  Object *ob = objects[poolid][oid];
+  int was_dirty_or_tx = ob->oset->dirty_or_tx;
 
+  for (vector<pair<loff_t, uint64_t> >::iterator p = ranges.begin();
+       p != ranges.end();
+       ++p) {
+    loff_t start = p->first;
+    uint64_t length = p->second;
     if (!ob->exists) {
       ldout(cct, 10) << "bh_write_commit marking exists on " << *ob << dendl;
       ob->exists = true;
@@ -1002,32 +1109,31 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
       assert(*bh);
       ob->try_merge_bh(*bh);
     }
+  }
 
-    // update last_commit.
-    assert(ob->last_commit_tid < tid);
-    ob->last_commit_tid = tid;
-
-    // waiters?
-    list<Context*> ls;
-    if (ob->waitfor_commit.count(tid)) {
-      ls.splice(ls.begin(), ob->waitfor_commit[tid]);
-      ob->waitfor_commit.erase(tid);
-    }
+  // update last_commit.
+  assert(ob->last_commit_tid < tid);
+  ob->last_commit_tid = tid;
 
-    // is the entire object set now clean and fully committed?
-    ObjectSet *oset = ob->oset;
-    ob->put();
+  // waiters?
+  list<Context*> ls;
+  if (ob->waitfor_commit.count(tid)) {
+    ls.splice(ls.begin(), ob->waitfor_commit[tid]);
+    ob->waitfor_commit.erase(tid);
+  }
 
-    if (flush_set_callback &&
-       was_dirty_or_tx > 0 &&
-       oset->dirty_or_tx == 0) {
-      // nothing dirty/tx
-      flush_set_callback(flush_set_callback_arg, oset);
-    }
+  // is the entire object set now clean and fully committed?
+  ObjectSet *oset = ob->oset;
+  ob->put();
 
-    if (!ls.empty())
-      finish_contexts(cct, ls, r);
+  if (flush_set_callback &&
+      was_dirty_or_tx > 0 &&
+      oset->dirty_or_tx == 0) {        // nothing dirty/tx
+    flush_set_callback(flush_set_callback_arg, oset);
   }
+
+  if (!ls.empty())
+    finish_contexts(cct, ls, r);
 }
 
 void ObjectCacher::flush(loff_t amount)
@@ -1043,16 +1149,20 @@ void ObjectCacher::flush(loff_t amount)
    * to the other LRU, so that we can call
    * lru_dirty.lru_get_next_expire() again.
    */
-  loff_t did = 0;
-  while (amount == 0 || did < amount) {
+  int64_t left = amount;
+  while (amount == 0 || left > 0) {
     BufferHead *bh = static_cast<BufferHead*>(
       bh_lru_dirty.lru_get_next_expire());
     if (!bh) break;
     if (bh->last_write > cutoff) break;
 
-    did += bh->length();
-    bh_write(bh);
-  }
+    if (scattered_write) {
+      bh_write_adjacencies(bh, cutoff, amount > 0 ? &left : NULL, NULL);
+    } else {
+      left -= bh->length();
+      bh_write(bh);
+    }
+  }
 }
 
 
@@ -1173,6 +1283,7 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
                                              ex_it->length, soid.snap)) {
        ldout(cct, 20) << "readx  may copy on write" << dendl;
        bool wait = false;
+       list<BufferHead*> blist;
        for (map<loff_t, BufferHead*>::iterator bh_it = o->data.begin();
             bh_it != o->data.end();
             ++bh_it) {
@@ -1180,10 +1291,16 @@ int ObjectCacher::_readx(OSDRead *rd, ObjectSet *oset, Context *onfinish,
          if (bh->is_dirty() || bh->is_tx()) {
            ldout(cct, 10) << "readx  flushing " << *bh << dendl;
            wait = true;
-           if (bh->is_dirty())
-             bh_write(bh);
+           if (bh->is_dirty()) {
+             if (scattered_write)
+               blist.push_back(bh);
+             else
+               bh_write(bh);
+           }
          }
        }
+       if (scattered_write && !blist.empty())
+         bh_write_scattered(blist);
        if (wait) {
          ldout(cct, 10) << "readx  waiting on tid " << o->last_write_tid
                         << " on " << *o << dendl;
@@ -1651,9 +1768,14 @@ void ObjectCacher::flusher_entry()
       while ((bh = static_cast<BufferHead*>(bh_lru_dirty.
                                            lru_get_next_expire())) != 0 &&
             bh->last_write < cutoff &&
-            --max > 0) {
+            max > 0) {
        ldout(cct, 10) << "flusher flushing aged dirty bh " << *bh << dendl;
-       bh_write(bh);
+       if (scattered_write) {
+         bh_write_adjacencies(bh, cutoff, NULL, &max);
+        } else {
+         bh_write(bh);
+         --max;
+       }
       }
       if (!max) {
        // back off the lock to avoid starving other threads
@@ -1769,6 +1891,7 @@ void ObjectCacher::purge(Object *ob)
 bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
 {
   assert(lock.is_locked());
+  list<BufferHead*> blist;
   bool clean = true;
   ldout(cct, 10) << "flush " << *ob << " " << offset << "~" << length << dendl;
   for (map<loff_t,BufferHead*>::iterator p = ob->data_lower_bound(offset);
@@ -1786,9 +1909,16 @@ bool ObjectCacher::flush(Object *ob, loff_t offset, loff_t length)
     if (!bh->is_dirty()) {
       continue;
     }
-    bh_write(bh);
+
+    if (scattered_write)
+      blist.push_back(bh);
+    else
+      bh_write(bh);
     clean = false;
   }
+  if (scattered_write && !blist.empty())
+    bh_write_scattered(blist);
+
   return clean;
 }
 
@@ -1825,6 +1955,8 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
   C_GatherBuilder gather(cct);
   set<Object*> waitfor_commit;
 
+  list<BufferHead*> blist;
+  Object *last_ob = NULL;
   set<BufferHead*, BufferHead::ptr_lt>::iterator it, p, q;
 
   // Buffer heads in dirty_or_tx_bh are sorted in ObjectSet/Object/offset
@@ -1846,8 +1978,20 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
     if (bh->ob->oset != oset)
       break;
     waitfor_commit.insert(bh->ob);
-    if (bh->is_dirty())
-      bh_write(bh);
+    if (bh->is_dirty()) {
+      if (scattered_write) {
+       if (last_ob != bh->ob) {
+         if (!blist.empty()) {
+           bh_write_scattered(blist);
+           blist.clear();
+         }
+         last_ob = bh->ob;
+       }
+       blist.push_back(bh);
+      } else {
+       bh_write(bh);
+      }
+    }
   }
 
   if (backwards) {
@@ -1860,13 +2004,28 @@ bool ObjectCacher::flush_set(ObjectSet *oset, Context *onfinish)
       if (bh->ob->oset != oset)
        break;
       waitfor_commit.insert(bh->ob);
-      if (bh->is_dirty())
-       bh_write(bh);
+      if (bh->is_dirty()) {
+       if (scattered_write) {
+         if (last_ob != bh->ob) {
+           if (!blist.empty()) {
+             bh_write_scattered(blist);
+             blist.clear();
+           }
+           last_ob = bh->ob;
+         }
+         blist.push_front(bh);
+       } else {
+         bh_write(bh);
+       }
+      }
       if (!backwards)
        break;
     }
   }
 
+  if (scattered_write && !blist.empty())
+    bh_write_scattered(blist);
+
   for (set<Object*>::iterator i = waitfor_commit.begin();
        i != waitfor_commit.end(); ++i) {
     Object *ob = *i;
@@ -1935,6 +2094,8 @@ bool ObjectCacher::flush_all(Context *onfinish)
   C_GatherBuilder gather(cct);
   set<Object*> waitfor_commit;
 
+  list<BufferHead*> blist;
+  Object *last_ob = NULL;
   set<BufferHead*, BufferHead::ptr_lt>::iterator next, it;
   next = it = dirty_or_tx_bh.begin();
   while (it != dirty_or_tx_bh.end()) {
@@ -1942,12 +2103,27 @@ bool ObjectCacher::flush_all(Context *onfinish)
     BufferHead *bh = *it;
     waitfor_commit.insert(bh->ob);
 
-    if (bh->is_dirty())
-      bh_write(bh);
+    if (bh->is_dirty()) {
+      if (scattered_write) {
+       if (last_ob != bh->ob) {
+         if (!blist.empty()) {
+           bh_write_scattered(blist);
+           blist.clear();
+         }
+         last_ob = bh->ob;
+       }
+       blist.push_back(bh);
+      } else {
+       bh_write(bh);
+      }
+    }
 
     it = next;
   }
 
+  if (scattered_write && !blist.empty())
+    bh_write_scattered(blist);
+
   for (set<Object*>::iterator i = waitfor_commit.begin();
        i != waitfor_commit.end();
        ++i) {
index d5abb543406598240c90e15435946cedcdc09c64..791412ee7f19d77055acba568c27579dd5333781 100644 (file)
@@ -395,6 +395,7 @@ class ObjectCacher {
   // ObjectCacher fields
  private:
   WritebackHandler& writeback_handler;
+  bool scattered_write;
 
   string name;
   Mutex& lock;
@@ -521,6 +522,9 @@ class ObjectCacher {
   // io
   void bh_read(BufferHead *bh, int op_flags);
   void bh_write(BufferHead *bh);
+  void bh_write_scattered(list<BufferHead*>& blist);
+  void bh_write_adjacencies(BufferHead *bh, ceph::real_time cutoff,
+                           int64_t *amount, int *max_count);
 
   void trim();
   void flush(loff_t amount=0);
@@ -552,8 +556,10 @@ class ObjectCacher {
                      loff_t offset, uint64_t length,
                      bufferlist &bl, int r,
                      bool trust_enoent);
-  void bh_write_commit(int64_t poolid, sobject_t oid, loff_t offset,
-                      uint64_t length, ceph_tid_t t, int r);
+  void bh_write_commit(int64_t poolid, sobject_t oid,
+                      vector<pair<loff_t, uint64_t> >& ranges,
+                      ceph_tid_t t, int r);
+
 
   class C_ReadFinish : public Context {
     ObjectCacher *oc;
@@ -592,17 +598,23 @@ class ObjectCacher {
     ObjectCacher *oc;
     int64_t poolid;
     sobject_t oid;
-    loff_t start;
-    uint64_t length;
+    vector<pair<loff_t, uint64_t> > ranges;
   public:
     ceph_tid_t tid;
     C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o, loff_t s,
                  uint64_t l) :
-      oc(c), poolid(_poolid), oid(o), start(s), length(l), tid(0) {}
+      oc(c), poolid(_poolid), oid(o), tid(0) {
+       ranges.push_back(make_pair(s, l));
+      }
+    C_WriteCommit(ObjectCacher *c, int64_t _poolid, sobject_t o,
+                 vector<pair<loff_t, uint64_t> >& _ranges) :
+      oc(c), poolid(_poolid), oid(o), tid(0) {
+       ranges.swap(_ranges);
+      }
     void finish(int r) {
-      oc->bh_write_commit(poolid, oid, start, length, tid, r);
+      oc->bh_write_commit(poolid, oid, ranges, tid, r);
     }
 };
+ };
 
   class C_WaitForWrite : public Context {
   public:
index fdafeaad3a3a36e031554dc77dfa3301a2e01854..f0efd2007841cf23012247b572802a4b9a0871e4 100644 (file)
@@ -39,6 +39,15 @@ class WritebackHandler {
   virtual void overwrite_extent(const object_t& oid, uint64_t off, uint64_t len,
                                 ceph_tid_t journal_tid) {}
 
+  virtual bool can_scattered_write() { return false; }
+  virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
+                          vector<pair<uint64_t, bufferlist> >& io_vec,
+                          const SnapContext& snapc, ceph::real_time mtime,
+                          uint64_t trunc_size, __u32 trunc_seq,
+                          Context *oncommit) {
+    return 0;
+  }
+
   virtual void get_client_lock() {}
   virtual void put_client_lock() {}
 };