]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
Filer, Journaler: deglobalize
authorColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Fri, 24 Jun 2011 17:32:35 +0000 (10:32 -0700)
committerColin Patrick McCabe <cmccabe@alumni.cmu.edu>
Fri, 24 Jun 2011 17:35:42 +0000 (10:35 -0700)
Signed-off-by: Colin McCabe <colin.mccabe@dreamhost.com>
src/osdc/Filer.cc
src/osdc/Filer.h
src/osdc/Journaler.cc
src/osdc/Journaler.h
src/osdc/Objecter.h

index 335a879fd0a2fb6857cf7cd1cc864be64a4e49c8..efd5cbdf5878c254357da7b5a7b3d2f78afcaf52 100644 (file)
@@ -61,7 +61,7 @@ int Filer::probe(inodeno_t ino,
                 int flags,
                 Context *onfinish) 
 {
-  dout(10) << "probe " << (fwd ? "fwd ":"bwd ")
+  ldout(cct, 10) << "probe " << (fwd ? "fwd ":"bwd ")
           << hex << ino << dec
           << " starting from " << start_from
           << dendl;
@@ -92,7 +92,7 @@ int Filer::probe(inodeno_t ino,
 
 void Filer::_probe(Probe *probe)
 {
-  dout(10) << "_probe " << hex << probe->ino << dec 
+  ldout(cct, 10) << "_probe " << hex << probe->ino << dec 
           << " " << probe->probing_off << "~" << probe->probing_len 
           << dendl;
   
@@ -105,7 +105,7 @@ void Filer::_probe(Probe *probe)
   for (vector<ObjectExtent>::iterator p = probe->probing.begin();
        p != probe->probing.end();
        p++) {
-    dout(10) << "_probe  probing " << p->oid << dendl;
+    ldout(cct, 10) << "_probe  probing " << p->oid << dendl;
     C_Probe *c = new C_Probe(this, probe, p->oid);
     probe->ops[p->oid] = objecter->stat(p->oid, p->oloc, probe->snapid, &c->size, &c->mtime, 
                                        probe->flags | CEPH_OSD_FLAG_RWORDERED, c);
@@ -114,7 +114,7 @@ void Filer::_probe(Probe *probe)
 
 void Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mtime)
 {
-  dout(10) << "_probed " << probe->ino << " object " << oid
+  ldout(cct, 10) << "_probed " << probe->ino << " object " << oid
           << " has size " << size << " mtime " << mtime << dendl;
 
   probe->known_size[oid] = size;
@@ -144,7 +144,7 @@ void Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt
        p != probe->probing.end();
        p++) {
     uint64_t shouldbe = p->length + p->offset;
-    dout(10) << "_probed  " << probe->ino << " object " << hex << p->oid << dec
+    ldout(cct, 10) << "_probed  " << probe->ino << " object " << hex << p->oid << dec
             << " should be " << shouldbe
             << ", actual is " << probe->known_size[p->oid]
             << dendl;
@@ -164,12 +164,12 @@ void Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt
           i++) {
        if (oleft <= (uint64_t)i->second) {
          end = probe->probing_off + i->first + oleft;
-         dout(10) << "_probed  end is in buffer_extent " << i->first << "~" << i->second << " off " << oleft 
+         ldout(cct, 10) << "_probed  end is in buffer_extent " << i->first << "~" << i->second << " off " << oleft 
                   << ", from was " << probe->probing_off << ", end is " << end 
                   << dendl;
          
          probe->found_size = true;
-         dout(10) << "_probed found size at " << end << dendl;
+         ldout(cct, 10) << "_probed found size at " << end << dendl;
          *probe->psize = end;
          
          if (!probe->pmtime)  // stop if we don't need mtime too
@@ -183,7 +183,7 @@ void Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt
 
   if (!probe->found_size || (probe->probing_off && probe->pmtime)) {
     // keep probing!
-    dout(10) << "_probed probing further" << dendl;
+    ldout(cct, 10) << "_probed probing further" << dendl;
 
     uint64_t period = probe->layout.fl_stripe_count * probe->layout.fl_object_size;
     if (probe->fwd) {
@@ -201,7 +201,7 @@ void Filer::_probed(Probe *probe, const object_t& oid, uint64_t size, utime_t mt
   }
 
   if (probe->pmtime) {
-    dout(10) << "_probed found mtime " << probe->max_mtime << dendl;
+    ldout(cct, 10) << "_probed found mtime " << probe->max_mtime << dendl;
     *probe->pmtime = probe->max_mtime;
   }
 
@@ -271,7 +271,7 @@ struct C_PurgeRange : public Context {
 void Filer::_do_purge_range(PurgeRange *pr, int fin)
 {
   pr->uncommitted -= fin;
-  dout(10) << "_do_purge_range " << pr->ino << " objects " << pr->first << "~" << pr->num
+  ldout(cct, 10) << "_do_purge_range " << pr->ino << " objects " << pr->first << "~" << pr->num
           << " uncommitted " << pr->uncommitted << dendl;
 
   if (pr->num == 0 && pr->uncommitted == 0) {
@@ -301,7 +301,7 @@ void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout,
                             uint64_t offset, uint64_t len,
                             vector<ObjectExtent>& extents)
 {
-  dout(10) << "file_to_extents " << offset << "~" << len 
+  ldout(cct, 10) << "file_to_extents " << offset << "~" << len 
            << " on " << hex << ino << dec
            << dendl;
   assert(len > 0);
@@ -317,7 +317,7 @@ void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout,
   __u32 stripe_count = layout->fl_stripe_count;
   assert(object_size >= su);
   uint64_t stripes_per_object = object_size / su;
-  dout(20) << " stripes_per_object " << stripes_per_object << dendl;
+  ldout(cct, 20) << " stripes_per_object " << stripes_per_object << dendl;
 
   uint64_t cur = offset;
   uint64_t left = len;
@@ -364,8 +364,8 @@ void Filer::file_to_extents(inodeno_t ino, ceph_file_layout *layout,
     }
     ex->buffer_extents[cur-offset] = x_len;
         
-    dout(15) << "file_to_extents  " << *ex << " in " << ex->oloc << dendl;
-    //dout(0) << "map: ino " << ino << " oid " << ex.oid << " osd " << ex.osd << " offset " << ex.offset << " len " << ex.len << " ... left " << left << dendl;
+    ldout(cct, 15) << "file_to_extents  " << *ex << " in " << ex->oloc << dendl;
+    //ldout(cct, 0) << "map: ino " << ino << " oid " << ex.oid << " osd " << ex.osd << " offset " << ex.offset << " len " << ex.len << " ... left " << left << dendl;
     
     left -= x_len;
     cur += x_len;
index 63d24b850e65d068e3e12e688f2e5f5965c8e1c8..8eb819ed1815284b87df52ed37c383e9d9c32dbc 100644 (file)
@@ -40,6 +40,7 @@ class OSDMap;
 /**** Filer interface ***/
 
 class Filer {
+  CephContext *cct;
   Objecter   *objecter;
   
   // probes
@@ -85,7 +86,7 @@ class Filer {
   Filer(const Filer& other);
   const Filer operator=(const Filer& other);
 
-  Filer(Objecter *o) : objecter(o) {}
+  Filer(Objecter *o) : cct(o->cct), objecter(o) {}
   ~Filer() {}
 
   bool is_active() {
@@ -197,9 +198,9 @@ class Filer {
     } else {
       C_Gather *gack = 0, *gcom = 0;
       if (onack)
-       gack = new C_Gather(g_ceph_context, onack);
+       gack = new C_Gather(cct, onack);
       if (oncommit)
-       gcom = new C_Gather(g_ceph_context, oncommit);
+       gcom = new C_Gather(cct, oncommit);
       for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
        vector<OSDOp> ops(1);
        ops[0].op.op = CEPH_OSD_OP_TRIMTRUNC;
@@ -234,9 +235,9 @@ class Filer {
     } else {
       C_Gather *gack = 0, *gcom = 0;
       if (onack)
-       gack = new C_Gather(g_ceph_context, onack);
+       gack = new C_Gather(cct, onack);
       if (oncommit)
-       gcom = new C_Gather(g_ceph_context, oncommit);
+       gcom = new C_Gather(cct, oncommit);
       for (vector<ObjectExtent>::iterator p = extents.begin(); p != extents.end(); p++) {
        if (p->offset == 0 && p->length == layout->fl_object_size)
          objecter->remove(p->oid, p->oloc,
index d9e41ec59eb9a72354ec9d264a278c7df1f43b2e..a53290b2b692efc7a25fc0940fe41b0655feaf16 100644 (file)
  * 
  */
 
-#include "Journaler.h"
-
-#include "include/Context.h"
 #include "common/ProfLogger.h"
+#include "common/dout.h"
+#include "include/Context.h"
 #include "msg/Messenger.h"
-
-#include "common/config.h"
+#include "osdc/Journaler.h"
 
 #define DOUT_SUBSYS journaler
 #undef dout_prefix
 
 void Journaler::set_readonly()
 {
-  dout(1) << "set_readonly" << dendl;
+  ldout(cct, 1) << "set_readonly" << dendl;
   readonly = true;
 }
 
 void Journaler::set_writeable()
 {
-  dout(1) << "set_writeable" << dendl;
+  ldout(cct, 1) << "set_writeable" << dendl;
   readonly = false;
 }
 
 void Journaler::create(ceph_file_layout *l)
 {
   assert(!readonly);
-  dout(1) << "create blank journal" << dendl;
+  ldout(cct, 1) << "create blank journal" << dendl;
   state = STATE_ACTIVE;
 
   set_layout(l);
@@ -60,7 +58,7 @@ void Journaler::set_layout(ceph_file_layout *l)
 
   // prefetch intelligently.
   // (watch out, this is big if you use big objects or weird striping)
-  uint64_t periods = g_conf->journaler_prefetch_periods;
+  uint64_t periods = cct->_conf->journaler_prefetch_periods;
   if (periods < 2)
     periods = 2;  // we need at least 2 periods to make progress.
   fetch_len = layout.fl_stripe_count * layout.fl_object_size * periods;
@@ -123,7 +121,7 @@ public:
 
 void Journaler::recover(Context *onread) 
 {
-  dout(1) << "recover start" << dendl;
+  ldout(cct, 1) << "recover start" << dendl;
   assert(state != STATE_ACTIVE);
   assert(readonly);
 
@@ -131,11 +129,11 @@ void Journaler::recover(Context *onread)
     waitfor_recover.push_back(onread);
   
   if (state != STATE_UNDEF) {
-    dout(1) << "recover - already recoverying" << dendl;
+    ldout(cct, 1) << "recover - already recoverying" << dendl;
     return;
   }
 
-  dout(1) << "read_head" << dendl;
+  ldout(cct, 1) << "read_head" << dendl;
   state = STATE_READHEAD;
   C_ReadHead *fin = new C_ReadHead(this);
   read_head(fin, &fin->bl);
@@ -160,7 +158,7 @@ void Journaler::read_head(Context *on_finish, bufferlist *bl)
  */
 void Journaler::reread_head(Context *onfinish)
 {
-  dout(10) << "reread_head" << dendl;
+  ldout(cct, 10) << "reread_head" << dendl;
   assert(state == STATE_ACTIVE);
 
   state = STATE_REREADHEAD;
@@ -191,11 +189,11 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   assert(state == STATE_READHEAD);
 
   if (bl.length() == 0) {
-    dout(1) << "_finish_read_head r=" << r << " read 0 bytes, assuming empty log" << dendl;    
+    ldout(cct, 1) << "_finish_read_head r=" << r << " read 0 bytes, assuming empty log" << dendl;    
     state = STATE_ACTIVE;
     list<Context*> ls;
     ls.swap(waitfor_recover);
-    finish_contexts(g_ceph_context, ls, 0);
+    finish_contexts(cct, ls, 0);
     return;
   } 
 
@@ -205,11 +203,11 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   ::decode(h, p);
 
   if (h.magic != magic) {
-    dout(0) << "on disk magic '" << h.magic << "' != my magic '"
+    ldout(cct, 0) << "on disk magic '" << h.magic << "' != my magic '"
            << magic << "'" << dendl;
     list<Context*> ls;
     ls.swap(waitfor_recover);
-    finish_contexts(g_ceph_context, ls, -EINVAL);
+    finish_contexts(cct, ls, -EINVAL);
     return;
   }
 
@@ -220,7 +218,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
   init_headers(h);
   set_layout(&h.layout);
 
-  dout(1) << "_finish_read_head " << h << ".  probing for end of log (from " << write_pos << ")..." << dendl;
+  ldout(cct, 1) << "_finish_read_head " << h << ".  probing for end of log (from " << write_pos << ")..." << dendl;
   C_ProbeEnd *fin = new C_ProbeEnd(this);
   state = STATE_PROBING;
   probe(fin, &fin->end);
@@ -228,7 +226,7 @@ void Journaler::_finish_read_head(int r, bufferlist& bl)
 
 void Journaler::probe(Context *finish, uint64_t *end)
 {
-  dout(1) << "probing for end of the log" << dendl;
+  ldout(cct, 1) << "probing for end of the log" << dendl;
   assert(state == STATE_PROBING || state == STATE_REPROBING);
   // probe the log
   filer.probe(ino, &layout, CEPH_NOSNAP,
@@ -237,7 +235,7 @@ void Journaler::probe(Context *finish, uint64_t *end)
 
 void Journaler::reprobe(Context *finish)
 {
-  dout(10) << "reprobe" << dendl;
+  ldout(cct, 10) << "reprobe" << dendl;
   assert(state == STATE_ACTIVE);
 
   state = STATE_REPROBING;
@@ -249,7 +247,7 @@ void Journaler::reprobe(Context *finish)
 void Journaler::_finish_reprobe(int r, uint64_t new_end, Context *onfinish) {
   assert(new_end >= write_pos);
   assert(r >= 0);
-  dout(1) << "_finish_reprobe new_end = " << new_end 
+  ldout(cct, 1) << "_finish_reprobe new_end = " << new_end 
          << " (header had " << write_pos << ")."
          << dendl;
   prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = new_end;
@@ -264,14 +262,14 @@ void Journaler::_finish_probe_end(int r, uint64_t end)
   
   if (((int64_t)end) == -1) {
     end = write_pos;
-    dout(1) << "_finish_probe_end write_pos = " << end 
+    ldout(cct, 1) << "_finish_probe_end write_pos = " << end 
            << " (header had " << write_pos << "). log was empty. recovered."
            << dendl;
     assert(0); // hrm.
   } else {
     assert(end >= write_pos);
     assert(r >= 0);
-    dout(1) << "_finish_probe_end write_pos = " << end 
+    ldout(cct, 1) << "_finish_probe_end write_pos = " << end 
            << " (header had " << write_pos << "). recovered."
            << dendl;
   }
@@ -283,7 +281,7 @@ void Journaler::_finish_probe_end(int r, uint64_t end)
   // done.
   list<Context*> ls;
   ls.swap(waitfor_recover);
-  finish_contexts(g_ceph_context, ls, 0);
+  finish_contexts(cct, ls, 0);
 }
 
 class Journaler::C_RereadHeadProbe : public Context
@@ -332,9 +330,9 @@ void Journaler::write_head(Context *oncommit)
   last_written.expire_pos = expire_pos;
   last_written.unused_field = expire_pos;
   last_written.write_pos = safe_pos;
-  dout(10) << "write_head " << last_written << dendl;
+  ldout(cct, 10) << "write_head " << last_written << dendl;
   
-  last_wrote_head = ceph_clock_now(g_ceph_context);
+  last_wrote_head = ceph_clock_now(cct);
 
   bufferlist bl;
   ::encode(last_written, bl);
@@ -342,7 +340,7 @@ void Journaler::write_head(Context *oncommit)
   
   object_t oid = file_object_t(ino, 0);
   object_locator_t oloc(pg_pool);
-  objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(g_ceph_context), 0, 
+  objecter->write_full(oid, oloc, snapc, bl, ceph_clock_now(cct), 0, 
                       NULL, 
                       new C_WriteHead(this, last_written, oncommit));
 }
@@ -351,7 +349,7 @@ void Journaler::_finish_write_head(int r, Header &wrote, Context *oncommit)
 {
   assert(r >= 0); // we can't really recover from write errors here
   assert(!readonly);
-  dout(10) << "_finish_write_head " << wrote << dendl;
+  ldout(cct, 10) << "_finish_write_head " << wrote << dendl;
   last_committed = wrote;
   if (oncommit) {
     oncommit->finish(0);
@@ -385,7 +383,7 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
 
   // calc latency?
   if (logger) {
-    utime_t lat = ceph_clock_now(g_ceph_context);
+    utime_t lat = ceph_clock_now(cct);
     lat -= stamp;
     logger->favg(logger_key_lat, lat);
   }
@@ -398,7 +396,7 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
   else
     safe_pos = *pending_safe.begin();
 
-  dout(10) << "_finish_flush safe from " << start
+  ldout(cct, 10) << "_finish_flush safe from " << start
           << ", pending_safe " << pending_safe
           << ", (prezeroing/prezero)/write/flush/safe positions now "
           << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos
@@ -408,7 +406,7 @@ void Journaler::_finish_flush(int r, uint64_t start, utime_t stamp)
   while (!waitfor_safe.empty()) {
     if (waitfor_safe.begin()->first > safe_pos)
       break;
-    finish_contexts(g_ceph_context, waitfor_safe.begin()->second);
+    finish_contexts(cct, waitfor_safe.begin()->second);
     waitfor_safe.erase(waitfor_safe.begin());
   }
 }
@@ -419,7 +417,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
   assert(!readonly);
   uint32_t s = bl.length();
 
-  if (!g_conf->journaler_allow_split_entries) {
+  if (!cct->_conf->journaler_allow_split_entries) {
     // will we span a stripe boundary?
     int p = layout.fl_stripe_unit;
     if (write_pos / p != (write_pos + (int64_t)(bl.length() + sizeof(s))) / p) {
@@ -438,11 +436,11 @@ uint64_t Journaler::append_entry(bufferlist& bl)
       // now flush.
       flush();
       
-      dout(12) << "append_entry skipped " << (write_pos-owp) << " bytes to " << write_pos << " to avoid spanning stripe boundary" << dendl;
+      ldout(cct, 12) << "append_entry skipped " << (write_pos-owp) << " bytes to " << write_pos << " to avoid spanning stripe boundary" << dendl;
     }
   }
        
-  dout(10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(uint32_t)) << dendl;
+  ldout(cct, 10) << "append_entry len " << bl.length() << " to " << write_pos << "~" << (bl.length() + sizeof(uint32_t)) << dendl;
   
   // append
   ::encode(s, write_buf);
@@ -455,7 +453,7 @@ uint64_t Journaler::append_entry(bufferlist& bl)
   uint64_t write_obj = write_pos / su;
   uint64_t flush_obj = flush_pos / su;
   if (write_obj != flush_obj) {
-    dout(10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl;
+    ldout(cct, 10) << " flushing completed object(s) (su " << su << " wro " << write_obj << " flo " << flush_obj << ")" << dendl;
     _do_flush(write_buf.length() - write_off);
   }
 
@@ -484,22 +482,22 @@ void Journaler::_do_flush(unsigned amount)
 
     int64_t newlen = prezero_pos - flush_pos - period;
     if (newlen <= 0) {
-      dout(10) << "_do_flush wanted to do " << flush_pos << "~" << len
+      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len
               << " already too close to prezero_pos " << prezero_pos << ", zeroing first" << dendl;
       waiting_for_zero = true;
       return;
     }
     if (newlen < len) {
-      dout(10) << "_do_flush wanted to do " << flush_pos << "~" << len << " but hit prezero_pos " << prezero_pos
+      ldout(cct, 10) << "_do_flush wanted to do " << flush_pos << "~" << len << " but hit prezero_pos " << prezero_pos
               << ", will do " << flush_pos << "~" << newlen << dendl;
       len = newlen;
     }
   }
-  dout(10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
+  ldout(cct, 10) << "_do_flush flushing " << flush_pos << "~" << len << dendl;
   
   // submit write for anything pending
   // flush _start_ pos to _finish_flush
-  utime_t now = ceph_clock_now(g_ceph_context);
+  utime_t now = ceph_clock_now(cct);
   SnapContext snapc;
 
   Context *onsafe = new C_Flush(this, flush_pos, now);  // on COMMIT
@@ -515,14 +513,14 @@ void Journaler::_do_flush(unsigned amount)
   }
 
   filer.write(ino, &layout, snapc,
-             flush_pos, len, write_bl, ceph_clock_now(g_ceph_context),
+             flush_pos, len, write_bl, ceph_clock_now(cct),
              0,
              NULL, onsafe);
 
   flush_pos += len;
   assert(write_buf.length() == write_pos - flush_pos);
     
-  dout(10) << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
+  ldout(cct, 10) << "_do_flush (prezeroing/prezero)/write/flush/safe pointers now at "
           << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
 
   _issue_prezero();
@@ -537,7 +535,7 @@ void Journaler::wait_for_flush(Context *onsafe)
   // all flushed and safe?
   if (write_pos == safe_pos) {
     assert(write_buf.length() == 0);
-    dout(10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at " 
+    ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at " 
             << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
     if (onsafe) {
       onsafe->finish(0);
@@ -558,7 +556,7 @@ void Journaler::flush(Context *onsafe)
 
   if (write_pos == flush_pos) {
     assert(write_buf.length() == 0);
-    dout(10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at "
+    ldout(cct, 10) << "flush nothing to flush, (prezeroing/prezero)/write/flush/safe pointers at "
             << "(" << prezeroing_pos << "/" << prezero_pos << ")/" << write_pos << "/" << flush_pos << "/" << safe_pos << dendl;
     if (onsafe) {
       onsafe->finish(0);
@@ -567,15 +565,15 @@ void Journaler::flush(Context *onsafe)
   } else {
     if (1) {
       // maybe buffer
-      if (write_buf.length() < g_conf->journaler_batch_max) {
+      if (write_buf.length() < cct->_conf->journaler_batch_max) {
        // delay!  schedule an event.
-       dout(20) << "flush delaying flush" << dendl;
+       ldout(cct, 20) << "flush delaying flush" << dendl;
        if (delay_flush_event)
          timer->cancel_event(delay_flush_event);
        delay_flush_event = new C_DelayFlush(this);
-       timer->add_event_after(g_conf->journaler_batch_interval, delay_flush_event);    
+       timer->add_event_after(cct->_conf->journaler_batch_interval, delay_flush_event);        
       } else {
-       dout(20) << "flush not delaying flush" << dendl;
+       ldout(cct, 20) << "flush not delaying flush" << dendl;
        _do_flush();
       }
     } else {
@@ -586,7 +584,7 @@ void Journaler::flush(Context *onsafe)
   }
 
   // write head?
-  if (last_wrote_head.sec() + g_conf->journaler_write_head_interval < ceph_clock_now(g_ceph_context).sec()) {
+  if (last_wrote_head.sec() + cct->_conf->journaler_write_head_interval < ceph_clock_now(cct).sec()) {
     write_head();
   }
 }
@@ -609,7 +607,7 @@ void Journaler::_issue_prezero()
 
   // we need to zero at least two periods, minimum, to ensure that we have a full
   // empty object/period in front of us.
-  uint64_t num_periods = MAX(2, g_conf->journaler_prezero_periods);
+  uint64_t num_periods = MAX(2, cct->_conf->journaler_prezero_periods);
 
   /*
    * issue zero requests based on write_pos, even though the invariant
@@ -620,7 +618,7 @@ void Journaler::_issue_prezero()
   to -= to % period;
 
   if (prezeroing_pos >= to) {
-    dout(20) << "_issue_prezero target " << to << " <= prezeroing_pos " << prezeroing_pos << dendl;
+    ldout(cct, 20) << "_issue_prezero target " << to << " <= prezeroing_pos " << prezeroing_pos << dendl;
     return;
   }
 
@@ -628,21 +626,21 @@ void Journaler::_issue_prezero()
     uint64_t len;
     if (prezeroing_pos % period == 0) {
       len = period;
-      dout(10) << "_issue_prezero removing " << prezeroing_pos << "~" << period << " (full period)" << dendl;
+      ldout(cct, 10) << "_issue_prezero removing " << prezeroing_pos << "~" << period << " (full period)" << dendl;
     } else {
       len = period - (prezeroing_pos % period);
-      dout(10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl;
+      ldout(cct, 10) << "_issue_prezero zeroing " << prezeroing_pos << "~" << len << " (partial period)" << dendl;
     }
     SnapContext snapc;
     Context *c = new C_Journaler_Prezero(this, prezeroing_pos, len);
-    filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(g_ceph_context), 0, NULL, c);
+    filer.zero(ino, &layout, snapc, prezeroing_pos, len, ceph_clock_now(cct), 0, NULL, c);
     prezeroing_pos += len;
   }
 }
 
 void Journaler::_prezeroed(int r, uint64_t start, uint64_t len)
 {
-  dout(10) << "_prezeroed to " << start << "~" << len
+  ldout(cct, 10) << "_prezeroed to " << start << "~" << len
           << ", prezeroing/prezero was " << prezeroing_pos << "/" << prezero_pos
           << ", pending " << pending_zero
           << dendl;
@@ -664,7 +662,7 @@ void Journaler::_prezeroed(int r, uint64_t start, uint64_t len)
   } else {
     pending_zero.insert(start, len);
   }
-  dout(10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos << "/" << prezero_pos
+  ldout(cct, 10) << "_prezeroed prezeroing/prezero now " << prezeroing_pos << "/" << prezero_pos
           << ", pending " << pending_zero
           << dendl;
 }
@@ -698,7 +696,7 @@ public:
 void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
 {
   if (r < 0) {
-    dout(0) << "_finish_read got error " << r << dendl;
+    ldout(cct, 0) << "_finish_read got error " << r << dendl;
     error = r;
     if (on_readable) {
       Context *f = on_readable;
@@ -710,7 +708,7 @@ void Journaler::_finish_read(int r, uint64_t offset, bufferlist& bl)
   }
   assert(r>=0);
 
-  dout(10) << "_finish_read got " << offset << "~" << bl.length() << dendl;
+  ldout(cct, 10) << "_finish_read got " << offset << "~" << bl.length() << dendl;
   prefetch_buf[offset].swap(bl);
 
   _assimilate_prefetch();
@@ -726,12 +724,12 @@ void Journaler::_assimilate_prefetch()
     map<uint64_t,bufferlist>::iterator p = prefetch_buf.begin();
     if (p->first != received_pos) {
       uint64_t gap = p->first - received_pos;
-      dout(10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos
+      ldout(cct, 10) << "_assimilate_prefetch gap of " << gap << " from received_pos " << received_pos
               << " to first prefetched buffer " << p->first << dendl;
       break;
     }
 
-    dout(10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl;
+    ldout(cct, 10) << "_assimilate_prefetch " << p->first << "~" << p->second.length() << dendl;
     received_pos += p->second.length();
     read_buf.claim_append(p->second);
     assert(received_pos <= requested_pos);
@@ -740,14 +738,14 @@ void Journaler::_assimilate_prefetch()
   }
 
   if (got_any)
-    dout(10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length() 
+    ldout(cct, 10) << "_assimilate_prefetch read_buf now " << read_pos << "~" << read_buf.length() 
             << ", read pointers " << read_pos << "/" << received_pos << "/" << requested_pos
             << dendl;
 
   if ((got_any && !was_readable && _is_readable()) ||
       read_pos == write_pos) {
     // readable!
-    dout(10) << "_finish_read now readable (or at journal end)" << dendl;
+    ldout(cct, 10) << "_finish_read now readable (or at journal end)" << dendl;
     if (on_readable) {
       Context *f = on_readable;
       on_readable = 0;
@@ -766,7 +764,7 @@ void Journaler::_issue_read(uint64_t len)
   //  (this is needed if we are reading the tail of a journal we are also writing to)
   assert(requested_pos <= safe_pos);
   if (requested_pos == safe_pos) {
-    dout(10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl;
+    ldout(cct, 10) << "_issue_read requested_pos = safe_pos = " << safe_pos << ", waiting" << dendl;
     assert(write_pos > requested_pos);
     if (flush_pos == safe_pos)
       flush();
@@ -778,11 +776,11 @@ void Journaler::_issue_read(uint64_t len)
   // don't read too much
   if (requested_pos + len > safe_pos) {
     len = safe_pos - requested_pos;
-    dout(10) << "_issue_read reading only up to safe_pos " << safe_pos << dendl;
+    ldout(cct, 10) << "_issue_read reading only up to safe_pos " << safe_pos << dendl;
   }
 
   // go.
-  dout(10) << "_issue_read reading " << requested_pos << "~" << len 
+  ldout(cct, 10) << "_issue_read reading " << requested_pos << "~" << len 
           << ", read pointers " << read_pos << "/" << received_pos << "/" << (requested_pos+len)
           << dendl;
   
@@ -809,7 +807,7 @@ void Journaler::_prefetch()
   // prefetch
   uint64_t pf;
   if (temp_fetch_len) {
-    dout(10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
+    ldout(cct, 10) << "_prefetch temp_fetch_len " << temp_fetch_len << dendl;
     pf = temp_fetch_len;
     temp_fetch_len = 0;
   } else {
@@ -828,7 +826,7 @@ void Journaler::_prefetch()
 
   if (requested_pos < target) {
     uint64_t len = target - requested_pos;
-    dout(10) << "_prefetch " << pf << " requested_pos " << requested_pos << " < target " << target
+    ldout(cct, 10) << "_prefetch " << pf << " requested_pos " << requested_pos << " < target " << target
             << " (" << raw_target << "), prefetching " << len << dendl;
     _issue_read(len);
   }
@@ -858,7 +856,7 @@ bool Journaler::_is_readable()
 
   // partial fragment at the end?
   if (received_pos == write_pos) {
-    dout(10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl;
+    ldout(cct, 10) << "is_readable() detected partial entry at tail, adjusting write_pos to " << read_pos << dendl;
 
     // adjust write_pos
     prezeroing_pos = prezero_pos = write_pos = flush_pos = safe_pos = read_pos;
@@ -875,7 +873,7 @@ bool Journaler::_is_readable()
 
   uint64_t need = (sizeof(s)+s-read_buf.length());
   if (need > fetch_len) {
-    dout(10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
+    ldout(cct, 10) << "_is_readable noting temp_fetch_len " << temp_fetch_len
             << " for len " << s << " entry" << dendl;
     temp_fetch_len = need;
   }
@@ -900,7 +898,7 @@ bool Journaler::is_readable()
 bool Journaler::try_read_entry(bufferlist& bl)
 {
   if (!is_readable()) {  // this may start a read. 
-    dout(10) << "try_read_entry at " << read_pos << " not readable" << dendl;
+    ldout(cct, 10) << "try_read_entry at " << read_pos << " not readable" << dendl;
     return false;
   }
   
@@ -911,11 +909,11 @@ bool Journaler::try_read_entry(bufferlist& bl)
   }
   assert(read_buf.length() >= sizeof(s) + s);
   
-  dout(10) << "try_read_entry at " << read_pos << " reading " 
+  ldout(cct, 10) << "try_read_entry at " << read_pos << " reading " 
           << read_pos << "~" << (sizeof(s)+s) << " (have " << read_buf.length() << ")" << dendl;
 
   if (s == 0) {
-    dout(0) << "try_read_entry got 0 len entry at offset " << read_pos << dendl;
+    ldout(cct, 0) << "try_read_entry got 0 len entry at offset " << read_pos << dendl;
     error = -EINVAL;
     return false;
   }
@@ -933,7 +931,7 @@ bool Journaler::try_read_entry(bufferlist& bl)
 
 void Journaler::wait_for_readable(Context *onreadable)
 {
-  dout(10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
+  ldout(cct, 10) << "wait_for_readable at " << read_pos << " onreadable " << onreadable << dendl;
   assert(!_is_readable());
   assert(on_readable == 0);
   on_readable = onreadable;
@@ -961,17 +959,17 @@ void Journaler::trim()
   uint64_t period = get_layout_period();
   uint64_t trim_to = last_committed.expire_pos;
   trim_to -= trim_to % period;
-  dout(10) << "trim last_commited head was " << last_committed
+  ldout(cct, 10) << "trim last_commited head was " << last_committed
           << ", can trim to " << trim_to
           << dendl;
   if (trim_to == 0 || trim_to == trimming_pos) {
-    dout(10) << "trim already trimmed/trimming to " 
+    ldout(cct, 10) << "trim already trimmed/trimming to " 
             << trimmed_pos << "/" << trimming_pos << dendl;
     return;
   }
 
   if (trimming_pos > trimmed_pos) {
-    dout(10) << "trim already trimming atm, try again later.  trimmed/trimming is " 
+    ldout(cct, 10) << "trim already trimming atm, try again later.  trimmed/trimming is " 
             << trimmed_pos << "/" << trimming_pos << dendl;
     return;
   }
@@ -980,7 +978,7 @@ void Journaler::trim()
   assert(trim_to <= write_pos);
   assert(trim_to <= expire_pos);
   assert(trim_to > trimming_pos);
-  dout(10) << "trim trimming to " << trim_to 
+  ldout(cct, 10) << "trim trimming to " << trim_to 
           << ", trimmed/trimming/expire are " 
           << trimmed_pos << "/" << trimming_pos << "/" << expire_pos
           << dendl;
@@ -989,7 +987,7 @@ void Journaler::trim()
   uint64_t first = trimming_pos / period;
   uint64_t num = (trim_to - trimming_pos) / period;
   SnapContext snapc;
-  filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(g_ceph_context), 0, 
+  filer.purge_range(ino, &layout, snapc, first, num, ceph_clock_now(cct), 0, 
                    new C_Trim(this, trim_to));
   trimming_pos = trim_to;  
 }
@@ -997,7 +995,7 @@ void Journaler::trim()
 void Journaler::_trim_finish(int r, uint64_t to)
 {
   assert(!readonly);
-  dout(10) << "_trim_finish trimmed_pos was " << trimmed_pos
+  ldout(cct, 10) << "_trim_finish trimmed_pos was " << trimmed_pos
           << ", trimmed/trimming/expire now "
           << to << "/" << trimming_pos << "/" << expire_pos
           << dendl;
@@ -1010,7 +1008,7 @@ void Journaler::_trim_finish(int r, uint64_t to)
   // finishers?
   while (!waitfor_trim.empty() &&
         waitfor_trim.begin()->first <= trimmed_pos) {
-    finish_contexts(g_ceph_context, waitfor_trim.begin()->second, 0);
+    finish_contexts(cct, waitfor_trim.begin()->second, 0);
     waitfor_trim.erase(waitfor_trim.begin());
   }
 }
index 4136ba6ec9a68635b7a2fa944d3043e3840787db..c601da3f0ffe67a4caf84b3e15408a582f0214c9 100644 (file)
 #include <list>
 #include <map>
 
+class CephContext;
 class Context;
 class ProfLogger;
 
 class Journaler {
-
 public:
+  CephContext *cct;
   // this goes at the head of the log "file".
   struct Header {
     uint64_t trimmed_pos;
@@ -229,7 +230,7 @@ private:
 
 public:
   Journaler(inodeno_t ino_, int pool, const char *mag, Objecter *obj, ProfLogger *l, int lkey, SafeTimer *tim) : 
-    last_written(mag), last_committed(mag),
+    cct(obj->cct), last_written(mag), last_committed(mag),
     ino(ino_), pg_pool(pool), readonly(true), magic(mag),
     objecter(obj), filer(objecter), logger(l), logger_key_lat(lkey),
     timer(tim), delay_flush_event(0),
index 07bc907af73e526d6cc6f22f3ac16462ebf995d2..418620fb4e220596b4b5c770004a717881fa7c2c 100644 (file)
@@ -259,10 +259,10 @@ class Objecter {
   Messenger *messenger;
   MonClient *monc;
   OSDMap    *osdmap;
+  CephContext *cct;
 
  
  private:
-  CephContext *cct;
   tid_t last_tid;
   int client_inc;
   uint64_t max_linger_id;