git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: track journal commit tid within ObjectCacher for writes
author: Jason Dillaman <dillaman@redhat.com>
Wed, 15 Jul 2015 16:20:42 +0000 (12:20 -0400)
committer: Jason Dillaman <dillaman@redhat.com>
Fri, 13 Nov 2015 04:27:06 +0000 (23:27 -0500)
Writes submitted to the ObjectCacher on behalf of the journal now carry
the associated journal commit tid, which is passed along to the writeback
handler so that writebacks can be delayed until after the journal entry is
safe on disk.  This allows an event to be submitted asynchronously to the
journal while the corresponding write operation is submitted to the
ObjectCacher.

Signed-off-by: Jason Dillaman <dillaman@redhat.com>
src/client/ObjecterWriteback.h
src/librbd/ImageCtx.cc
src/librbd/LibrbdWriteback.cc
src/librbd/LibrbdWriteback.h
src/osdc/ObjectCacher.cc
src/osdc/ObjectCacher.h
src/osdc/WritebackHandler.h
src/test/osdc/FakeWriteback.cc
src/test/osdc/FakeWriteback.h
src/test/osdc/object_cacher_stress.cc

index 8c05e969076d4e88ce39e3a62215beed5fab384c..69a9806df57b8c74839e05be06bed57d4ba8c644 100644 (file)
@@ -30,9 +30,10 @@ class ObjecterWriteback : public WritebackHandler {
   }
 
   virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
-                     uint64_t off, uint64_t len, const SnapContext& snapc,
-                     const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
-                     __u32 trunc_seq, Context *oncommit) {
+                           uint64_t off, uint64_t len, const SnapContext& snapc,
+                           const bufferlist &bl, utime_t mtime,
+                           uint64_t trunc_size, __u32 trunc_seq,
+                           ceph_tid_t journal_tid, Context *oncommit) {
     return m_objecter->write_trunc(oid, oloc, off, len, snapc, bl, mtime, 0,
                                   trunc_size, trunc_seq, NULL,
                                   new C_OnFinisher(new C_Lock(m_lock, oncommit),
index eb815283ef0287939f77ce0a58ab78a7005a97f4..86cd49e88bab83436ad0952dba79bf49b93d4f8f 100644 (file)
@@ -628,7 +628,9 @@ public:
                                int fadvise_flags) {
     snap_lock.get_read();
     ObjectCacher::OSDWrite *wr = object_cacher->prepare_write(snapc, bl,
-                                                             utime_t(), fadvise_flags);
+                                                             utime_t(),
+                                                              fadvise_flags,
+                                                              0);
     snap_lock.put_read();
     ObjectExtent extent(o, 0, off, len, 0);
     extent.oloc.pool = data_ctx.get_id();
index d240c97f2437a24ff170c31610a3060c275bf2c7..b55eb3ffa203088391bd118932130bd7ff01581a 100644 (file)
@@ -160,7 +160,7 @@ namespace librbd {
                               const SnapContext& snapc,
                               const bufferlist &bl, utime_t mtime,
                               uint64_t trunc_size, __u32 trunc_seq,
-                              Context *oncommit)
+                              ceph_tid_t journal_tid, Context *oncommit)
   {
     assert(m_ictx->owner_lock.is_locked());
     uint64_t object_no = oid_to_object_no(oid.name, m_ictx->object_prefix);
@@ -175,6 +175,15 @@ namespace librbd {
     return ++m_tid;
   }
 
+
+  void LibrbdWriteback::overwrite_extent(const object_t& oid, uint64_t off,
+                                         uint64_t len, ceph_tid_t journal_tid) {
+    assert(journal_tid != 0);
+
+    // TODO inform the journal that we no longer expect to receive writebacks
+    //      for the specified extent
+  }
+
   void LibrbdWriteback::get_client_lock() {
     m_ictx->owner_lock.get_read();
   }
index b5578ae62f5c3e92718639ec8aa4eb0a09b4d1c8..b7574ae5dd2d0fd58fcc469366c0b80c7719a670 100644 (file)
@@ -34,9 +34,14 @@ namespace librbd {
 
     // Note that oloc, trunc_size, and trunc_seq are ignored
     virtual ceph_tid_t write(const object_t& oid, const object_locator_t& oloc,
-                       uint64_t off, uint64_t len, const SnapContext& snapc,
-                       const bufferlist &bl, utime_t mtime, uint64_t trunc_size,
-                       __u32 trunc_seq, Context *oncommit);
+                             uint64_t off, uint64_t len,
+                             const SnapContext& snapc, const bufferlist &bl,
+                             utime_t mtime, uint64_t trunc_size,
+                             __u32 trunc_seq, ceph_tid_t journal_tid,
+                             Context *oncommit);
+
+    virtual void overwrite_extent(const object_t& oid, uint64_t off,
+                                  uint64_t len, ceph_tid_t journal_tid);
 
     virtual void get_client_lock();
     virtual void put_client_lock();
index d3aeb5a372cc5a466079b226f1fd68548699d3f9..060b7326d24bce4fc5b0d343a81d7dbdca2d7915 100644 (file)
@@ -40,6 +40,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *left, loff_t o
   right->last_read_tid = left->last_read_tid;
   right->set_state(left->get_state());
   right->snapc = left->snapc;
+  right->set_journal_tid(left->journal_tid);
 
   loff_t newleftlen = off - left->start();
   right->set_start(off);
@@ -88,8 +89,14 @@ void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
   assert(oc->lock.is_locked());
   assert(left->end() == right->start());
   assert(left->get_state() == right->get_state());
+  assert(left->can_merge_journal(right));
 
   ldout(oc->cct, 10) << "merge_left " << *left << " + " << *right << dendl;
+  if (left->get_journal_tid() == 0) {
+    left->set_journal_tid(right->get_journal_tid());
+  }
+  right->set_journal_tid(0);
+
   oc->bh_remove(this, right);
   oc->bh_stat_sub(left);
   left->set_length(left->length() + right->length());
@@ -97,8 +104,8 @@ void ObjectCacher::Object::merge_left(BufferHead *left, BufferHead *right)
 
   // data
   left->bl.claim_append(right->bl);
-  
-  // version 
+
+  // version
   // note: this is sorta busted, but should only be used for dirty buffers
   left->last_write_tid =  MAX( left->last_write_tid, right->last_write_tid );
   left->last_write = MAX( left->last_write, right->last_write );
@@ -134,7 +141,8 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
   if (p != data.begin()) {
     --p;
     if (p->second->end() == bh->start() &&
-       p->second->get_state() == bh->get_state()) {
+        p->second->get_state() == bh->get_state() &&
+        p->second->can_merge_journal(bh)) {
       merge_left(p->second, bh);
       bh = p->second;
     } else {
@@ -146,7 +154,8 @@ void ObjectCacher::Object::try_merge_bh(BufferHead *bh)
   ++p;
   if (p != data.end() &&
       p->second->start() == bh->end() &&
-      p->second->get_state() == bh->get_state())
+      p->second->get_state() == bh->get_state() &&
+      p->second->can_merge_journal(bh))
     merge_left(bh, p->second);
 }
 
@@ -363,6 +372,7 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
           oc->bh_add(this, final);
           ldout(oc->cct, 10) << "map_write adding trailing bh " << *final << dendl;
         } else {
+          replace_journal_tid(final, wr->journal_tid);
          oc->bh_stat_sub(final);
           final->set_length(final->length() + max);
          oc->bh_stat_add(final);
@@ -371,14 +381,14 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
         cur += max;
         continue;
       }
-      
+
       ldout(oc->cct, 10) << "cur is " << cur << ", p is " << *p->second << dendl;
       //oc->verify_stats();
 
       if (p->first <= cur) {
         BufferHead *bh = p->second;
         ldout(oc->cct, 10) << "map_write bh " << *bh << " intersected" << dendl;
-        
+
         if (p->first < cur) {
           assert(final == 0);
           if (cur + max >= bh->end()) {
@@ -406,24 +416,26 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
            oc->mark_dirty(final);
            --p;  // move iterator back to final
            assert(p->second == final);
+            replace_journal_tid(bh, 0);
             merge_left(final, bh);
          } else {
             final = bh;
          }
         }
-        
+
         // keep going.
         loff_t lenfromcur = final->end() - cur;
         cur += lenfromcur;
         left -= lenfromcur;
         ++p;
-        continue; 
+        continue;
       } else {
         // gap!
         loff_t next = p->first;
         loff_t glen = MIN(next - cur, max);
         ldout(oc->cct, 10) << "map_write gap " << cur << "~" << glen << dendl;
         if (final) {
+          replace_journal_tid(final, wr->journal_tid);
          oc->bh_stat_sub(final);
           final->set_length(final->length() + glen);
          oc->bh_stat_add(final);
@@ -433,21 +445,34 @@ ObjectCacher::BufferHead *ObjectCacher::Object::map_write(OSDWrite *wr)
           final->set_length( glen );
           oc->bh_add(this, final);
         }
-        
+
         cur += glen;
         left -= glen;
         continue;    // more?
       }
     }
   }
-  
-  // set versoin
+
+  // set version
   assert(final);
+  replace_journal_tid(final, wr->journal_tid);
   ldout(oc->cct, 10) << "map_write final is " << *final << dendl;
 
   return final;
 }
 
+void ObjectCacher::Object::replace_journal_tid(BufferHead *bh, ceph_tid_t tid) {
+  ceph_tid_t bh_tid = bh->get_journal_tid();
+
+  assert(tid == 0 || bh_tid <= tid);
+  if (bh_tid != 0 && bh_tid != tid) {
+    // inform journal that it should not expect a writeback from this extent
+    oc->writeback_handler.overwrite_extent(get_oid(), bh->start(), bh->length(),
+                                           bh_tid);
+  }
+  bh->set_journal_tid(tid);
+}
+
 void ObjectCacher::Object::truncate(loff_t s)
 {
   assert(oc->lock.is_locked());
@@ -467,6 +492,7 @@ void ObjectCacher::Object::truncate(loff_t s)
     // remove bh entirely
     assert(bh->start() >= s);
     assert(bh->waitfor_read.empty());
+    replace_journal_tid(bh, 0);
     oc->bh_remove(this, bh);
     delete bh;
   }
@@ -507,6 +533,7 @@ void ObjectCacher::Object::discard(loff_t off, loff_t len)
     ++p;
     ldout(oc->cct, 10) << "discard " << *this << " bh " << *bh << dendl;
     assert(bh->waitfor_read.empty());
+    replace_journal_tid(bh, 0);
     oc->bh_remove(this, bh);
     delete bh;
   }
@@ -843,10 +870,11 @@ void ObjectCacher::bh_write(BufferHead *bh)
                                               bh->ob->get_soid(), bh->start(), bh->length());
   // go
   ceph_tid_t tid = writeback_handler.write(bh->ob->get_oid(), bh->ob->get_oloc(),
-                                     bh->start(), bh->length(),
-                                     bh->snapc, bh->bl, bh->last_write,
-                                     bh->ob->truncate_size, bh->ob->truncate_seq,
-                                     oncommit);
+                                           bh->start(), bh->length(),
+                                           bh->snapc, bh->bl, bh->last_write,
+                                           bh->ob->truncate_size,
+                                           bh->ob->truncate_seq,
+                                           bh->journal_tid, oncommit);
   ldout(cct, 20) << " tid " << tid << " on " << bh->ob->get_oid() << dendl;
 
   // set bh last_write_tid
@@ -920,6 +948,7 @@ void ObjectCacher::bh_write_commit(int64_t poolid, sobject_t oid, loff_t start,
       if (r >= 0) {
        // ok!  mark bh clean and error-free
        mark_clean(bh);
+        bh->set_journal_tid(0);
        if (bh->get_nocache())
          bh_lru_rest.lru_bottouch(bh);
        hit.push_back(bh);
@@ -2192,6 +2221,7 @@ void ObjectCacher::bh_add(Object *ob, BufferHead *bh)
 void ObjectCacher::bh_remove(Object *ob, BufferHead *bh)
 {
   assert(lock.is_locked());
+  assert(bh->get_journal_tid() == 0);
   ldout(cct, 30) << "bh_remove " << *ob << " " << *bh << dendl;
   ob->remove_bh(bh);
   if (bh->is_dirty()) {
index c3b3b639c2227abf920a78a70251df070b1931b7..87fe351b4698a6ca54f5f4d9c2baf971d05c58c9 100644 (file)
@@ -71,13 +71,16 @@ class ObjectCacher {
     bufferlist bl;
     utime_t mtime;
     int fadvise_flags;
-    OSDWrite(const SnapContext& sc, const bufferlist& b, utime_t mt, int f)
-      : snapc(sc), bl(b), mtime(mt), fadvise_flags(f) {}
+    ceph_tid_t journal_tid;
+    OSDWrite(const SnapContext& sc, const bufferlist& b, utime_t mt, int f,
+             ceph_tid_t _journal_tid)
+      : snapc(sc), bl(b), mtime(mt), fadvise_flags(f),
+        journal_tid(_journal_tid) {}
   };
 
   OSDWrite *prepare_write(const SnapContext& sc, const bufferlist &b,
-                         utime_t mt, int f) { 
-    return new OSDWrite(sc, b, mt, f); 
+                         utime_t mt, int f, ceph_tid_t journal_tid) {
+    return new OSDWrite(sc, b, mt, f, journal_tid);
   }
 
 
@@ -111,6 +114,7 @@ class ObjectCacher {
     ceph_tid_t last_read_tid;   // tid of last read op (if any)
     utime_t last_write;
     SnapContext snapc;
+    ceph_tid_t journal_tid;
     int error; // holds return value for failed reads
     
     map< loff_t, list<Context*> > waitfor_read;
@@ -124,6 +128,7 @@ class ObjectCacher {
       ob(o),
       last_write_tid(0),
       last_read_tid(0),
+      journal_tid(0),
       error(0) {
       ex.start = ex.length = 0;
     }
@@ -143,7 +148,15 @@ class ObjectCacher {
       state = s;
     }
     int get_state() const { return state; }
-    
+
+    inline ceph_tid_t get_journal_tid() const {
+      return journal_tid;
+    }
+    inline void set_journal_tid(ceph_tid_t _journal_tid) {
+      
+      journal_tid = _journal_tid;
+    }
+
     bool is_missing() { return state == STATE_MISSING; }
     bool is_dirty() { return state == STATE_DIRTY; }
     bool is_clean() { return state == STATE_CLEAN; }
@@ -178,6 +191,11 @@ class ObjectCacher {
     bool get_nocache() {
       return nocache;
     }
+
+    inline bool can_merge_journal(BufferHead *bh) const {
+      return (get_journal_tid() == 0 || bh->get_journal_tid() == 0 ||
+              get_journal_tid() == bh->get_journal_tid());
+    }
   };
 
   // ******* Object *********
@@ -308,7 +326,8 @@ class ObjectCacher {
                  map<loff_t, BufferHead*>& rx,
                 map<loff_t, BufferHead*>& errors);
     BufferHead *map_write(OSDWrite *wr);
-    
+
+    void replace_journal_tid(BufferHead *bh, ceph_tid_t tid);
     void truncate(loff_t s);
     void discard(loff_t off, loff_t len);
 
@@ -687,7 +706,7 @@ public:
   int file_write(ObjectSet *oset, ceph_file_layout *layout, const SnapContext& snapc,
                  loff_t offset, uint64_t len, 
                  bufferlist& bl, utime_t mtime, int flags) {
-    OSDWrite *wr = prepare_write(snapc, bl, mtime, flags);
+    OSDWrite *wr = prepare_write(snapc, bl, mtime, flags, 0);
     Striper::file_to_extents(cct, oset->ino, layout, offset, len, oset->truncate_size, wr->extents);
     return writex(wr, oset, NULL);
   }
@@ -708,6 +727,9 @@ inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh)
       << " " << bh.ob
       << " (" << bh.bl.length() << ")"
       << " v " << bh.last_write_tid;
+  if (bh.get_journal_tid() != 0) {
+    out << " j " << bh.get_journal_tid();
+  }
   if (bh.is_tx()) out << " tx";
   if (bh.is_rx()) out << " rx";
   if (bh.is_dirty()) out << " dirty";
index cbcf20dbcdc70d0451fb7cbc1b324561b9b1cd6d..0e51cb003c212884a9818c88159167b9ea4f5ea1 100644 (file)
@@ -32,7 +32,10 @@ class WritebackHandler {
                           uint64_t off, uint64_t len, const SnapContext& snapc,
                           const bufferlist &bl, utime_t mtime,
                           uint64_t trunc_size, __u32 trunc_seq,
-                          Context *oncommit) = 0;
+                           ceph_tid_t journal_tid, Context *oncommit) = 0;
+
+  virtual void overwrite_extent(const object_t& oid, uint64_t off, uint64_t len,
+                                ceph_tid_t journal_tid) {}
 
   virtual void get_client_lock() {}
   virtual void put_client_lock() {}
index 69f32991b89ea763476662ca00502bb2617950aa..32ae4f52a58b04ff67b6fd239a14f5af8b09cc5a 100644 (file)
@@ -74,7 +74,7 @@ ceph_tid_t FakeWriteback::write(const object_t& oid,
                           const SnapContext& snapc,
                           const bufferlist &bl, utime_t mtime,
                           uint64_t trunc_size, __u32 trunc_seq,
-                          Context *oncommit)
+                          ceph_tid_t journal_tid, Context *oncommit)
 {
   C_Delay *wrapper = new C_Delay(m_cct, oncommit, m_lock, off, NULL, m_delay_ns);
   m_finisher->queue(wrapper, 0);
index 9b9598ed94800886a728640aff4dc6a2346fd54c..351d521073a989baa8f1b9654077f36cc6d6770c 100644 (file)
@@ -26,7 +26,8 @@ public:
                           uint64_t off, uint64_t len,
                           const SnapContext& snapc, const bufferlist &bl,
                           utime_t mtime, uint64_t trunc_size,
-                          __u32 trunc_seq, Context *oncommit);
+                          __u32 trunc_seq, ceph_tid_t journal_tid,
+                           Context *oncommit);
 
   virtual bool may_copy_on_write(const object_t&, uint64_t, uint64_t, snapid_t);
 private:
index 39aabfdeb225dcad96ab8380538c2b3eae4487d8..30438b77f2b0fd1677261138079a796fad309379 100644 (file)
@@ -109,7 +109,8 @@ int stress_test(uint64_t num_ops, uint64_t num_objs,
       else
        assert(r == 0);
     } else {
-      ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0);
+      ObjectCacher::OSDWrite *wr = obc.prepare_write(snapc, bl, utime_t(), 0,
+                                                     random());
       wr->extents.push_back(op->extent);
       lock.Lock();
       obc.writex(wr, &object_set, NULL);