]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osdc: locking for Journaler
authorJohn Spray <john.spray@redhat.com>
Wed, 23 Jul 2014 11:48:57 +0000 (12:48 +0100)
committerJohn Spray <john.spray@redhat.com>
Mon, 25 Aug 2014 00:34:03 +0000 (01:34 +0100)
also remove the lock pointers that JournalPointer load/save
used to require in order to use the objecter.

Signed-off-by: John Spray <john.spray@redhat.com>
src/mds/JournalPointer.cc
src/mds/JournalPointer.h
src/mds/MDLog.cc
src/mds/MDLog.h
src/osdc/Journaler.cc
src/osdc/Journaler.h
src/tools/cephfs/Dumper.cc
src/tools/cephfs/Resetter.cc

index 8c8c54c08de92f1f3c7a35b33f3e591ad2396a6a..0ceb758a9b7540e0b40fc396643e7590ed91c80b 100644 (file)
@@ -40,21 +40,17 @@ std::string JournalPointer::get_object_id() const
 /**
  * Blocking read of JournalPointer for this MDS
  */
-int JournalPointer::load(Objecter *objecter, Mutex *lock)
+int JournalPointer::load(Objecter *objecter)
 {
-  assert(lock != NULL);
   assert(objecter != NULL);
-  assert(!lock->is_locked_by_me());
 
   // Blocking read of data
   std::string const object_id = get_object_id();
   dout(4) << "Reading journal pointer '" << object_id << "'" << dendl;
   bufferlist data;
   C_SaferCond waiter;
-  lock->Lock();
   objecter->read_full(object_t(object_id), object_locator_t(pool_id),
       CEPH_NOSNAP, &data, 0, &waiter);
-  lock->Unlock();
   int r = waiter.wait();
 
   // Construct JournalPointer result, null or decoded data
@@ -73,11 +69,9 @@ int JournalPointer::load(Objecter *objecter, Mutex *lock)
  *
  * @return objecter write op status code
  */
-int JournalPointer::save(Objecter *objecter, Mutex *lock) const
+int JournalPointer::save(Objecter *objecter) const
 {
-  assert(lock != NULL);
   assert(objecter != NULL);
-  assert(!lock->is_locked_by_me());
   // It is not valid to persist a null pointer
   assert(!is_null());
 
@@ -91,10 +85,8 @@ int JournalPointer::save(Objecter *objecter, Mutex *lock) const
     << std::hex << front << ":0x" << back << std::dec << dendl;
 
   C_SaferCond waiter;
-  lock->Lock();
   objecter->write_full(object_t(object_id), object_locator_t(pool_id),
       SnapContext(), data, ceph_clock_now(g_ceph_context), 0, NULL, &waiter);
-  lock->Unlock();
   int write_result = waiter.wait();
   if (write_result < 0) {
     derr << "Error writing pointer object '" << object_id << "': " << cpp_strerror(write_result) << dendl;
index 7724de35ef3c2fc21dee23df8d001291ecc3cccc..559543b44a1f03262cee64d80a818258ab92b307 100644 (file)
@@ -57,8 +57,8 @@ class JournalPointer {
 
   JournalPointer() : node_id(-1), pool_id(-1), front(0), back(0) {}
 
-  int load(Objecter *objecter, Mutex *lock);
-  int save(Objecter *objecter, Mutex *lock) const;
+  int load(Objecter *objecter);
+  int save(Objecter *objecter) const;
   void save(Objecter *objecter, Context *completion) const;
 
   bool is_null() const {
index 1eb6096665e31bd03503ad8dc512b511f5d3d2d6..7e44ea273f378bf5d76445696a56dce422eee2e7 100644 (file)
@@ -700,11 +700,11 @@ void MDLog::_recovery_thread(Context *completion)
   // If the pointer object is not present, then create it with
   // front = default ino and back = null
   JournalPointer jp(mds->get_nodeid(), mds->mdsmap->get_metadata_pool());
-  int const read_result = jp.load(mds->objecter, &(mds->mds_lock));
+  int const read_result = jp.load(mds->objecter);
   if (read_result == -ENOENT) {
     inodeno_t const default_log_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
     jp.front = default_log_ino;
-    int write_result = jp.save(mds->objecter, &(mds->mds_lock));
+    int write_result = jp.save(mds->objecter);
     // Nothing graceful we can do for this
     assert(write_result >= 0);
   } else if (read_result != 0) {
@@ -753,7 +753,7 @@ void MDLog::_recovery_thread(Context *completion)
     } else {
       dout(1) << "Successfully erased journal, updating journal pointer" << dendl;
       jp.back = 0;
-      int write_result = jp.save(mds->objecter, &(mds->mds_lock));
+      int write_result = jp.save(mds->objecter);
       // Nothing graceful we can do for this
       assert(write_result >= 0);
     }
@@ -823,7 +823,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
   inodeno_t primary_ino = MDS_INO_LOG_OFFSET + mds->get_nodeid();
   inodeno_t secondary_ino = MDS_INO_LOG_BACKUP_OFFSET + mds->get_nodeid();
   jp.back = (jp.front == primary_ino ? secondary_ino : primary_ino);
-  int write_result = jp.save(mds->objecter, &(mds->mds_lock));
+  int write_result = jp.save(mds->objecter);
   assert(write_result == 0);
 
   /* Create the new Journaler file */
@@ -907,7 +907,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
   inodeno_t const tmp = jp.front;
   jp.front = jp.back;
   jp.back = tmp;
-  write_result = jp.save(mds->objecter, &(mds->mds_lock));
+  write_result = jp.save(mds->objecter);
   assert(write_result == 0);
 
   /* Delete the old journal to free space */
@@ -922,7 +922,7 @@ void MDLog::_reformat_journal(JournalPointer const &jp_in, Journaler *old_journa
 
   /* Update the pointer to reflect we're back in clean single journal state. */
   jp.back = 0;
-  write_result = jp.save(mds->objecter, &(mds->mds_lock));
+  write_result = jp.save(mds->objecter);
   assert(write_result == 0);
 
   /* Reset the Journaler object to its default state */
index ac27efc094cc742059c96eab6413436d98673ed4..f3b3a04f989de164ced20e5b2ad9f3fc2472a7f3 100644 (file)
@@ -59,6 +59,8 @@ class PerfCounters;
 #include <map>
 using std::map;
 
+#include "common/Finisher.h"
+
 
 class MDLog {
 public:
index 94a7fd2a25599f52355f5e6b283fb1830656e650..b4b51b19a7fd38b21e63a8ed3f79f79de329fc92 100644 (file)
 
 void Journaler::set_readonly()
 {
+  Mutex::Locker l(lock);
+
   ldout(cct, 1) << "set_readonly" << dendl;
   readonly = true;
 }
 
 void Journaler::set_writeable()
 {
+  Mutex::Locker l(lock);
+
   ldout(cct, 1) << "set_writeable" << dendl;
   readonly = false;
 }
 
 void Journaler::create(ceph_file_layout *l, stream_format_t const sf)
 {
+  Mutex::Locker lk(lock);
+
   assert(!readonly);
   state = STATE_ACTIVE;
 
   stream_format = sf;
   journal_stream.set_format(sf);
-  set_layout(l);
+  _set_layout(l);
 
   prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos =
     read_pos = requested_pos = received_pos =
@@ -54,7 +60,13 @@ void Journaler::create(ceph_file_layout *l, stream_format_t const sf)
     << ", format=" << stream_format << dendl;
 }
 
-void Journaler::set_layout(ceph_file_layout *l)
+void Journaler::set_layout(ceph_file_layout const *l)
+{
+    Mutex::Locker lk(lock);
+    _set_layout(l);
+}
+
+void Journaler::_set_layout(ceph_file_layout const *l)
 {
   layout = *l;
 
@@ -127,6 +139,8 @@ public:
 
 void Journaler::recover(Context *onread) 
 {
+  Mutex::Locker l(lock);
+
   ldout(cct, 1) << "recover start" << dendl;
   assert(state != STATE_ACTIVE);
   assert(readonly);
@@ -154,6 +168,12 @@ void Journaler::read_head(Context *on_finish, bufferlist *bl)
   objecter->read_full(oid, oloc, CEPH_NOSNAP, bl, 0, on_finish);
 }
 
+void Journaler::reread_head(Context *onfinish)
+{
+  Mutex::Locker l(lock);
+  _reread_head(onfinish);
+}
+
 /**
  * Re-read the head from disk, and set the write_pos, expire_pos, trimmed_pos
  * from the on-disk header. This switches the state to STATE_REREADHEAD for
@@ -162,7 +182,7 @@ void Journaler::read_head(Context *on_finish, bufferlist *bl)
  * Also, don't call this until the Journaler has finished its recovery and has
  * gone STATE_ACTIVE!
  */
-void Journaler::reread_head(Context *onfinish)
+void Journaler::_reread_head(Context *onfinish)
 {
   ldout(cct, 10) << "reread_head" << dendl;
   assert(state == STATE_ACTIVE);
@@ -174,6 +194,8 @@ void Journaler::reread_head(Context *onfinish)
 
 void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 {
+  Mutex::Locker l(lock);
+
   //read on-disk header into
   assert(bl.length() || r < 0 );
 
@@ -191,6 +213,8 @@ void Journaler::_finish_reread_head(int r, bufferlist& bl, Context *finish)
 
 void Journaler::_finish_read_head(int r, bufferlist& bl)
 {
+  Mutex::Locker l(lock);
+
   assert(state == STATE_READHEAD);
 
   if (r!=0) {
@@ -238,7 +262,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   trimmed_pos = trimming_pos = h.trimmed_pos;
 
   init_headers(h);
-  set_layout(&h.layout);
+  _set_layout(&h.layout);
   stream_format = h.stream_format;
   journal_stream.set_format(h.stream_format);
 
@@ -259,6 +283,8 @@ void Journaler::probe(Context *finish, uint64_t *end)
 
 void Journaler::reprobe(Context *finish)
 {
+  Mutex::Locker l(lock);
+
   ldout(cct, 10) << "reprobe" << dendl;
   assert(state == STATE_ACTIVE);
 
@@ -268,7 +294,10 @@ void Journaler::reprobe(Context *finish)
 }
 
 
-void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) {
+void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish)
+{
+  Mutex::Locker l(lock);
+
   assert(new_end >= write_pos || r < 0);
   ldout(cct, 1) << "_finish_reprobe new_end = " << new_end 
          << " (header had " << write_pos << ")."
@@ -280,6 +309,8 @@ void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) {
 
 void Journaler::_finish_probe_end(int r, uint64_t end)
 {
+  Mutex::Locker l(lock);
+
   assert(state == STATE_PROBING);
   if (r < 0) { // error in probing
     goto out;
@@ -322,12 +353,17 @@ public:
 
 void Journaler::reread_head_and_probe(Context *onfinish)
 {
+  Mutex::Locker l(lock);
+
   assert(state == STATE_ACTIVE);
-  reread_head(new C_RereadHeadProbe(this, onfinish));
+  _reread_head(new C_RereadHeadProbe(this, onfinish));
 }
 
 void Journaler::_finish_reread_head_and_probe(int r, Context *onfinish)
 {
+  // Expect to be called back from finish_reread_head, which already takes lock
+  assert(lock.is_locked_by_me());
+
   assert(!r); //if we get an error, we're boned
   reprobe(onfinish);
 }
@@ -347,6 +383,13 @@ public:
 };
 
 void Journaler::write_head(Context *oncommit)
+{
+  Mutex::Locker l(lock);
+  _write_head(oncommit);
+}
+
+
+void Journaler::_write_head(Context *oncommit)
 {
   assert(!readonly);
   assert(state == STATE_ACTIVE);
@@ -376,6 +419,8 @@ void Journaler::write_head(Context *oncommit)
 
 void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
 {
+  Mutex::Locker l(lock);
+
   if (r < 0) {
     lderr(cct) << "_finish_write_head got " << cpp_strerror(r) << dendl;
     handle_write_error(r);
@@ -388,7 +433,7 @@ void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
     oncommit->complete(r);
   }
 
-  trim();  // trim?
+  _trim();  // trim?
 }
 
 
@@ -407,7 +452,9 @@ public:
 
 void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
 {
+  Mutex::Locker l(lock);
   assert(!readonly);
+
   if (r < 0) {
     lderr(cct) << "_finish_flush got " << cpp_strerror(r) << dendl;
     handle_write_error(r);
@@ -452,6 +499,8 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
 
 uint64_t Journaler::append_entry(bufferlist& bl)
 {
+  Mutex::Locker l(lock);
+
   assert(!readonly);
   uint32_t s = bl.length();
 
@@ -569,8 +618,13 @@ void Journaler::_do_flush(unsigned amount)
 }
 
 
-
 void Journaler::wait_for_flush(Context *onsafe)
+{
+  Mutex::Locker l(lock);
+  _wait_for_flush(onsafe);
+}
+
+void Journaler::_wait_for_flush(Context *onsafe)
 {
   assert(!readonly);
   
@@ -592,6 +646,12 @@ void Journaler::wait_for_flush(Context *onsafe)
 }  
 
 void Journaler::flush(Context *onsafe)
+{
+  Mutex::Locker l(lock);
+  _flush(onsafe);
+}
+
+void Journaler::_flush(Context *onsafe)
 {
   assert(!readonly);
 
@@ -603,29 +663,25 @@ void Journaler::flush(Context *onsafe)
       onsafe->complete(0);
     }
   } else {
-    if (1) {
-      // maybe buffer
-      if (write_buf.length() < cct->_conf->journaler_batch_max) {
-       // delay!  schedule an event.
-       ldout(cct, 20) << "flush delaying flush" << dendl;
-       if (delay_flush_event)
-         timer->cancel_event(delay_flush_event);
-       delay_flush_event = new C_DelayFlush(this);
-       timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event);        
-      } else {
-       ldout(cct, 20) << "flush not delaying flush" << dendl;
-       _do_flush();
+    // maybe buffer
+    if (write_buf.length() < cct->_conf->journaler_batch_max) {
+      // delay!  schedule an event.
+      ldout(cct, 20) << "flush delaying flush" << dendl;
+      if (delay_flush_event) {
+        timer->cancel_event(delay_flush_event);
       }
+      delay_flush_event = new C_DelayFlush(this);
+      timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event); 
     } else {
-      // always flush
+      ldout(cct, 20) << "flush not delaying flush" << dendl;
       _do_flush();
     }
-    wait_for_flush(onsafe);
+    _wait_for_flush(onsafe);
   }
 
   // write head?
   if (last_wrote_head.sec() + cct->_conf->journaler_write_head_interval < ceph_clock_now(cct).sec()) {
-    write_head();
+    _write_head();
   }
 }
 
@@ -637,7 +693,7 @@ struct C_Journaler_Prezero : public Context {
   uint64_t from, len;
   C_Journaler_Prezero(Journaler *j, uint64_t f, uint64_t l) : journaler(j), from(f), len(l) {}
   void finish(int r) {
-    journaler->_prezeroed(r, from, len);
+    journaler->_finish_prezero(r, from, len);
   }
 };
 
@@ -678,8 +734,10 @@ void Journaler::_issue_prezero()
   }
 }
 
-void Journaler::_prezeroed(int r, uint64_t start, uint64_t len)
+void Journaler::_finish_prezero(int r, uint64_t start, uint64_t len)
 {
+  Mutex::Locker l(lock);
+
   ldout(cct, 10) << "_prezeroed to " << start << "~" << len
           << ", prezeroing/prezero was " << prezeroing_pos << "/" << prezero_pos
           << ", pending " << pending_zero
@@ -733,13 +791,15 @@ class Journaler::C_RetryRead : public Context {
 public:
   C_RetryRead(Journaler *l) : ls(l) {}
   void finish(int r) {
-    // kickstart.
+    Mutex::Locker l(ls->lock);
     ls->_prefetch();
   }  
 };
 
 void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
 {
+  Mutex::Locker l(lock);
+
   if (r < 0) {
     ldout(cct, 0) << "_finish_read got error " << r << dendl;
     error = r;
@@ -809,8 +869,9 @@ void Journaler::_issue_read(uint64_t len)
   if (requested_pos == safe_pos) {
     ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl;
     assert(write_pos > requested_pos);
-    if (flush_pos == safe_pos)
-      flush();
+    if (flush_pos == safe_pos) {
+      _flush(NULL);
+    }
     assert(flush_pos > safe_pos);
     waitfor_safe[flush_pos].push_back(new C_RetryRead(this));
     return;
@@ -939,7 +1000,7 @@ class Journaler::C_EraseFinish : public Context {
   public:
   C_EraseFinish(Journaler *j, Context *c) : journaler(j), completion(c) {}
   void finish(int r) {
-    journaler->_erase_finish(r, completion);
+    journaler->_finish_erase(r, completion);
   }
 };
 
@@ -949,19 +1010,23 @@ class Journaler::C_EraseFinish : public Context {
  */
 void Journaler::erase(Context *completion)
 {
+  Mutex::Locker l(lock);
+
   // Async delete the journal data
   uint64_t first = trimmed_pos / get_layout_period();
   uint64_t num = (write_pos - trimmed_pos) / get_layout_period() + 2;
   filer.purge_range(ino, &layout, SnapContext(), first, num, ceph_clock_now(cct), 0,
       new C_EraseFinish(this, completion));
 
-  // We will not start the operation to delete the header until _erase_finish has
+  // We will not start the operation to delete the header until _finish_erase has
   // seen the data deletion succeed: otherwise if there was an error deleting data
   // we might prematurely delete the header thereby lose our reference to the data.
 }
 
-void Journaler::_erase_finish(int data_result, Context *completion)
+void Journaler::_finish_erase(int data_result, Context *completion)
 {
+  Mutex::Locker l(lock);
+
   if (data_result == 0) {
     // Async delete the journal header
     filer.purge_range(ino, &layout, SnapContext(), 0, 1, ceph_clock_now(cct), 0,
@@ -1019,11 +1084,17 @@ class Journaler::C_Trim : public Context {
 public:
   C_Trim(Journaler *l, int64_t t) : ls(l), to(t) {}
   void finish(int r) {
-    ls->_trim_finish(r, to);
+    ls->_finish_trim(r, to);
   }
 };
 
 void Journaler::trim()
+{
+  Mutex::Locker l(lock);
+  _trim();
+}
+
+void Journaler::_trim()
 {
   assert(!readonly);
   uint64_t period = get_layout_period();
@@ -1062,15 +1133,17 @@ void Journaler::trim()
   trimming_pos = trim_to;  
 }
 
-void Journaler::_trim_finish(int r, uint64_t to)
+void Journaler::_finish_trim(int r, uint64_t to)
 {
+  Mutex::Locker l(lock);
+
   assert(!readonly);
-  ldout(cct, 10) << "_trim_finish trimmed_pos was " << trimmed_pos
+  ldout(cct, 10) << "_finish_trim trimmed_pos was " << trimmed_pos
           << ", trimmed/trimming/expire now "
           << to << "/" << trimming_pos << "/" << expire_pos
           << dendl;
   if (r < 0 && r != -ENOENT) {
-    lderr(cct) << "_trim_finish got " << cpp_strerror(r) << dendl;
+    lderr(cct) << "_finish_trim got " << cpp_strerror(r) << dendl;
     handle_write_error(r);
     return;
   }
index 19e724d38094577ea13140216ab0b6037a0eb10d..76192353aea64cbdd7a19a8bbc1f369bfae5d8fe 100644 (file)
@@ -109,7 +109,8 @@ class JournalStream
 class Journaler {
 public:
   // this goes at the head of the log "file".
-  struct Header {
+  class Header {
+    public:
     uint64_t trimmed_pos;
     uint64_t expire_pos;
     uint64_t unused_field;
@@ -186,7 +187,7 @@ public:
       ls.push_back(new Header());
       ls.back()->stream_format = JOURNAL_FORMAT_RESILIENT;
     }
-  } last_written, last_committed;
+  };
   WRITE_CLASS_ENCODER(Header)
 
   uint32_t get_stream_format() const {
@@ -196,6 +197,9 @@ public:
 private:
   // me
   CephContext *cct;
+  Mutex lock;
+  Header last_written;
+  Header last_committed;
   inodeno_t ino;
   int64_t pg_pool;
   bool readonly;
@@ -220,11 +224,20 @@ private:
     public:
     C_DelayFlush(Journaler *j) : journaler(j) {}
     void finish(int r) {
-      journaler->delay_flush_event = 0;
-      journaler->_do_flush();
+      journaler->_do_delayed_flush();
     }
   } *delay_flush_event;
 
+  /*
+   * Do a flush as a result of a C_DelayFlush context.
+   */
+  void _do_delayed_flush()
+  {
+    assert(delay_flush_event != NULL);
+    Mutex::Locker l(lock);
+    delay_flush_event = NULL;
+    _do_flush();
+  }
 
   // my state
   static const int STATE_UNDEF = 0;
@@ -237,12 +250,18 @@ private:
   int state;
   int error;
 
+  void _write_head(Context *oncommit=NULL);
+  void _wait_for_flush(Context *onsafe);
+  void _trim();
+
   // header
   utime_t last_wrote_head;
   void _finish_write_head(int r, Header &wrote, Context *oncommit);
   class C_WriteHead;
   friend class C_WriteHead;
 
+  void _reread_head(Context *onfinish);
+  void _set_layout(ceph_file_layout const *l);
   list<Context*> waitfor_recover;
   void read_head(Context *on_finish, bufferlist *bl);
   void _finish_read_head(int r, bufferlist& bl);
@@ -275,6 +294,7 @@ private:
   std::set<uint64_t> pending_safe;
   std::map<uint64_t, std::list<Context*> > waitfor_safe; // when safe through given offset
 
+  void _flush(Context *onsafe);
   void _do_flush(unsigned amount=0);
   void _finish_flush(int r, uint64_t start, utime_t stamp);
   class C_Flush;
@@ -297,6 +317,7 @@ private:
   Context    *on_write_error;
 
   void _finish_read(int r, uint64_t offset, bufferlist &bl); // read completion callback
+  void _finish_retry_read(int r);
   void _assimilate_prefetch();
   void _issue_read(uint64_t len);  // read some more
   void _prefetch();             // maybe read ahead
@@ -311,12 +332,12 @@ private:
   uint64_t trimmed_pos;   // what has been trimmed
   map<uint64_t, list<Context*> > waitfor_trim;
 
-  void _trim_finish(int r, uint64_t to);
+  void _finish_trim(int r, uint64_t to);
   class C_Trim;
   friend class C_Trim;
 
   void _issue_prezero();
-  void _prezeroed(int r, uint64_t from, uint64_t len);
+  void _finish_prezero(int r, uint64_t from, uint64_t len);
   friend struct C_Journaler_Prezero;
 
   // only init_headers when following or first reading off-disk
@@ -338,9 +359,14 @@ private:
 
   bool _is_readable();
 
+  void _finish_erase(int data_result, Context *completion);
+  class C_EraseFinish;
+  friend class C_EraseFinish;
+
 public:
   Journaler(inodeno_t ino_, int64_t pool, const char *mag, Objecter *obj, PerfCounters *l, int lkey, SafeTimer *tim) : 
-    last_written(mag), last_committed(mag), cct(obj->cct),
+    cct(obj->cct), lock("Journaler"),
+    last_written(mag), last_committed(mag), 
     ino(ino_), pg_pool(pool), readonly(true),
     stream_format(-1), journal_stream(-1),
     magic(mag),
@@ -363,9 +389,11 @@ public:
    * an "erase" method.
    */
   void reset() {
+    Mutex::Locker l(lock);
     assert(state == STATE_ACTIVE);
+
     readonly = true;
-    delay_flush_event = 0;
+    delay_flush_event = NULL;
     state = STATE_UNDEF;
     error = 0;
     prezeroing_pos = 0;
@@ -384,58 +412,51 @@ public:
     waiting_for_zero = false;
   }
 
+  // Asynchronous operations
+  // =======================
   void erase(Context *completion);
-private:
-  void _erase_finish(int data_result, Context *completion);
-  class C_EraseFinish;
-  friend class C_EraseFinish;
-
-public:
   void create(ceph_file_layout *layout, stream_format_t const sf);
   void recover(Context *onfinish);
   void reread_head(Context *onfinish);
   void reprobe(Context *onfinish);
   void reread_head_and_probe(Context *onfinish);
   void write_head(Context *onsave=0);
-
-  void set_layout(ceph_file_layout *l);
-
-  void set_readonly();
-  void set_writeable();
-  bool is_readonly() { return readonly; }
-
-  bool is_active() { return state == STATE_ACTIVE; }
-  int get_error() { return error; }
-
-  uint64_t get_write_pos() const { return write_pos; }
-  uint64_t get_write_safe_pos() const { return safe_pos; }
-  uint64_t get_read_pos() const { return read_pos; }
-  uint64_t get_expire_pos() const { return expire_pos; }
-  uint64_t get_trimmed_pos() const { return trimmed_pos; }
-
-  uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; }
-  ceph_file_layout& get_layout() { return layout; }
-
-  // write
-  uint64_t append_entry(bufferlist& bl);
   void wait_for_flush(Context *onsafe = 0);
   void flush(Context *onsafe = 0);
+  void wait_for_readable(Context *onfinish);
 
-  // read
+  // Synchronous setters
+  // ===================
+  void set_layout(ceph_file_layout const *l);
+  void set_readonly();
+  void set_writeable();
+  void set_write_pos(int64_t p) { 
+    Mutex::Locker l(lock);
+    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p;
+  }
   void set_read_pos(int64_t p) { 
+    Mutex::Locker l(lock);
     assert(requested_pos == received_pos);  // we can't cope w/ in-progress read right now.
     read_pos = requested_pos = received_pos = p;
     read_buf.clear();
   }
-
-  bool is_readable();
-  bool try_read_entry(bufferlist& bl);
-  void wait_for_readable(Context *onfinish);
-  
-  void set_write_pos(int64_t p) { 
-    prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = p;
+  uint64_t append_entry(bufferlist& bl);
+  void set_expire_pos(int64_t ep) {
+      Mutex::Locker l(lock);
+      expire_pos = ep;
+  }
+  void set_trimmed_pos(int64_t p) {
+      Mutex::Locker l(lock);
+      trimming_pos = trimmed_pos = p;
   }
 
+  void trim();
+  void trim_tail() {
+    Mutex::Locker l(lock);
+
+    assert(!readonly);
+    _issue_prezero();
+  }
   /**
    * set write error callback
    *
@@ -451,19 +472,25 @@ public:
    * @param c callback/context to trigger on error
    */
   void set_write_error_handler(Context *c) {
+    Mutex::Locker l(lock);
     assert(!on_write_error);
     on_write_error = c;
   }
 
-  // trim
-  void set_expire_pos(int64_t ep) { expire_pos = ep; }
-  void set_trimmed_pos(int64_t p) { trimming_pos = trimmed_pos = p; }
-
-  void trim();
-  void trim_tail() {
-    assert(!readonly);
-    _issue_prezero();
-  }
+  // Synchronous getters
+  // ===================
+  uint64_t get_layout_period() const { return (uint64_t)layout.fl_stripe_count * (uint64_t)layout.fl_object_size; }
+  ceph_file_layout& get_layout() { return layout; }
+  bool is_active() { return state == STATE_ACTIVE; }
+  int get_error() { return error; }
+  bool is_readonly() { return readonly; }
+  bool is_readable();
+  bool try_read_entry(bufferlist& bl);
+  uint64_t get_write_pos() const { return write_pos; }
+  uint64_t get_write_safe_pos() const { return safe_pos; }
+  uint64_t get_read_pos() const { return read_pos; }
+  uint64_t get_expire_pos() const { return expire_pos; }
+  uint64_t get_trimmed_pos() const { return trimmed_pos; }
 };
 WRITE_CLASS_ENCODER(Journaler::Header)
 
index 665fa88a5050535dff76dd69dd17d5241a4b1a0c..5070f5ea14a9bb2f108b6ebf4f8ca06c7caadd66 100644 (file)
@@ -40,7 +40,7 @@ int Dumper::init(int rank_)
   }
 
   JournalPointer jp(rank, mdsmap->get_metadata_pool());
-  int jp_load_result = jp.load(objecter, &lock);
+  int jp_load_result = jp.load(objecter);
   if (jp_load_result != 0) {
     std::cerr << "Error loading journal: " << cpp_strerror(jp_load_result) << std::endl;
     return jp_load_result;
index fde2dddd161a41f6ec20d6b33f6e72dba86096c4..bcfa10eec231011e9b4eca863cb12097c4102bf9 100644 (file)
@@ -31,7 +31,7 @@ void Resetter::reset(int rank)
   int r;
 
   JournalPointer jp(rank, mdsmap->get_metadata_pool());
-  int jp_load_result = jp.load(objecter, &lock);
+  int jp_load_result = jp.load(objecter);
   if (jp_load_result != 0) {
     std::cerr << "Error loading journal: " << cpp_strerror(jp_load_result) << std::endl;
     return;