]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
objectcacher compiles. some more osd changes (still not done yet)
author sage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 10 May 2006 22:29:20 +0000 (22:29 +0000)
committer sage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Wed, 10 May 2006 22:29:20 +0000 (22:29 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@763 29311d96-e01e-0410-9327-a35deaab8ce9

24 files changed:
ceph/Makefile
ceph/TODO
ceph/client/Client.cc
ceph/common/Cond.h
ceph/config.cc
ceph/config.h
ceph/ebofs/Ebofs.cc
ceph/ebofs/Ebofs.h
ceph/ebofs/types.h
ceph/include/bufferlist.h
ceph/messages/MOSDPGLog.h [new file with mode: 0644]
ceph/messages/MOSDPGSummary.h
ceph/msg/Message.h
ceph/msg/Messenger.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/PG.cc
ceph/osd/PG.h
ceph/osdc/Filer.cc
ceph/osdc/Filer.h
ceph/osdc/ObjectCacher.cc
ceph/osdc/ObjectCacher.h
ceph/osdc/Objecter.cc
ceph/osdc/Objecter.h

index 93cc6eb39fc1e0e142b496e886340a3d2c225c43..aedba67e073ee4204b95e1c42a02aa215e344fd8 100644 (file)
@@ -9,7 +9,7 @@
 # behave just fine...  change ${CC} back to mpicxx if you get paranoid.
 CC = g++
 CFLAGS = -g -Wall -I. -D_FILE_OFFSET_BITS=64 -DMPICH_IGNORE_CXX_SEEK -D_REENTRANT -D_THREAD_SAFE
-LIBS = -lpthread -lrt -ldb
+LIBS = -lpthread -lrt 
 
 #for normal mpich2 machines
 MPICC = mpicxx
@@ -65,8 +65,7 @@ COMMON_OBJS= \
 CLIENT_OBJS= \
        client/Client.o\
        client/SyntheticClient.o\
-       client/Trace.o\
-       client/Buffercache.o
+       client/Trace.o
 
 TCP_OBJS = \
        msg/TCPMessenger.o\
@@ -131,7 +130,7 @@ ebofs: mkfs.ebofs test.ebofs
 
 
 # libceph
-libceph.o: client/ldceph.o client/Client.o client/Buffercache.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS}
+libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS}
        ld -i $^ -o $@
 
 bench/mdtest/mdtest.o: bench/mdtest/mdtest.c
index fa939faa9cdbf7373a7ac487e2f84c6c634f9dbe..ef1ee5c85b2cd12c3a08b4270d110685da067abb 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -1,4 +1,6 @@
 
+
+
 - stability
  - ebofs table.remove() thing
  - fakestore crapping out.. missing timer events?
@@ -54,6 +56,7 @@ filer/osd
 
 
 ebofs
+- hey.. why do we drop on->commit_waiters on delete?  that's stupid!
 - fix NEAR_LAST_FWD   (?)
 - combine inodes into same blocks?
 - delay allocation
index cfa2accb7554f563b6eb7585a7137d903577b1fc..78091be392be064c3edb26d95384778af9f51e49 100644 (file)
@@ -78,7 +78,7 @@ Client::Client(Messenger *m)
   osdmap = new OSDMap();     // initially blank.. see mount()
   objecter = new Objecter(messenger, osdmap);
   objectcacher = new ObjectCacher(objecter);
-  filer = new Filer(objecter); //, objectcacher);
+  filer = new Filer(objecter, objectcacher);
 }
 
 
@@ -694,7 +694,7 @@ void Client::handle_file_caps(MClientFileCaps *m)
   in->inode = m->get_inode();      // might have updated size... FIXME this is overkill!
 
   // flush buffers?
-  if (g_conf.client_bcache &&
+  if (g_conf.client_oc &&
          !(in->file_caps() & CAP_FILE_WRBUFFER)) {
 
        // **** write me ****
@@ -1622,7 +1622,7 @@ int Client::close(fh_t fh)
        dout(10) << "  flushing dirty buffers on " << hex << in->ino() << dec << endl;
        
 
-       /*if (g_conf.client_bcache && 
+       /*if (g_conf.client_oc && 
                (fc->is_dirty() || fc->is_inflight())) {
          // flushing.
          dout(10) << "  waiting for inflight buffers on " << hex << in->ino() << dec << endl;
@@ -1648,51 +1648,8 @@ int Client::close(fh_t fh)
 // ------------
 // read, write
 
-// ------------------------
 // blocking osd interface
 
-class C_Client_Cond : public Context {
-public:
-  Cond *cond;
-  bool *done;
-  int *rvalue;
-  C_Client_Cond(Cond *cond, bool *d, int *rvalue) {
-       this->done = d;
-    this->cond = cond;
-    this->rvalue = rvalue;
-    *done = false;
-  }
-  void finish(int r) {
-    *rvalue = r;
-       *done = true;
-    cond->Signal();
-  }
-};
-
-
-class C_Client_LockedCond : public Context {
-public:
-  bool *done;
-  Cond *cond;
-  Mutex *mutex;
-  int *rvalue;
-  C_Client_LockedCond(Cond *cond, Mutex *mutex, bool *d, int *rvalue) {
-       this->done = d;
-    this->cond = cond;
-    this->mutex = mutex;
-    this->rvalue = rvalue;
-    *done = false;
-  }
-  void finish(int r) {
-    mutex->Lock();
-    *rvalue = r;
-       *done = true;
-    cond->Signal();
-    mutex->Unlock();
-  }
-};
-
-
 int Client::read(fh_t fh, char *buf, off_t size, off_t offset) 
 {
   client_lock.Lock();
@@ -1747,13 +1704,13 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
   // adjust fd pos
   f->pos = offset+size;
 
-  if (!g_conf.client_bcache) {
+  if (!g_conf.client_oc) {
        // buffer cache OFF
     Cond cond;
     bufferlist blist;   // data will go here
        
        bool done = false;
-    C_Client_Cond *onfinish = new C_Client_Cond(&cond, &done, &rvalue);
+    C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
     filer->read(in->inode, size, offset, &blist, onfinish);
        while (!done)
          cond.Wait(client_lock);
@@ -1832,58 +1789,62 @@ int Client::write(fh_t fh, const char *buf, off_t size, off_t offset)
   // adjust fd pos
   f->pos = offset+size;
 
-  if (g_conf.client_bcache &&          // buffer cache ON?
-         (in->file_caps() & CAP_FILE_WRBUFFER)) {   // caps buffered write?
+  // time it.
+  utime_t start = g_clock.now();
        
-       // ***
+  if (g_conf.client_oc) { // buffer cache ON?
+       assert(objectcacher);
+
+       bufferlist blist;
+       blist.push_back( new buffer(buf, size) );
 
+       // wait?  (this may block!)
+       objectcacher->wait_for_write(size, client_lock);
+       
+       if (in->file_caps() & CAP_FILE_WRBUFFER) {   // caps buffered write?
+         // async, caching, non-blocking.
+         filer->caching_write(in->inode, size, offset, blist);
+       } else {
+         // atomic, synchronous, blocking.
+         filer->atomic_sync_write(in->inode, size, offset, blist, client_lock);          
+       }
   } else {
-       // synchronous write
-    // FIXME: do not bypass buffercache
-       //if (g_conf.client_bcache) {
-         // write me
-       //} else
-       {
-         dout(7) << "synchronous write" << endl;
-         
-         // create a buffer that refers to *buf, but doesn't try to free it when it's done.
-         bufferlist blist;
-         blist.push_back( new buffer(buf, size, BUFFER_MODE_NOCOPY|BUFFER_MODE_NOFREE) );
-         
-         // issue write
-         Cond cond;
-         int rvalue = 0;
+       // legacy, inconsistent synchronous write.
+       dout(7) << "synchronous write" << endl;
          
-         utime_t start = g_clock.now();
-
-         bool done = false;
-         C_Client_Cond *onfinish = new C_Client_Cond(&cond, &done, &rvalue);
-         C_Client_HackUnsafe *onsafe = new C_Client_HackUnsafe(this);
-         unsafe_sync_write++;
-
-         dout(20) << " sync write start " << onfinish << endl;
-
-         filer->write(in->inode, size, offset, blist, 0, 
-                                  //NULL,NULL);  // no wait hack
-                                  onfinish, onsafe);
+       // create a buffer that refers to *buf, but doesn't try to free it when it's done.
+       bufferlist blist;
+       blist.push_back( new buffer(buf, size, BUFFER_MODE_NOCOPY|BUFFER_MODE_NOFREE) );
          
-         while (!done) {
-               cond.Wait(client_lock);
-               dout(20) << " sync write bump " << onfinish << endl;
-         }
-
-         // time
-         utime_t lat = g_clock.now();
-         lat -= start;
-         if (client_logger) {
-               client_logger->finc("wrlsum",(double)lat);
-               client_logger->inc("wrlnum");
-         }
-
-         dout(20) << " sync write done " << onfinish << endl;
+       // issue write
+       Cond cond;
+       int rvalue = 0;
+       
+       bool done = false;
+       C_Cond *onfinish = new C_Cond(&cond, &done, &rvalue);
+       C_Client_HackUnsafe *onsafe = new C_Client_HackUnsafe(this);
+       unsafe_sync_write++;
+       
+       dout(20) << " sync write start " << onfinish << endl;
+       
+       filer->write(in->inode, size, offset, blist, 0, 
+                                onfinish, onsafe);
+       
+       while (!done) {
+         cond.Wait(client_lock);
+         dout(20) << " sync write bump " << onfinish << endl;
        }
+       dout(20) << " sync write done " << onfinish << endl;
   }
 
+  // time
+  utime_t lat = g_clock.now();
+  lat -= start;
+  if (client_logger) {
+       client_logger->finc("wrlsum",(double)lat);
+       client_logger->inc("wrlnum");
+  }
+       
   // assume success for now.  FIXME.
   off_t totalwritten = size;
   
index 3661878ac7494dfedcfe182351cd882d8e743f5c..91021b5cc0ded1355586660aa90d463301630f05 100644 (file)
@@ -21,6 +21,8 @@
 #include "Mutex.h"
 #include "Clock.h"
 
+#include "include/Context.h"
+
 #include <pthread.h>
 #include <cassert>
 
@@ -90,4 +92,37 @@ class Cond
   }
 };
 
+class C_Cond : public Context {  // completion callback: stores the result, sets *done, signals cond
+  Cond *cond;   // condition variable signalled from finish()
+  bool *done;   // caller-owned flag: cleared in the constructor, set in finish()
+  int *rval;    // optional destination for finish()'s argument; may be null
+public:
+  C_Cond(Cond *c, bool *d, int *r=0) : cond(c), done(d), rval(r) {
+       *done = false;  // reset the flag so a waiter can loop on !done
+  }
+  void finish(int r) {  // NOTE(review): takes no lock itself; presumably the caller holds the waiter's mutex -- confirm
+       if (rval) *rval = r;
+       *done = true;
+       cond->Signal();
+  }
+};
+
+class C_SafeCond : public Context {  // like C_Cond, but finish() does its work while holding *lock
+  Mutex *lock;  // mutex held while updating *rval / *done and signalling
+  Cond *cond;   // condition variable signalled from finish()
+  bool *done;   // caller-owned flag: cleared in the constructor, set in finish()
+  int *rval;    // optional destination for finish()'s argument; may be null
+public:
+  C_SafeCond(Mutex *l, Cond *c, bool *d, int *r=0) : lock(l), cond(c), done(d), rval(r) {
+       *done = false;  // reset the flag so a waiter can loop on !done
+  }
+  void finish(int r) {
+       lock->Lock();
+       if (rval) *rval = r;
+       *done = true;   // mark completion (was mistakenly false, so `while (!done)` waiters never woke)
+       cond->Signal();
+       lock->Unlock();
+  }
+};
+
 #endif // !_Cond_Posix_
index 1b48c19167b776aca61460a134d1c54928ed6703..71587b091d7b9fca85624575fc9b4f7a9c5003a7 100644 (file)
@@ -92,6 +92,10 @@ md_config_t g_conf = {
 
   client_sync_writes: 0,
 
+  client_oc: false,
+  client_oc_max_dirty: 1024*1024* 100, 
+
+  /*
   client_bcache: 0,
   client_bcache_alloc_minsize: 1<<10, // 1KB
   client_bcache_alloc_maxsize: 1<<18, // 256KB
@@ -101,6 +105,7 @@ md_config_t g_conf = {
   client_bcache_lowater: 60, // % of size
   client_bcache_hiwater: 80, // % of size
   client_bcache_align: 1<<10, // 1KB splice alignment
+  */
 
   client_trace: 0,
   fuse_direct_io: 0,
@@ -461,10 +466,10 @@ void parse_config_options(vector<char*>& args)
 
        else if (strcmp(args[i], "--client_sync_writes") == 0)
          g_conf.client_sync_writes = atoi(args[++i]);
-       else if (strcmp(args[i], "--client_bcache") == 0)
-         g_conf.client_bcache = atoi(args[++i]);
-       else if (strcmp(args[i], "--client_bcache_ttl") == 0)
-         g_conf.client_bcache_ttl = atoi(args[++i]);
+       else if (strcmp(args[i], "--client_oc") == 0)
+         g_conf.client_oc = atoi(args[++i]);
+       else if (strcmp(args[i], "--client_oc_max_dirty") == 0)
+         g_conf.client_oc_max_dirty = atoi(args[++i]);
 
 
        else if (strcmp(args[i], "--ebofs") == 0) 
index 2c2f0c8401cbc1b28f9849ee41660cab58405933..486669e9c81b74df322e28a7984ec697cbf4aad7 100644 (file)
@@ -63,6 +63,11 @@ struct md_config_t {
   bool     client_use_random_mds;          // debug flag
 
   bool     client_sync_writes;
+
+  bool     client_oc;
+  int      client_oc_max_dirty;
+
+  /*
   bool     client_bcache;
   int      client_bcache_alloc_minsize;
   int      client_bcache_alloc_maxsize;
@@ -71,6 +76,7 @@ struct md_config_t {
   int      client_bcache_lowater;
   int      client_bcache_hiwater;
   size_t   client_bcache_align;
+  */
 
   int      client_trace;
   int      fuse_direct_io;
index 6c9bca9e8593df775f469c1e55dfaee4d7dbceea..7c55299cf1096d9c787ee01bc7d158b8a72d91a3 100644 (file)
@@ -1553,7 +1553,8 @@ void Ebofs::apply_write(Onode *on, size_t len, off_t off, bufferlist& bl)
 
 // *** file i/o ***
 
-bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on)
+bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, 
+                                                Cond *will_wait_on, bool *will_wait_on_bool)
 {
   dout(10) << "attempt_read " << *on << " len " << len << " off " << off << endl;
   ObjectCache *oc = on->get_oc(&bc);
@@ -1579,7 +1580,7 @@ bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond
        }
        BufferHead *wait_on = missing.begin()->second;
        block_t b = MAX(wait_on->start(), bstart);
-       wait_on->waitfor_read[b].push_back(new C_Cond(will_wait_on));
+       wait_on->waitfor_read[b].push_back(new C_Cond(will_wait_on, will_wait_on_bool));
        return false;
   }
   
@@ -1594,7 +1595,7 @@ bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond
        if (!i->second->have_partial_range(start, end)) {
          if (partials_ok) {
                // wait on this one
-               Context *c = new C_Cond(will_wait_on);
+               Context *c = new C_Cond(will_wait_on, will_wait_on_bool);
                dout(10) << "attempt_read insufficient partial buffer " << *(i->second) << " c " << c << endl;
                i->second->waitfor_read[i->second->start()].push_back(c);
          }
@@ -1606,7 +1607,7 @@ bool Ebofs::attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond
   // wait on rx?
   if (!rx.empty()) {
        BufferHead *wait_on = rx.begin()->second;
-       Context *c = new C_Cond(will_wait_on);
+       Context *c = new C_Cond(will_wait_on, will_wait_on_bool);
        dout(1) << "attempt_read waiting for read to finish on " << *wait_on << " c " << c << endl;
        block_t b = MAX(wait_on->start(), bstart);
        wait_on->waitfor_read[b].push_back(c);
@@ -1695,11 +1696,13 @@ int Ebofs::read(object_t oid,
 
        size_t will_read = MIN(off+len, on->object_size) - off;
        
-       if (attempt_read(on, will_read, off, bl, &cond))
+       bool done;
+       if (attempt_read(on, will_read, off, bl, &cond, &done))
          break;  // yay
        
        // wait
-       cond.Wait(ebofs_lock);
+       while (!done) 
+         cond.Wait(ebofs_lock);
 
        if (on->deleted) {
          dout(7) << "read " << hex << oid << dec << " len " << len << " off " << off << " ... object deleted" << endl;
@@ -1742,14 +1745,16 @@ int Ebofs::write(object_t oid,
   if (fsync) {
        // wait for flush.
        Cond cond;
+       bool done;
        int flush = 1;    // write never returns positive
-       Context *c = new C_Cond(&cond, &flush);
+       Context *c = new C_Cond(&cond, &done, &flush);
        int r = write(oid, len, off, bl, c);
        if (r < 0) return r;
        
        ebofs_lock.Lock();
-       if (flush == 1) { // write never returns positive
-         cond.Wait(ebofs_lock);
+       {
+         while (!done) 
+               cond.Wait(ebofs_lock);
          assert(flush <= 0);
        }
        ebofs_lock.Unlock();
index 82359ecda988c475b494bebe1f0a24ead53160d2..683cdceb39da929eea90658ca12e51ac9e784d59 100644 (file)
@@ -177,7 +177,8 @@ class Ebofs : public ObjectStore {
                                   interval_set<block_t>& alloc,
                                   block_t& old_bfirst, block_t& old_blast);
   void apply_write(Onode *on, size_t len, off_t off, bufferlist& bl);
-  bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, Cond *will_wait_on);
+  bool attempt_read(Onode *on, size_t len, off_t off, bufferlist& bl, 
+                                       Cond *will_wait_on, bool *will_wait_on_bool);
 
   // ** finisher **
   // async write notification to users
index 7028e51660c3d710750a87b1bb6bd45e758b656f..18654d13ac173b4f55385bc9e3e71ce3263dbea2 100644 (file)
@@ -36,32 +36,6 @@ using namespace __gnu_cxx;
 #endif
 
 
-class C_Cond : public Context {
-  Cond *cond;
-  int *rval;
-public:
-  C_Cond(Cond *c, int *r=0) : cond(c), rval(r) {}
-  void finish(int r) {
-       if (rval) *rval = r;
-       //cout << "C_Cond signal " << this << " cond " << (void*)cond << " rval " << (void*)rval << " r " << r  << endl;
-       cond->Signal();
-  }
-};
-
-class C_SafeCond : public Context {
-  Mutex *lock;
-  Cond *cond;
-  int *rval;
-public:
-  C_SafeCond(Mutex *l, Cond *c, int *r=0) : lock(l), cond(c), rval(r) {}
-  void finish(int r) {
-       if (rval) *rval = r;
-       lock->Lock();
-       //cout << "C_Cond signal " << this << " cond " << (void*)cond << " rval " << (void*)rval << " r " << r  << endl;
-       cond->Signal();
-       lock->Unlock();
-  }
-};
 
 
 /*
index d2d47458a7d03b28ab3b9187a85af0c236dce3ed..6d4935a064d4b412ccd09eaa4dfac7ebac277281 100644 (file)
@@ -509,11 +509,11 @@ inline void _decode(vector<T>& s, bufferlist& bl, int& off)
 
 // list<T>
 template<class T>
-inline void _encode(list<T>& s, bufferlist& bl)
+inline void _encode(const list<T>& s, bufferlist& bl)
 {
   int n = s.size();
   bl.append((char*)&n, sizeof(n));
-  for (typename list<T>::iterator it = s.begin();
+  for (typename list<T>::const_iterator it = s.begin();
           it != s.end();
           it++) {
        T v = *it;
@@ -540,11 +540,11 @@ inline void _decode(list<T>& s, bufferlist& bl, int& off)
 
 // map<T,U>
 template<class T, class U>
-inline void _encode(map<T, U>& s, bufferlist& bl)
+inline void _encode(const map<T, U>& s, bufferlist& bl)
 {
   int n = s.size();
   bl.append((char*)&n, sizeof(n));
-  for (typename map<T, U>::iterator it = s.begin();
+  for (typename map<T, U>::const_iterator it = s.begin();
           it != s.end();
           it++) {
        T k = it->first;
diff --git a/ceph/messages/MOSDPGLog.h b/ceph/messages/MOSDPGLog.h
new file mode 100644 (file)
index 0000000..9b0a691
--- /dev/null
@@ -0,0 +1,58 @@
+// -*- mode:C++; tab-width:4; c-basic-offset:2; indent-tabs-mode:t -*- 
+/*
+ * Ceph - scalable distributed file system
+ *
+ * Copyright (C) 2004-2006 Sage Weil <sage@newdream.net>
+ *
+ * This is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License version 2.1, as published by the Free Software 
+ * Foundation.  See file COPYING.
+ * 
+ */
+
+
+#ifndef __MOSDPGLOG_H
+#define __MOSDPGLOG_H
+
+#include "msg/Message.h"
+
+class MOSDPGLog : public Message {  // message of type MSG_OSD_PG_LOG carrying a PG's info and log
+  epoch_t epoch;  // set from the constructor's `mv` argument
+  pg_t    pgid;   // placement group this message refers to
+
+public:
+  PG::PGLog log;    // public so the sender can fill it in before encoding
+  PG::PGInfo info;  // public so the sender can fill it in before encoding
+
+  epoch_t get_epoch() { return epoch; }
+  pg_t get_pgid() { return pgid; }
+
+  MOSDPGLog() {}  // leaves epoch/pgid unset; presumably used on receive before decode_payload() -- confirm
+  MOSDPGLog(version_t mv, pg_t pgid) :
+       Message(MSG_OSD_PG_LOG) {
+       this->epoch = mv;
+       this->pgid = pgid;
+  }
+
+  char *get_type_name() { return "PGlog"; }
+
+  void encode_payload() {  // layout: raw epoch, raw pgid, raw info, then the encoded log
+       payload.append((char*)&epoch, sizeof(epoch));
+       payload.append((char*)&pgid, sizeof(pgid));
+       payload.append((char*)&info, sizeof(info));  // raw byte copy of the PGInfo struct
+       log._encode(payload);
+  }
+  void decode_payload() {  // reads fields back in the same order encode_payload() wrote them
+       int off = 0;  // running byte offset into payload
+       payload.copy(off, sizeof(epoch), (char*)&epoch);
+       off += sizeof(epoch);
+       payload.copy(off, sizeof(pgid), (char*)&pgid);
+       off += sizeof(pgid);
+       payload.copy(off, sizeof(info), (char*)&info);
+       off += sizeof(info);
+       log._decode(payload, off);
+  }
+};
+
+#endif
index 6fe4ae2c3742616c38256bae80e819c1d29cd11a..191ad9a74d603f9cef21fa7d057c49325db0e8f0 100644 (file)
@@ -26,19 +26,16 @@ public:
   epoch_t get_epoch() { return epoch; }
   
   MOSDPGSummary() {}
-  MOSDPGSummary(version_t mv, pg_t pgid, PG::PGContentSummary *sum) :
+  MOSDPGSummary(version_t mv, pg_t pgid, PG::PGSummary &summary) :
        Message(MSG_OSD_PG_SUMMARY) {
        this->epoch = mv;
        this->pgid = pgid;
-       sum->_encode(sumbl);
+       summary._encode(sumbl);
   }
 
   pg_t get_pgid() { return pgid; }
-  PG::PGContentSummary *get_summary() {
-       PG::PGContentSummary *sum = new PG::PGContentSummary;
-       int off = 0;
-       sum->_decode(sumbl,off);
-       return sum;
+  bufferlist& get_summary_bl() {
+       return sumbl;
   }
   
   char *get_type_name() { return "PGsum"; }
index cff10e2f127d2c2d8f1f332262bbcd67652a8949..e0640ad3e9ad4cfc5b336288946a37a039be956c 100644 (file)
@@ -47,6 +47,7 @@
 #define MSG_OSD_PG_NOTIFY      50
 #define MSG_OSD_PG_QUERY        51
 #define MSG_OSD_PG_SUMMARY     52
+#define MSG_OSD_PG_LOG     53
 
 #define MSG_OSD_PG_UPDATE      57
 #define MSG_OSD_PG_REMOVE      58
index 09de05750b209a1e94ba1ad02592fd6778052908..e39063064c3a32ef5363f787112c2d9c6cce9ee7 100644 (file)
@@ -44,6 +44,7 @@ using namespace std;
 #include "messages/MOSDMap.h"
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGQuery.h"
+#include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGSummary.h"
 //#include "messages/MOSDPGUpdate.h"
 
@@ -323,6 +324,9 @@ decode_message(msg_envelope_t& env, bufferlist& payload)
   case MSG_OSD_PG_QUERY:
        m = new MOSDPGQuery();
        break;
+  case MSG_OSD_PG_LOG:
+       m = new MOSDPGLog();
+       break;
   case MSG_OSD_PG_SUMMARY:
        m = new MOSDPGSummary();
        break;
index 8a1094a69e67a300f1a125ccd8abe7d11fa40979..78b24937ebe50ec5098402b4b097e55066c5178e 100644 (file)
@@ -45,6 +45,7 @@
 #include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGQuery.h"
 #include "messages/MOSDPGSummary.h"
+#include "messages/MOSDPGLog.h"
 
 #include "common/Logger.h"
 #include "common/LogType.h"
@@ -632,7 +633,7 @@ void OSD::update_map(bufferlist& state)
                pg->last_epoch_started_any = osdmap->get_epoch();
                pg->mark_complete();
                pg->mark_active();
-                 
+               
                dout(7) << "created " << *pg << endl;
                pg_list.push_back(pgid);
          }
@@ -724,16 +725,18 @@ void OSD::advance_map(list<pg_t>& ls)
                pg->waiting_for_missing_object.clear();
 
                // drop peers
-               pg->drop_peers();
+               pg->clear_content_recovery_state();
                pg->state_clear(PG::STATE_CLEAN);
          }
          
          // new primary?
          if (role == 0) {
+               // i am new primary
                pg->state_clear(PG::STATE_ACTIVE);
          } else {
-               // we need to announce
-               pg->state_set(PG::STATE_ACTIVE);
+               // i am now replica|stray.  we need to send a notify.
+               pg->state_clear(PG::STATE_ACTIVE);
+               pg->state_set(PG::STATE_STRAY);
 
                if (nrep == 0) 
                  dout(1) << "crashed pg " << *pg << endl;
@@ -769,11 +772,9 @@ void OSD::advance_map(list<pg_t>& ls)
        for (set<int>::const_iterator down = osdmap->get_down_osds().begin();
                 down != osdmap->get_down_osds().end();
                 down++) {
-         PG::PGPeer *pgp = pg->get_peer(*down);
-         if (!pgp) continue;
+         if (!pg->is_acting(*down)) continue;
          
-         dout(10) << " " << *pg << " peer osd" << *down << " is down, removing" << endl;
-         pg->remove_peer(*down);
+         dout(10) << " " << *pg << " peer osd" << *down << " is down" << endl;
          
          // NAK any ops to the down osd
          if (replica_pg_osd_tids[pgid].count(*down)) {
@@ -805,7 +806,8 @@ void OSD::activate_map(list<pg_t>& ls)
 
        if (pg->get_role() == 0) {
          // i am primary
-         repeer(pg, query_map);
+         pg->build_prior();
+         pg->peer(query_map);
        }
        else if (pg->is_stray()) {
          // i am residual|replica
@@ -1003,7 +1005,7 @@ void OSD::do_queries(map< int, map<pg_t,version_t> >& query_map)
        int who = pit->first;
        dout(7) << "do_queries querying osd" << who
                        << " on " << pit->second.size() << " PGs" << endl;
-       
+
        MOSDPGQuery *m = new MOSDPGQuery(osdmap->get_epoch(),
                                                                         pit->second);
        messenger->send_message(m,
@@ -1012,163 +1014,6 @@ void OSD::do_queries(map< int, map<pg_t,version_t> >& query_map)
 }
 
 
-/** repeer()
- * primary: check, query whatever replicas i need to.
- */
-void OSD::repeer(PG *pg, map< int, map<pg_t,version_t> >& query_map) 
-{
-  dout(10) << "repeer " << *pg << endl;
-
-  // determine initial peer set
-  map<int,int> peerset;  // peer -> role
-  
-  // prior map(s), if OSDs are still up
-  for (version_t epoch = pg->last_epoch_started_any;
-          epoch < osdmap->get_epoch();
-          epoch++) {
-       OSDMap *omap = get_osd_map(epoch);
-       assert(omap);
-       
-       vector<int> acting;
-       omap->pg_to_acting_osds(pg->get_pgid(), acting);
-       
-       for (unsigned i=0; i<acting.size(); i++) 
-         if (osdmap->is_up(acting[i]))
-               peerset[acting[i]] = -1;
-  }
-  
-  // current map
-  for (unsigned i=1; i<pg->acting.size(); i++)
-       peerset[pg->acting[i]] = i>0 ? 1:0;
-
-
-  // -- query info from everyone.
-  bool haveallinfo = true;
-  for (map<int,int>::iterator it = peerset.begin();
-          it != peerset.end();
-          it++) {
-       int who = it->first;
-       int role = it->second;
-       if (who == whoami) continue;      // nevermind me
-
-       PG::PGPeer *pgp = pg->get_peer(who);
-       if (pgp && pgp->have_info()) {
-         dout(10) << *pg << " have info from osd" << who << " role " << role << endl;
-         continue;
-       } 
-       if (pgp && pgp->state_test(PG::PGPeer::STATE_QINFO)) {
-         dout(10) << *pg << " waiting for osd" << who << " role " << role << endl;
-       } else {
-         dout(10) << *pg << " querying info from osd" << who << " role " << role << endl;
-         query_map[who][pg->get_pgid()] = 0;
-       }
-       haveallinfo = false;
-  }
-  if (!haveallinfo) return;
-  
-
-  // -- ok, we have all info.  who has latest PG content summary?
-  version_t newest_update = pg->info.last_update;
-  int       newest_update_osd = whoami;
-  version_t oldest_update = pg->info.last_update;
-  PG::PGPeer   *newest_update_peer = 0;
-  
-  for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
-          it != pg->peers.end();
-          it++) {
-       PG::PGPeer *pgp = it->second;
-       assert(pgp->have_info());
-       
-       if (pgp->info.last_update > newest_update) {
-         newest_update = pgp->info.last_update;
-         newest_update_osd = it->first;
-         newest_update_peer = pgp;
-       }
-       if (pgp->get_role() == 1 &&
-               pgp->info.last_update < oldest_update) 
-         oldest_update = pgp->info.last_update;
-  }
-
-  if (newest_update_peer) {
-       // get contents from newest.
-       assert(!newest_update_peer->have_summary());
-       
-       dout(10) << *pg << " newest PG on osd" << newest_update_osd
-                        << " v " << newest_update 
-                        << ", querying summary"
-                        << endl;
-       query_map[newest_update_osd][pg->get_pgid()] = 1;
-       return;
-  } else {
-       dout(10) << *pg << " i have the latest: " << pg->info.last_update << endl;
-  }
-
-  // -- find pg contents?
-  if (pg->info.last_complete < pg->info.last_update) {
-       if (pg->content_summary->missing > 0) {
-         // search!
-         dout(10) << *pg << " searching for PG contents, querying all peers" << endl;
-         bool didquery = false;
-         for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
-                  it != pg->peers.end();
-                  it++) {
-               PG::PGPeer *pgp = it->second;
-               if (pgp->have_summary()) continue;
-               query_map[it->first][pg->get_pgid()] = 1;
-               didquery = true;
-         }
-         
-         if (didquery) return;
-       } else {
-         dout(10) << *pg << " i have located all objects" << endl;
-       }
-  } else {
-       dout(10) << *pg << " i have all objects" << endl;
-  }
-
-
-  // -- distribute summary?
-  
-  // does anyone need it?
-  //if (oldest_update < pg->info.last_update) {
-  
-  // generate summary?
-  if (pg->content_summary == 0) 
-       pg->generate_content_summary();
-  
-  // distribute summary!
-  for (map<int,PG::PGPeer*>::iterator it = pg->peers.begin();
-          it != pg->peers.end();
-          it++) {
-       PG::PGPeer *pgp = it->second;
-       if (pgp->get_role() != 1) continue;
-       
-       pgp->state_clear(PG::PGPeer::STATE_WAITING);
-       pgp->state_set(PG::PGPeer::STATE_ACTIVE);
-       
-       //if (pgp->info.last_update < pg->info.last_update) {
-       dout(10) << *pg << " sending summary to osd" << it->first << endl;
-       MOSDPGSummary *m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
-       messenger->send_message(m, MSG_ADDR_OSD(it->first));
-       //}
-  }
-  //} else {
-  //dout(10) << *pg << " nobody needs the summary" << endl;
-  //}
-  
-  // plan my own recovery
-  pg->plan_recovery();
-  
-  // i am active!
-  pg->state_set(PG::STATE_ACTIVE);
-
-  take_waiters(pg->waiting_for_active);
-
-}
-
-
-
 /** PGNotify
  * from non-primary to primary
  * includes PGInfo.
@@ -1198,32 +1043,28 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
          assert(pg->acting[0] == whoami);
          pg->info.same_primary_since = it->same_primary_since;
          pg->set_role(0);
-         
-         dout(10) << " " << *pg << " is new, nrep=" << nrep << endl;     
 
-         // start peers
-         repeer(pg, query_map);
+         pg->last_epoch_started_any = it->last_epoch_started;
+         pg->build_prior();
 
+         dout(10) << " " << *pg << " is new" << endl;
+       
          // kick any waiters
          if (waiting_for_pg.count(pgid)) {
                take_waiters(waiting_for_pg[pgid]);
                waiting_for_pg.erase(pgid);
          }
-       } else {
-         // already had pg.
+       }
 
-         // peered with this guy specifically?
-         PG::PGPeer *pgp = pg->get_peer(from);
-         if (!pgp) {
-               int role = osdmap->get_pg_role(pg->get_pgid(), from);
-               pgp = pg->new_peer(from, role);
-         }
+       // save info.
+       pg->peer_info[from] = *it;
 
-         pgp->info = *it;
-         pgp->state_set(PG::PGPeer::STATE_INFO);
+       // adjust prior?
+       if (it->last_epoch_started > pg->last_epoch_started_any) 
+         pg->adjust_prior();
 
-         repeer(pg, query_map);
-       }
+       // peer
+       pg->peer(query_map);
   }
   
   do_queries(query_map);
@@ -1231,6 +1072,12 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
   delete m;
 }
 
+void OSD::handle_pg_log(MOSDPGLog *m) 
+{
+
+
+}
+
 
 /** PGQuery
  * from primary to replica | other
@@ -1274,31 +1121,37 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
          dout(10) << *pg << " dne (before), but i am role " << role << endl;
        }
 
-       if (it->second) {
+       if (it->second == 0) {
+         // info
+         dout(10) << *pg << " sending info" << endl;
+         notify_list[from].push_back(pg->info);
+       } else if (it->second == 1) {
          // summary
-         MOSDPGSummary *m;
-         if (pg->content_summary == 0) {
-               pg->generate_content_summary();
-               m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
-               delete pg->content_summary;
-               pg->content_summary = 0;
-         } else {
-               m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), pg->content_summary);
-         }
+         dout(10) << *pg << " sending content summary" << endl;
+         PG::PGSummary summary;
+         pg->generate_summary(summary);
+         MOSDPGSummary *m = new MOSDPGSummary(osdmap->get_epoch(), pg->get_pgid(), summary);
          messenger->send_message(m, MSG_ADDR_OSD(from));
        } else {
-         // notify
-         notify_list[from].push_back(pg->info);
+         // log + info
+         dout(10) << *pg << " sending info+log since " << it->second << endl;
+         MOSDPGLog *m = new MOSDPGLog(osdmap->get_epoch(), pg->get_pgid());
+         m->info = pg->info;
+         m->log.copy_after(pg->log, it->second);
+         messenger->send_message(m, MSG_ADDR_OSD(from));
        }       
   }
   
-  do_notifies(notify_list);
+  do_notifies(notify_list);   
 
   delete m;
 }
 
+
+
 void OSD::handle_pg_summary(MOSDPGSummary *m)
 {
+  /*
   dout(7) << "handle_pg_summary from " << m->get_source() << endl;
   int from = MSG_ADDR_NUM(m->get_source());
 
@@ -1307,7 +1160,7 @@ void OSD::handle_pg_summary(MOSDPGSummary *m)
   map< int, map<pg_t,version_t> > query_map;    // peer -> PG -> get_summary_since
 
   pg_t pgid = m->get_pgid();
-  PG::PGContentSummary *sum = m->get_summary();
+  PG::PGSummary *sum = m->get_summary();
   PG *pg = get_pg(pgid);
   assert(pg);
 
@@ -1381,7 +1234,7 @@ void OSD::handle_pg_summary(MOSDPGSummary *m)
        // initiate any recovery?
        pg->plan_recovery();
   }
-  
+  */
   delete m;
 }
 
@@ -1404,9 +1257,10 @@ void OSD::pg_pull(PG *pg, int maxops)
   int ops = pg->num_active_ops();
 
   dout(7) << "pg_pull pg " << *pg 
-                 << " " << pg->objects_missing.size() << " to do, " 
+                 << " " << pg->missing.num_missing() << " to do, " 
                  << ops << "/" << maxops << " active" <<  endl;
   
+  /*
   while (ops < maxops &&
                 !pg->recovery_queue.empty()) {
        map<version_t, PG::ObjectInfo>::iterator first = pg->recovery_queue.upper_bound(pg->requested_through);
@@ -1415,12 +1269,13 @@ void OSD::pg_pull(PG *pg, int maxops)
        pg->requested_through = first->first;
 
        ops++;
-  }  
+  } 
+  */ 
 }
 
-void OSD::pull_replica(PG *pg, PG::ObjectInfo& oi)
+void OSD::pull_replica(PG *pg, object_t oid, version_t v)
 {
-  // get peer
+/*  // get peer
   dout(7) << "pull_replica " << hex << oi.oid << dec 
                  << " v " << oi.version 
                  << " from osd" << oi.osd << endl;
@@ -1438,8 +1293,10 @@ void OSD::pull_replica(PG *pg, PG::ObjectInfo& oi)
   // take note
   pull_ops[tid] = oi;
   pg->objects_pulling[oi.oid] = oi;
+*/
 }
 
+
 void OSD::op_rep_pull(MOSDOp *op)
 {
   long got = 0;
@@ -1481,6 +1338,7 @@ void OSD::op_rep_pull(MOSDOp *op)
 
 void OSD::op_rep_pull_reply(MOSDOpReply *op)
 {
+  /*
   object_t o = op->get_oid();
   version_t v = op->get_version();
 
@@ -1529,6 +1387,7 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
        take_waiters(pg->waiting_for_missing_object[o]);
 
   delete op;
+  */
 }
 
 
@@ -1677,13 +1536,13 @@ void OSD::handle_op(MOSDOp *op)
 
          if (!pg->is_complete()) {
                // consult PG object map
-               if (pg->objects_missing.count(oid)) {
+               if (pg->missing.missing.count(oid)) {
                  // need to pull
-                 version_t v = pg->objects_missing[oid];
+                 version_t v = pg->missing.missing[oid];
                  dout(7) << "need to pull object " << hex << oid << dec 
                                  << " v " << v << endl;
                  if (!pg->objects_pulling.count(oid)) 
-                       pull_replica(pg, pg->recovery_queue[v]);
+                       pull_replica(pg, oid, v);
                  pg->waiting_for_missing_object[oid].push_back(op);
                  return;
                }
@@ -2131,9 +1990,9 @@ void OSD::op_modify(MOSDOp *op)
   
   // do it
   Context *oncommit = new C_OSD_WriteCommit(this, repop);
-
   op_apply(op, nv, oncommit);
 
+  // local ack
   get_repop(repop);
   assert(repop->waitfor_ack.count(0));
   repop->waitfor_ack.erase(0);
index d4eebf190986ace626fa8d4e704a83b4c8928121..dabbb8557a5295ab1edebef9b79001f988d0dd64 100644 (file)
@@ -176,14 +176,14 @@ public:
   void handle_rep_op_ack(__uint64_t tid, int result, bool commit, int fromosd);
 
   // recovery
-  map<tid_t,PG::ObjectInfo>  pull_ops;   // tid -> PGPeer*
+  //map<tid_t,PG::ObjectInfo>  pull_ops;   // tid -> PGPeer*
 
   void do_notifies(map< int, list<PG::PGInfo> >& notify_list);
   void do_queries(map< int, map<pg_t,version_t> >& query_map);
   void repeer(PG *pg, map< int, map<pg_t,version_t> >& query_map);
 
   void pg_pull(PG *pg, int maxops);
-  void pull_replica(PG *pg, PG::ObjectInfo& oi);
+  void pull_replica(PG *pg, object_t, version_t);
 
   bool require_current_map(Message *m, version_t v);
   bool require_same_or_newer_map(Message *m, epoch_t e);
@@ -191,6 +191,7 @@ public:
   void handle_pg_query(class MOSDPGQuery *m);
   void handle_pg_notify(class MOSDPGNotify *m);
   void handle_pg_summary(class MOSDPGSummary *m);
+  void handle_pg_log(class MOSDPGLog *m);
 
   void op_rep_pull(class MOSDOp *op);
   void op_rep_pull_reply(class MOSDOpReply *op);
index 27207cd583d18a493b5b4e22af41242ecb8ef819..c24b81b9198ffc915efa8409665d8f8a2bf860cc 100644 (file)
 #include "config.h"
 #include "OSD.h"
 
+
+#include "messages/MOSDPGLog.h"
+#include "messages/MOSDPGSummary.h"
+
+
 #undef dout
 #define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << osd->whoami << " " << *this << " "
 
 
+/******* PGLog ********/
+
+
+/*
+ * trim - drop every log entry whose version is <= s, then set bottom = s.
+ *
+ * the version-ordered reverse maps (rupdated, rdeleted) drive the walk, so
+ * they are also the containers tested for emptiness.  a forward entry is
+ * only removed when it still refers to the version being trimmed; a newer
+ * version recorded for the same object is left alone.
+ */
+void PG::PGLog::trim(version_t s) 
+{
+  // trim updated
+  while (!rupdated.empty()) {
+       map<version_t,object_t>::iterator rit = rupdated.begin();
+       if (rit->first > s) break;
+       map<object_t,version_t>::iterator fit = updated.find(rit->second);
+       if (fit != updated.end() && fit->second == rit->first)
+         updated.erase(fit);
+       rupdated.erase(rit);
+  }
+  
+  // trim deleted
+  while (!rdeleted.empty()) {
+       map<version_t, object_t>::iterator rit = rdeleted.begin();
+       if (rit->first > s) break;
+       map<object_t,version_t>::iterator fit = deleted.find(rit->second);
+       if (fit != deleted.end() && fit->second == rit->first)
+         deleted.erase(fit);
+       rdeleted.erase(rit);
+  }
+  
+  bottom = s;
+}
+
+
+/*
+ * copy_after - fill this (empty) log with the entries of 'other' whose
+ * version is > v.
+ *
+ * v must not be below other.bottom.  updates and deletes are replayed
+ * interleaved in increasing version order, because add_update() and
+ * add_delete() both assert that each new entry is newer than top.
+ */
+void PG::PGLog::copy_after(const PGLog &other, version_t v) 
+{
+  assert(v >= other.bottom);
+  bottom = top = v;
+  assert(updated.empty() && deleted.empty());
+  
+  map<version_t, object_t>::const_iterator pu = other.rupdated.upper_bound(v); // first > v
+  map<version_t, object_t>::const_iterator pd = other.rdeleted.upper_bound(v); // first > v
+  
+  while (pu != other.rupdated.end() || pd != other.rdeleted.end()) {
+       // take the update when deletes are exhausted or the update is older
+       if (pd == other.rdeleted.end() ||
+               (pu != other.rupdated.end() && pu->first < pd->first)) {
+         add_update(pu->second, pu->first);
+         pu++;
+       } else {
+         add_delete(pd->second, pd->first);
+         pd++;
+       }
+  }
+  
+  assert(top == other.top);
+}
+
+
+/*
+ * merge_after - fold entries from 'other' into this log.
+ *
+ * bottom: if other reaches below our bottom (and 'after' is below it too),
+ *   copy other's entries in (newbottom, bottom] for objects we do not
+ *   already track, then lower bottom.
+ * top: if other is newer, replay its entries with version > top in
+ *   increasing version order (add_update/add_delete assert v > top).
+ */
+void PG::PGLog::merge_after(version_t after, const PGLog &other) 
+{
+  // extend on bottom?
+  if (other.bottom < bottom &&
+         after < bottom) {
+       // NOTE(review): as written this picks min(after, other.bottom); confirm
+       // that max() was not intended, since other holds nothing at or below
+       // other.bottom.
+       version_t newbottom = after;
+       if (after > other.bottom) 
+         newbottom = other.bottom;  // skip part of other
+       
+       // updated
+       for (map<version_t,object_t>::const_iterator p = other.rupdated.upper_bound(newbottom); // first > newbottom
+                p != other.rupdated.end();
+                p++) {
+         if (p->first > bottom) break;
+         if (updated.count(p->second) || deleted.count(p->second)) continue;
+         updated[p->second] = p->first;
+         rupdated[p->first] = p->second;
+       }
+       
+       // deleted
+       for (map<version_t,object_t>::const_iterator p = other.rdeleted.upper_bound(newbottom);
+                p != other.rdeleted.end();
+                p++) {
+         if (p->first > bottom) break;
+         if (updated.count(p->second) || deleted.count(p->second)) continue;
+         deleted[p->second] = p->first;
+         rdeleted[p->first] = p->second;
+       }
+       bottom = newbottom;
+  }
+  
+  // extend on top?
+  if (other.top > top) {
+       // strictly newer than top: the entry at top (if any) is already ours.
+       map<version_t,object_t>::const_iterator pu = other.rupdated.upper_bound(top);
+       map<version_t,object_t>::const_iterator pd = other.rdeleted.upper_bound(top);
+       
+       // both: always take the older entry first
+       while (pu != other.rupdated.end() && pd != other.rdeleted.end()) {
+         assert(pd->first != pu->first);
+         if (pu->first < pd->first) {
+               add_update(pu->second, pu->first);
+               pu++;
+         } else {
+               add_delete(pd->second, pd->first);
+               pd++;
+         }
+       }
+       // tail
+       while (pu != other.rupdated.end()) {
+         add_update(pu->second, pu->first);
+         pu++;
+       }
+       while (pd != other.rdeleted.end()) {
+         add_delete(pd->second, pd->first);
+         pd++;
+       }
+       top = other.top;
+  }
+}
+
+/*
+ * print - dump the log to 'out': a "bottom - top" header, then one line per
+ * entry in increasing version order (updates as "v   oid", deletes as
+ * "v - oid", oid in hex), then a closing "top" line.
+ */
+void PG::PGLog::print(ostream& out) const 
+{
+  out << " " << bottom << " - " << top << endl;
+  
+  map<version_t,object_t>::const_iterator pu = rupdated.begin();
+  map<version_t,object_t>::const_iterator pd = rdeleted.begin();
+  
+  // both: emit whichever entry is older first
+  while (pu != rupdated.end() && pd != rdeleted.end()) {
+       assert(pd->first != pu->first);
+       if (pu->first < pd->first) {
+         out << " " << pu->first << "   " << hex << pu->second << dec << endl;
+         pu++;
+       } else {
+         out << " " << pd->first << " - " << hex << pd->second << dec << endl;
+         pd++;
+       }
+  }
+  // tail
+  while (pu != rupdated.end()) {
+       out << " " << pu->first << "   " << hex << pu->second << dec << endl;
+       pu++;
+  }
+  while (pd != rdeleted.end()) {
+       out << " " << pd->first << " - " << hex << pd->second << dec << endl;
+       pd++;
+  }
+  out << " " << top << " top" << endl;
+}
+
+
+
+/******* PG ***********/
+/*
+ * build_prior - recompute prior_set from scratch:
+ *  - every member of the current acting vector except slot 0
+ *    (presumably the primary, i.e. us -- confirm), plus
+ *  - for each map epoch in [last_epoch_started_any, current epoch), every
+ *    osd acting for this pg in that map that was up then, is up now, and
+ *    is not this osd.
+ */
+void PG::build_prior()
+{
+  prior_set.clear();
+  
+  // current acting set, slot 0 skipped
+  for (unsigned slot = 1; slot < acting.size(); slot++)
+       prior_set.insert(acting[slot]);
+
+  // earlier maps
+  version_t e = last_epoch_started_any;
+  while (e < osd->osdmap->get_epoch()) {
+       OSDMap *oldmap = osd->get_osd_map(e);
+       assert(oldmap);
+       
+       vector<int> old_acting;
+       oldmap->pg_to_acting_osds(get_pgid(), old_acting);
+       
+       for (unsigned slot = 0; slot < old_acting.size(); slot++) {
+         int o = old_acting[slot];
+         if (!osd->osdmap->is_up(o)) continue;   // not up now
+         if (!oldmap->is_up(o)) continue;        // was not up then
+         if (o == osd->whoami) continue;         // me
+         prior_set.insert(o);
+       }
+       e++;
+  }
+
+  dout(10) << "build_prior " << prior_set << endl;
+}
+
+/*
+ * adjust_prior - raise last_epoch_started_any to the largest
+ * last_epoch_started reported in peer_info, then rebuild prior_set.
+ * asserts that prior_set is non-empty and that the epoch actually advances.
+ */
+void PG::adjust_prior()
+{
+  assert(!prior_set.empty());
+
+  // raise last_epoch_started_any
+  epoch_t max = 0;   // must start defined: it is compared before being assigned
+  for (map<int,PGInfo>::iterator it = peer_info.begin();
+          it != peer_info.end();
+          it++) {
+       if (it->second.last_epoch_started > max)
+         max = it->second.last_epoch_started;
+  }
+  assert(max > last_epoch_started_any);
+  last_epoch_started_any = max;
+  
+  // rebuild prior set
+  build_prior();
+}
+
+
+/*
+ * peer - advance the peering sequence for this pg as far as current
+ * knowledge allows.  any queries that need to go out are added to
+ * query_map (osd -> pgid -> code); the codes match OSD::handle_pg_query:
+ * 0 = info, 1 = content summary, anything else = info+log since that version.
+ *
+ * stages (each returns early while still waiting on replies):
+ *  1. collect PGInfo from everyone in prior_set.
+ *  2. if a peer has a newer last_update than us, ask it for its log.
+ *  3. if our own log does not reach back to last_complete, fetch a summary.
+ *  4. if objects are still lost, ask remaining peers for summaries.
+ *  5. send each acting replica a full summary or an incremental log,
+ *     clear the recovery state and mark the pg active.
+ */
+void PG::peer(map< int, map<pg_t,version_t> >& query_map)
+{
+  dout(10) << "peer" << endl;
+
+  // -- query info from everyone in prior_set.
+  bool missing_info = false;
+  for (set<int>::iterator it = prior_set.begin();
+          it != prior_set.end();
+          it++) {
+       if (peer_info.count(*it)) {
+         dout(10) << " have info from osd" << *it << endl;       
+         continue;
+       }
+       missing_info = true;
+
+       if (peer_info_requested.count(*it)) {
+         dout(10) << " waiting for osd" << *it << endl;
+         continue;
+       }
+       
+       dout(10) << " querying info from osd" << *it << endl;
+       query_map[*it][info.pgid] = 0;
+       peer_info_requested.insert(*it);
+  }
+  if (missing_info) return;
+  
+  
+  // -- ok, we have all (prior_set) info.  (and maybe others.)
+  // who (of _everyone_ we've heard from) has the latest PG version?
+  version_t newest_update = info.last_update;
+  int       newest_update_osd = osd->whoami;
+  version_t oldest_update_needed = info.last_update;  // only of acting (current) osd set
+  
+  for (map<int,PGInfo>::iterator it = peer_info.begin();
+          it != peer_info.end();
+          it++) {
+       if (it->second.last_update > newest_update) {
+         newest_update = it->second.last_update;
+         newest_update_osd = it->first;
+       }
+       if (is_acting(it->first) &&
+               it->second.last_update < oldest_update_needed) 
+         oldest_update_needed = it->second.last_update;
+  }
+  
+  // get log?
+  if (newest_update_osd != osd->whoami) {
+       if (peer_log_requested.count(newest_update_osd)) {
+         dout(10) << " newest update on osd" << newest_update_osd
+                          << " v " << newest_update 
+                          << ", already queried" 
+                          << endl;
+       } else {
+         dout(10) << " newest update on osd" << newest_update_osd
+                          << " v " << newest_update 
+                          << ", querying since " << oldest_update_needed
+                          << endl;
+         query_map[newest_update_osd][info.pgid] = oldest_update_needed;
+         // record it in the set the check above consults, so the log is
+         // requested only once.
+         peer_log_requested.insert(newest_update_osd);
+       }
+       return;
+  } else {
+       dout(10) << " i have the most up-to-date log " << info.last_update << endl;
+  }
+
+  // -- is that the whole story?  (is my log sufficient?)
+  // NOTE(review): newest_update_osd == osd->whoami whenever this point is
+  // reached, so the assert in the else branch below cannot hold; which peer
+  // should supply the summary still needs to be decided.
+  if (info.last_complete < log.bottom) {
+       // nope.  fetch a summary from someone.
+       if (peer_summary.count(newest_update_osd)) {
+         dout(10) << "i am complete thru " << info.last_complete
+                          << ", but my log starts at " << log.bottom 
+                          << ".  already waiting for summary from osd" << newest_update_osd
+                          << endl;
+       } else {
+         dout(10) << "i am complete thru " << info.last_complete
+                          << ", but my log starts at " << log.bottom 
+                          << ".  fetching summary from osd" << newest_update_osd
+                          << endl;
+         assert(newest_update_osd != osd->whoami);  // can't be me!
+         query_map[newest_update_osd][info.pgid] = 1;
+         peer_summary_requested.insert(newest_update_osd);
+       }
+       return;
+  }
+  
+  // -- ok.  and have i located all pg contents?
+  if (missing.num_lost()) {
+       dout(10) << "there are still " << missing.num_lost() << " lost objects" << endl;
+
+       // ok, let's get more summaries!
+       bool waiting = false;
+       for (map<int,PGInfo>::iterator it = peer_info.begin();
+                it != peer_info.end();
+                it++) {
+         int peer = it->first;
+         if (peer_summary.count(peer)) {
+               dout(10) << " have summary from osd" << peer << endl;
+               continue;
+         }
+         if (peer_summary_requested.count(peer)) {
+               dout(10) << " already requested summary from osd" << peer << endl;
+               waiting = true;
+               continue;
+         }
+
+         dout(10) << " requesting summary from osd" << peer << endl;     
+         query_map[peer][info.pgid] = 1;
+         peer_summary_requested.insert(peer);
+         waiting = true;
+       }
+
+       if (!waiting) {
+         dout(10) << missing.num_lost() << " objects are still lost, waiting+hoping for a notify from someone else!" << endl;
+       }
+       return;
+  }
+
+
+  // -- do i need to generate a full summary for any of my peers?
+  // (needed exactly when some acting peer is older than my log reaches,
+  //  which is the case tested per peer below.)
+  PGSummary summary;
+  if (oldest_update_needed < log.bottom) {
+       dout(10) << "my log isn't long enough for all peers: bottom " 
+                        << log.bottom << " > " << oldest_update_needed
+                        << endl;
+       generate_summary(summary);
+  }
+
+
+  // -- ok, activate!
+  bool allclean = true;
+  for (unsigned i=1; i<acting.size(); i++) {
+       int peer = acting[i];
+       assert(peer_info.count(peer));
+       
+       if (peer_info[peer].last_update < log.bottom) {
+         // need full summary
+         dout(10) << "sending complete summary to osd" << peer
+                          << ", their last_update was " << peer_info[peer].last_update 
+                          << endl;
+         MOSDPGSummary *m = new MOSDPGSummary(osd->osdmap->get_epoch(),
+                                                                                  info.pgid,
+                                                                                  summary);
+         allclean = false;
+         osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
+       } else {
+         // need incremental (or no) log update.
+         dout(10) << "sending incremental|empty log " 
+                          << peer_info[peer].last_update << " - " << info.last_update
+                          << " to osd" << peer << endl;
+         MOSDPGLog *m = new MOSDPGLog(osd->osdmap->get_epoch(), 
+                                                                  info.pgid);
+         if (peer_info[peer].last_update < info.last_update) {
+               m->log.copy_after(log, peer_info[peer].last_update);
+               allclean = false;
+         }
+         osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
+       }
+  }
+
+  // do anything about allclean?
+  // ???
+  
+  // clean up some
+  clear_content_recovery_state();
+
+  // i am active!
+  state_set(PG::STATE_ACTIVE);
+  osd->take_waiters(waiting_for_active);
+}
+
 
-void PG::generate_content_summary()
+/*
+ * generate_summary - list the objects in this pg's collection in the local
+ * store and record each one's "version" attribute in summary.objects.
+ */
+void PG::generate_summary(PGSummary &summary)
 {  
   dout(10) << "generating summary" << endl;
 
   list<object_t> olist;
   osd->store->collection_list(info.pgid, olist);
   
-  content_summary = new PGContentSummary;
-
   for (list<object_t>::iterator it = olist.begin();
           it != olist.end();
           it++) {
-       ObjectInfo item(*it);
-       osd->store->getattr(item.oid
+       version_t v = 0;   // defined value even if getattr does not fill it in
+       osd->store->getattr(*it,
                                                "version",
-                                               &item.version, sizeof(item.version));
-       item.osd = osd->whoami;
-       content_summary->ls.push_back(item);
+                                               &v, sizeof(v));
+       summary.objects[*it] = v;
   }
+
+  dout(10) << summary.objects.size() << " local objects.  " << endl;
 }
 
 
@@ -50,8 +413,8 @@ void PG::plan_recovery()
   dout(10) << "plan_recovery " << endl;  
   
   assert(is_active());
-  assert(content_summary);
   
+  /*
   // load local contents
   list<object_t> olist;
   osd->store->collection_list(info.pgid, olist);
@@ -96,6 +459,8 @@ void PG::plan_recovery()
                         << " v " << it->second << endl;
        osd->store->remove(it->first);
   }
+
+  */
 }
 
 void PG::do_recovery()
index 50c1243b68e6194ccc9df225c66e0c06e697e936..9ce6c76a6493679ff4ced4a831dd0ece3544e77f 100644 (file)
@@ -39,112 +39,152 @@ class OSD;
 class PG {
 public:
   
-  /** ObjectInfo
-   * summary info about an object (replica)
+  /*
+   * PGInfo - summary of PG statistics.
    */
-  struct ObjectInfo {
-       object_t oid;
-       version_t version;
-       int osd;   // -1 = unknown.  if local, osd == whoami.
-       ObjectInfo(object_t o=0, version_t v=0, int os=-1) : oid(o), version(v), osd(os) {}
-  };
-  
   struct PGInfo {
        pg_t pgid;
-       version_t last_update;    // last object version applied.
-       version_t last_complete;  // last pg version pg was complete.
+       version_t last_update;    // last object version logged/updated.
+       version_t last_complete;  // last version pg was complete through.
+       version_t log_floor;      // oldest log entry.
        epoch_t last_epoch_started;  // last epoch started.
        epoch_t last_epoch_finished; // last epoch finished.
-       epoch_t same_primary_since;  // 
+       epoch_t same_primary_since;  // first epoch the current primary was primary.
        PGInfo(pg_t p=0) : pgid(p), 
                                           last_update(0), last_complete(0),
                                           last_epoch_started(0), last_epoch_finished(0),
                                           same_primary_since(0) {}
   };
   
-  struct PGContentSummary {
-       //version_t since;
-       int remote, missing;
-       list<ObjectInfo> ls;
-
+  /*
+   * PGSummary - snapshot of full pg contents
+   */
+  class PGSummary {
+  public:
+       map<object_t, version_t> objects;  // objects i currently store.
+       //PGMissing                missing;  // objects i am missing (to get thru info.last_update).
+       
        void _encode(bufferlist& blist) {
-         //blist.append((char*)&since, sizeof(since));
-         blist.append((char*)&remote, sizeof(remote));
-         blist.append((char*)&missing, sizeof(missing));
-         ::_encode(ls, blist);
+         ::_encode(objects, blist);
+         //missing._encode(blist);
        }
        void _decode(bufferlist& blist, int& off) {
-         //blist.copy(off, sizeof(since), (char*)&since);
-         //off += sizeof(since);
-         blist.copy(off, sizeof(remote), (char*)&remote);
-         off += sizeof(remote);
-         blist.copy(off, sizeof(missing), (char*)&missing);
-         off += sizeof(missing);
-         ::_decode(ls, blist, off);
+         ::_decode(objects, blist, off);
+         //missing._decode(blist, off);
        }
-
-       PGContentSummary() : remote(0), missing(0) {}
   };
 
-  
-  /** PGPeer
-   * state associated with non-primary OSDS with PG content.
-   * only used by primary.
+  /*
+   * PGMissing - summary of missing objects.
+   *  kept in memory, as a supplement to PGLog.
+   *  also used to pass missing info in messages.
    */
-  
-  class PGPeer {
+  class PGMissing {
   public:
-       // bits
-       static const int STATE_INFO      = 1;  // we have info
-       static const int STATE_SUMMARY  = 2;  // we have summary
-       static const int STATE_QINFO     = 4;  // we are querying info|summary.
-       static const int STATE_QSUMMARY = 8;  // we are querying info|summary.
-       static const int STATE_WAITING   = 16; // peer is waiting for go.
-       static const int STATE_ACTIVE    = 32; // peer is active.
-       //static const int STATE_COMPLETE  = 64; // peer is complete.
-
-       class PG *pg;
-  private:
-       int       peer;
-       int       role;
-       int       state;
-       
+       map<object_t, version_t> missing;   // oid -> v
+       map<version_t, object_t> rmissing;  // v -> oid
+
+       map<object_t, int>       loc;       // where i think i can get them.
+
+       int num_lost() { return missing.size() - loc.size(); }
+       int num_missing() { return missing.size(); }
+
+       void _encode(bufferlist& blist) {
+         ::_encode(missing, blist);
+         ::_encode(loc, blist);
+       }
+       void _decode(bufferlist& blist, int& off) {
+         ::_decode(missing, blist, off);
+         ::_decode(loc, blist, off);
+
+         for (map<object_t,version_t>::iterator it = missing.begin();
+                  it != missing.end();
+                  it++) 
+               rmissing[it->second] = it->first;
+       }
+  };
+
+  /*
+   * PGLog - incremental log of recent pg changes.
+   *  summary of persistent on-disk copy:
+   *   multiply-modified objects are implicitly trimmed from in-memory log.
+   *  also, serves as a recovery queue.
+   */
+  class PGLog {
   public:
-       // peer state
-       PGInfo            info;
-       PGContentSummary *content_summary;
-       
-       friend class PG;
+       version_t top;           // corresponds to newest entry.
+       version_t bottom;        // corresponds to entry prio to oldest entry (t=bottom is trimmed).
+       map<object_t, version_t> updated;  // oid -> v. items > bottom, + version.
+       map<version_t, object_t> rupdated; // v -> oid.
+       map<object_t, version_t> deleted;  // oid -> when.  items <= bottom that no longer exist
+       map<version_t, object_t> rdeleted; // when -> oid.
        
-  public:
-       PGPeer(class PG *pg, int p, int ro) : 
-         pg(pg), 
-         peer(p),
-         role(ro),
-         state(0),
-         content_summary(NULL) { }
-       ~PGPeer() {
-         if (content_summary) delete content_summary;
+       PGLog() : top(0), bottom(0) {}
+
+       void _reverse(map<object_t, version_t> &fw, map<version_t, object_t> &bw) {
+         for (map<object_t,version_t>::iterator it = fw.begin();
+                  it != fw.end();
+                  it++) 
+               bw[it->second] = it->first;
+       }
+       void _encode(bufferlist& blist) const {
+         blist.append((char*)&top, sizeof(top));
+         blist.append((char*)&bottom, sizeof(bottom));
+         ::_encode(updated, blist);
+         ::_encode(deleted, blist);
+       }
+       void _decode(bufferlist& blist, int& off) {
+         blist.copy(off, sizeof(top), (char*)&top);
+         off += sizeof(top);
+         blist.copy(off, sizeof(bottom), (char*)&bottom);
+         off += sizeof(bottom);
+         ::_decode(updated, blist, off);
+         ::_decode(deleted, blist, off);
+
+         _reverse(updated, rupdated);
+         _reverse(deleted, rdeleted);
        }
-       
-       int get_peer() { return peer; }
-       int get_role() { return role; }
-       
-       int get_state() { return state; } 
-       bool state_test(int m) { return (state & m) != 0; }
-       void state_set(int m) { state |= m; }
-       void state_clear(int m) { state &= ~m; }
-       
-       bool have_info() { return state_test(STATE_INFO); }
-       bool have_summary() { return state_test(STATE_SUMMARY); }
 
-       bool is_waiting() { return state_test(STATE_WAITING); }
-       bool is_active() { return state_test(STATE_ACTIVE); }
-       bool is_complete() { return have_info() &&
-                                                  info.last_update == info.last_complete; }
+
+
+       // accessors
+       version_t is_updated(object_t oid) {
+         if (updated.count(oid)) return updated[oid];
+         return 0;
+       }
+       version_t is_deleted(object_t oid) {
+         if (deleted.count(oid)) return deleted[oid];
+         return 0;
+       }
+
+       // actors
+       // record that oid was updated at version v (v must be newer than top).
+       // any older update entry for oid is replaced in both maps, and any
+       // delete entry for oid is removed.
+       void add_update(object_t oid, version_t v) {
+         // drop the previous version's reverse entry so rupdated stays the
+         // exact inverse of updated.
+         if (updated.count(oid))
+               rupdated.erase(updated[oid]);
+         updated[oid] = v;
+         rupdated[v] = oid;
+         if (deleted.count(oid)) {
+               assert(v > deleted[oid]);      // future deletions or past mods impossible.
+               rdeleted.erase(deleted[oid]);
+               deleted.erase(oid);
+         }
+         assert(v > top);
+         top = v;
+       }
+       // record that oid was deleted at version 'when' (must be newer than top).
+       // NOTE(review): unlike add_update, this does not touch an existing
+       // 'updated' entry for oid -- confirm that is intended.
+       void add_delete(object_t oid, version_t when) {
+         assert(when > top);
+         top = when;
+         rdeleted[when] = oid;
+         deleted[oid] = when;
+       }
+
+       void trim(version_t s);
+       void copy_after(const PGLog &other, version_t v);
+       void merge_after(version_t after, const PGLog &other);
+       void print(ostream& out) const;
   };
   
 
+
+
   /*** PG ****/
 public:
   // any
@@ -161,22 +201,57 @@ public:
  protected:
   OSD *osd;
 
-  // generic state
 public:
+  // pg state
   PGInfo      info;
-  PGContentSummary *content_summary;
+  PGLog       log;
+  PGMissing   missing;
 
 protected:
   int         role;    // 0 = primary, 1 = replica, -1=none.
   int         state;   // see bit defns above
 
   // primary state
+ public:
+  vector<int> acting;
+  epoch_t     last_epoch_started_any;
+
+ protected:
+  // [primary only] content recovery state
+  set<int>    prior_set;   // current+prior OSDs, as defined by last_epoch_started_any.
+  set<int>    stray_set;   // non-acting osds that have PG data.
+  map<int, PGInfo>      peer_info;  // info from peers (stray or prior)
+  set<int>              peer_info_requested;
+  map<int, PGLog*>      peer_log;   // logs from peers (for recovering pg content)
+  map<int, PGMissing*>  peer_missing;
+  set<int>              peer_log_requested;
+  map<int, PGSummary*>  peer_summary;   // full contents of peers
+  set<int>              peer_summary_requested;
+  friend class OSD;
+
 public:
-  epoch_t           last_epoch_started_any;
-  map<int, PGPeer*> peers;  // primary: (soft state) active peers
+  // forget everything gathered during peering: prior/stray sets, peer
+  // info/log/missing/summary maps and all the "requested" sets.
+  // NOTE(review): peer_log, peer_missing and peer_summary hold raw pointers;
+  // clear() does not delete the pointees -- confirm who owns them.
+  void clear_content_recovery_state() {
+       prior_set.clear();
+       stray_set.clear();
+       peer_info.clear();
+       peer_info_requested.clear();
+       peer_log.clear();
+       peer_missing.clear();
+       peer_log_requested.clear();
+       peer_summary.clear();
+       peer_summary_requested.clear();
+  }
 
  public:
-  vector<int> acting;
+  // true if 'osd' appears anywhere in the acting vector (linear scan).
+  bool is_acting(int osd) const { 
+       unsigned pos = 0;
+       while (pos < acting.size() && acting[pos] != osd)
+         pos++;
+       return pos < acting.size();
+  }
+  // membership tests against prior_set / stray_set.
+  bool is_prior(int osd) const { return prior_set.count(osd) != 0; }
+  bool is_stray(int osd) const { return stray_set.count(osd) != 0; }
+
+  void build_prior();
+  void adjust_prior();  // based on new peer_info.last_epoch_started_any
 
   // pg waiters
   list<class Message*>            waiting_for_active;
@@ -184,20 +259,21 @@ public:
                   list<class Message*> > waiting_for_missing_object;   
 
   // recovery
-  map<object_t, version_t>   objects_missing;  // objects (versions) i need
-  map<version_t, ObjectInfo> recovery_queue;   // objects i need to pull (in order)
-  version_t requested_through;
-  map<object_t, ObjectInfo>  objects_pulling;  // which objects are currently being pulled
+  version_t                requested_thru;
+  map<object_t, version_t> objects_pulling;  // which objects are currently being pulled
   
+  void peer(map< int, map<pg_t,version_t> >& query_map);
+  void generate_summary(PGSummary &summary);
+
   void plan_recovery();
-  void generate_content_summary();
   void do_recovery();
+  void do_clean();
 
 
  public:  
   PG(OSD *o, pg_t p) : 
        osd(o), 
-       info(p), content_summary(0),
+       info(p),
        role(0),
        state(0)
   { }
@@ -240,28 +316,7 @@ public:
        return objects_pulling.size();
   }
 
-  // peers
-  map<int, PGPeer*>& get_peers() { return peers; }
-  PGPeer* get_peer(int p) {
-       if (peers.count(p)) return peers[p];
-       return 0;
-  }
-  PGPeer* new_peer(int p, int r) {
-       return peers[p] = new PGPeer(this, p, r);
-  }
-  void remove_peer(int p) {
-       assert(peers.count(p));
-       delete peers[p];
-       peers.erase(p);
-  }
-  void drop_peers() {
-       for (map<int,PGPeer*>::iterator it = peers.begin();
-                it != peers.end();
-                it++)
-         delete it->second;
-       peers.clear();
-  }
-
 
   // pg state storage
   /*
@@ -283,14 +338,6 @@ public:
 };
 
 
-inline ostream& operator<<(ostream& out, PG::ObjectInfo& oi) 
-{
-  return out << "object[" << hex << oi.oid << dec 
-                        << " v " << oi.version 
-                        << " osd" << oi.osd
-                        << "]";
-}
-
 inline ostream& operator<<(ostream& out, PG::PGInfo& pgi) 
 {
   return out << "pgi(" << hex << pgi.pgid << dec 
@@ -299,6 +346,12 @@ inline ostream& operator<<(ostream& out, PG::PGInfo& pgi)
                         << ")";
 }
 
+// stream a PGLog by delegating to PGLog::print(); returns the stream for chaining.
+inline ostream& operator<<(ostream& out, PG::PGLog& log) 
+{
+  log.print(out);
+  return out;
+}
+
 inline ostream& operator<<(ostream& out, PG& pg)
 {
   out << "pg[" << pg.info 
index 23689822bc3c36cba299c52c6cbda4d74efc2512..2094eca7ae257b1c0aa096c719350c32aa7521a7 100644 (file)
@@ -82,15 +82,15 @@ void Filer::file_to_extents(inode_t inode,
        else
          x_len = left;
        
-       if (ex->offset + ex->len == x_offset) {
+       if (ex->start + ex->length == x_offset) {
          // add to extent
-         ex->len += x_len;
+         ex->length += x_len;
        } else {
          // new extent
-         assert(ex->len == 0);
-         assert(ex->offset == 0);
-         ex->offset = x_offset;
-         ex->len = x_len;
+         assert(ex->length == 0);
+         assert(ex->start == 0);
+         ex->start = x_offset;
+         ex->length = x_len;
        }
        ex->buffer_extents[cur-offset] = x_len;
                
index a87b8a36fc0e6fa22eecc68ecf2534470b8cb2bc..eded1d140b4635dd7bb7f4a6d8ccb55eed565cc9 100644 (file)
@@ -71,13 +71,7 @@ class Filer {
        Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
        file_to_extents(inode, len, offset, rd->extents);
 
-       // cacheless async?
-       if (oc == 0) 
-         return objecter->readx(rd, onfinish);
-
-       // use cache
-       oc->readx(rd, inode.ino, onfinish);
-       return 0;
+       return objecter->readx(rd, onfinish);
   }
 
   int write(inode_t& inode,
@@ -90,13 +84,7 @@ class Filer {
        Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
        file_to_extents(inode, len, offset, wr->extents);
 
-       // cacheles async?
-       if (oc == 0) 
-         return objecter->writex(wr, onack, oncommit);
-
-       // use cache
-       oc->writex(wr, inode.ino, onack, oncommit);
-       return 0;
+       return objecter->writex(wr, onack, oncommit);
   }
 
   int zero(inode_t& inode,
@@ -107,13 +95,28 @@ class Filer {
        Objecter::OSDZero *z = new Objecter::OSDZero;
        file_to_extents(inode, len, offset, z->extents);
 
-       // cacheless async?
-       if (oc == 0) 
-         return objecter->zerox(z, onack, oncommit);
+       return objecter->zerox(z, onack, oncommit);
+  }
+
+
+  /*** async+caching (non-blocking) file interface ***/
+  // Map the file range onto object extents and read it through the object cache (result is readx()'s).
+  int caching_read(inode_t& inode, size_t len, off_t offset, 
+                                  bufferlist *bl,
+                                  Context *onfinish) {
+       assert(oc);   // no cache attached: fail loudly, exactly as atomic_sync_read() does
+       Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
+       file_to_extents(inode, len, offset, rd->extents);
+       return oc->readx(rd, inode.ino, onfinish);   // 0: served from cache now; -1: readx is waiting on reads
+  }
 
-       // actually: mds should never do this, and clients don't do zero().
-       assert(0);
-       return 0;
+  // Map the file range onto object extents and buffer the write in the object cache.
+  int caching_write(inode_t& inode, size_t len, off_t offset, 
+                                       bufferlist& bl) {
+       assert(oc);   // no cache attached: fail loudly, exactly as atomic_sync_write() does
+       Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
+       file_to_extents(inode, len, offset, wr->extents);
+       return oc->writex(wr, inode.ino);
   }
 
 
@@ -122,27 +125,25 @@ class Filer {
   
   int atomic_sync_read(inode_t& inode,
                                           size_t len, off_t offset,
-                                          bufferlist *bl) {
+                                          bufferlist *bl,
+                                          Mutex &lock) {   // lock is handed straight to the cacher
        Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
        file_to_extents(inode, len, offset, rd->extents);
 
        assert(oc);
-       int r = oc->atomic_sync_readx(rd, inode.ino, 
-                                                                 0);  // block.
+       int r = oc->atomic_sync_readx(rd, inode.ino, lock);   // NOTE(review): atomic_sync_readx is still an assert(0) stub
        return r;
   }
 
   int atomic_sync_write(inode_t& inode,
                                                size_t len, off_t offset,
                                                bufferlist& bl,
-                                               Context *oncommit) {
+                                               Mutex &lock) {   // lock is handed straight to the cacher
        Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
        file_to_extents(inode, len, offset, wr->extents);
 
        assert(oc);
-       int r = oc->atomic_sync_writex(wr, inode.ino, 
-                                                                  0,  // block
-                                                                  oncommit);
+       int r = oc->atomic_sync_writex(wr, inode.ino, lock);   // NOTE(review): atomic_sync_writex is still an assert(0) stub
        return r;
   }
 
index 9c1785c88061c794f886ac22ace5565b338aeab3..e8ad74e150f08147fa87163f70e7f5292fc99a2a 100644 (file)
@@ -7,23 +7,23 @@
 
 /*** ObjectCacher::Object ***/
 
-BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
+ObjectCacher::BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
 {
   dout(20) << "split " << *bh << " at " << off << endl;
   
   // split off right
-  ObjectCacher::BufferHead *right = new ObjectCacher::BufferHead();
+  ObjectCacher::BufferHead *right = new BufferHead();
   right->set_version(bh->get_version());
   right->set_state(bh->get_state());
   
-  block_t newleftlen = off - bh->start();
+  off_t newleftlen = off - bh->start();
   right->set_start( off );
   right->set_length( bh->length() - newleftlen );
   
   // shorten left
-  stat_sub(bh);
+  oc->bh_stat_sub(bh);
   bh->set_length( newleftlen );
-  stat_add(bh);
+  oc->bh_stat_add(bh);
   
   // add right
   add_bh(right);
@@ -39,7 +39,7 @@ BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
   
   // move read waiters
   if (!bh->waitfor_read.empty()) {
-       map<block_t, list<Context*> >::iterator o, p = bh->waitfor_read.end();
+       map<off_t, list<Context*> >::iterator o, p = bh->waitfor_read.end();
        p--;
        while (p != bh->waitfor_read.begin()) {
          if (p->first < right->start()) break;   
@@ -56,28 +56,57 @@ BufferHead *ObjectCacher::Object::split(BufferHead *bh, off_t off)
   return right;
 }
 
+
+void ObjectCacher::Object::merge(BufferHead *left, BufferHead *right)
+{  // Fold `right` into `left`: left grows by right's length, right leaves `data` and is deleted.
+  assert(left->end() == right->start());             // the two must be adjacent
+  assert(left->get_state() == right->get_state());   // and in the same state
+
+  dout(10) << "merge " << *left << " + " << *right << endl;
+  left->set_length( left->length() + right->length());
+  // NOTE(review): split() brackets its set_length() with oc->bh_stat_sub/add; nothing adjusts stats here -- confirm.
+  // data: append right's bufferlist to left's
+  left->bl.claim_append(right->bl);
+  
+  // version: keep the larger of the two
+  left->set_version( MAX( left->get_version(), right->get_version() ) );
+
+  // waiters: move each of right's per-offset waiter lists to the front of left's list for that offset
+  for (map<off_t, list<Context*> >::iterator p = right->waitfor_read.begin();
+          p != right->waitfor_read.end();
+          p++) 
+       left->waitfor_read[p->first].splice( left->waitfor_read[p->first].begin(),
+                                                                                p->second );
+  
+  // hose right: drop its entry (keyed by start offset) from the object's map, then free it
+  data.erase(right->start());   // NOTE(review): invalidates any iterator a caller still holds on this entry
+  delete right;
+
+  dout(10) << "merge result " << *left << endl;
+}
+
 /*
- * map a range of blocks into buffer_heads.
+ * map a range of bytes into buffer_heads.
  * - create missing buffer_heads as necessary.
  */
 int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
-                                   map<block_t, BufferHead*>& hits,
-                                   map<block_t, BufferHead*>& missing,
-                                   map<block_t, BufferHead*>& rx)
+                                   map<off_t, BufferHead*>& hits,
+                                   map<off_t, BufferHead*>& missing,
+                                   map<off_t, BufferHead*>& rx)
 {
-  for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
-       ex_it != wr->extents.end();
+  for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
+       ex_it != rd->extents.end();
        ex_it++) {
     
     if (ex_it->oid != oid) continue;
     
-    dout(10) << "map_read " << ex_it->oid << " " << ex_it->offset << "~" << ex_it->len << endl;
+    dout(10) << "map_read " << ex_it->oid << " " << ex_it->start << "~" << ex_it->length << endl;
     
-    map<off_t, ObjectCacher::BufferHead*>::iterator p = data.lower_bound(start);
+    map<off_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
     // p->first >= start
     
-    size_t cur = ex_it->offset;
-    size_t left = ex_it->len;
+    off_t cur = ex_it->start;
+    off_t left = ex_it->length;
     
     if (p != data.begin() && 
         (p == data.end() || p->first > cur)) {
@@ -90,7 +119,7 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
       // at end?
       if (p == data.end()) {
         // rest is a miss.
-               BufferHead *n = new ObjectCacher::BufferHead();
+               BufferHead *n = new BufferHead();
                n->set_start( cur );
                n->set_length( left );
                add_bh(n);
@@ -99,8 +128,8 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
                cur += left;
                left -= left;
         assert(left == 0);
-        assert(cur == ex_it->offset + ex_it->len);
-        break;
+        assert(cur == ex_it->start + ex_it->length);
+        break;  // no more.
       }
       
       if (p->first <= cur) {
@@ -119,7 +148,7 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
         }
         else assert(0);
         
-        size_t lenfromcur = MIN(e->end() - cur, left);
+        off_t lenfromcur = MIN(e->end() - cur, left);
         cur += lenfromcur;
         left -= lenfromcur;
         p++;
@@ -127,10 +156,10 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
         
       } else if (p->first > cur) {
         // gap.. miss
-        size_t next = p->first;
-               BufferHead *n = new ObjectCacher::BufferHead();
+        off_t next = p->first;
+               BufferHead *n = new BufferHead();
                n->set_start( cur );
-               n->set_length( MIN(next - cur), left );
+               n->set_length( MIN(next - cur, left) );
                add_bh(n);
                missing[cur] = n;
                cur += MIN(left, n->length());
@@ -147,28 +176,28 @@ int ObjectCacher::Object::map_read(Objecter::OSDRead *rd,
 
 /*
  * map a range of extents on an object's buffer cache.
- *
+ * - combine any bh's we're writing into one
  * - break up bufferheads that don't fall completely within the range
- * - cancel rx ops we obsolete.
- *   - resubmit rx ops if we split bufferheads
- *
- * - leave potentially obsoleted tx ops alone (for now)
+ * - increase the bh version number (to be larger than any it subsumes)
  */
-int ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
+ObjectCacher::BufferHead *ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)   // returns the bh covering the write
 {
+  BufferHead *final = 0;
+  version_t   max_version = 0;
+  // final: the single bh built up to cover the write; max_version: highest version among the bhs we walk over.
   for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
        ex_it != wr->extents.end();
        ex_it++) {
-
+       
     if (ex_it->oid != oid) continue;
     
-    dout(10) << "map_write " << ex_it->oid << " " << ex_it->offset << "~" << ex_it->len << endl;
+    dout(10) << "map_write " << ex_it->oid << " " << ex_it->start << "~" << ex_it->length << endl;
     
-    map<off_t, ObjectCacher::BufferHead*>::iterator p = data.lower_bound(start);
+    map<off_t, BufferHead*>::iterator p = data.lower_bound(ex_it->start);
     // p->first >= start
     
-    size_t cur = ex_it->offset;
-    size_t left = ex_it->len;
+    off_t cur = ex_it->start;
+    off_t left = ex_it->length;
     
     if (p != data.begin() && 
         (p == data.end() || p->first > cur)) {
@@ -177,20 +206,18 @@ int ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
         p++;   // doesn't overlap.
     }    
     
-    ObjectCacher::BufferHead *prev = NULL;
     while (left > 0) {
-      size_t max = left;
-      
+      off_t max = left;
+
       // at end ?
       if (p == data.end()) {
-        if (prev == NULL) {
-          ObjectCacher::BufferHead *n = new ObjectCacher::BufferHead();
-          n->set_start( cur );
-          n->set_length( max );
-          add_bh(n);
-          //hits[cur] = n;          
+        if (final == NULL) {
+          final = new BufferHead();
+          final->set_start( cur );
+          final->set_length( max );
+          add_bh(final);
         } else {
-          prev->set_length( prev->length() + max );
+          final->set_length( final->length() + max );
         }
         left -= max;
         cur += max;
@@ -199,92 +226,60 @@ int ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
       
       dout(10) << "p is " << *p->second << endl;
 
+         // note highest version we see
+         if (max_version < p->second->get_version()) 
+               max_version = p->second->get_version();
+
       if (p->first <= cur) {
-        ObjectCacher::BufferHead *bh = p->second;
+        BufferHead *bh = p->second;
         dout(10) << "map_write bh " << *bh << " intersected" << endl;
         
         if (p->first < cur) {
+                 assert(final == 0);
           if (cur + max >= p->first + p->second->length()) {
             // we want right bit (one splice)
-            if (bh->is_rx() && bh_cancel_read(bh)) {
-              ObjectCacher::BufferHead *right = split(bh, cur);
-              bh_read(this, bh);          // reread left bit
-              bh = right;
-            } else if (bh->is_tx() && bh_cancel_write(bh)) {
-              ObjectCacher::BufferHead *right = split(bh, cur);
-              bh_write(this, bh);          // rewrite left bit
-              bh = right;
-            } else {
-              bh = split(bh, cur);   // just split it
-            }
-            prev = bh;  // maybe want to expand right buffer ...
+                       final = split(bh, cur);   // just split it, take right half.
             p++;
-            assert(p->second == bh);
+            assert(p->second == final);
           } else {
             // we want middle bit (two splices)
-            if (bh->is_rx() && bh_cancel_read(bh)) {
-              ObjectCacher::BufferHead *middle = split(bh, cur);
-              bh_read(this, bh);                       // reread left
-              p++;
-              assert(p->second == middle);
-              ObjectCacher::BufferHead *right = split(middle, cur + max);
-              bh_read(this(on, right);                    // reread right
-              bh = middle;
-            } else if (bh->is_tx() && bh_cancel_write(bh)) {
-              ObjectCacher::BufferHead *middle = split(bh, cur);
-              bh_write(this, bh);                       // redo left
-              p++;
-              assert(p->second == middle);
-              ObjectCacher::BufferHead *right = split(middle, cur + max);
-              bh_write(this, right);                    // redo right
-              bh = middle;
-            } else {
-              ObjectCacher::BufferHead *middle = split(bh, cur);
-              p++;
-              assert(p->second == middle);
-              split(middle, cur+max);
-              bh = middle;
-            }
-          }
+                       final = split(bh, cur);
+                       p++;
+                       assert(p->second == final);
+                       split(final, cur+max);
+                 }
         } else if (p->first == cur) {
           if (p->second->length() <= max) {
             // whole bufferhead, piece of cake.
           } else {
             // we want left bit (one splice)
-            if (bh->is_rx() && bh_cancel_read(bh)) {
-              ObjectCacher::BufferHead *right = split(bh, cur + max);
-              bh_read(this, right);                      // re-rx the right bit
-            } else if (bh->is_tx() && bh_cancel_write(bh)) {
-              ObjectCacher::BufferHead *right = split(bh, cur + max);
-              bh_write(this, right);                     // re-tx the right bit
-            } else {
-              split(bh, cur + max);        // just split
-            }
-          }
+                       split(bh, cur + max);        // just split
+                 }
+                 if (final) {
+                       merge(final,bh);                 // merge() erases bh's map entry and deletes bh...
+                       p = data.find(final->start());   // ...so re-seat p on the survivor before the p++ below
+                 } else
+                       final = bh;
         }
         
-        // try to cancel tx?
-        if (bh->is_tx()) bh_cancel_write(bh);
-        
-        // put in our map
-        //hits[cur] = bh;
-        
         // keep going.
-        size_t lenfromcur = bh->end() - cur;
+        off_t lenfromcur = final->end() - cur;
         cur += lenfromcur;
         left -= lenfromcur;
         p++;
         continue; 
       } else {
         // gap!
-        size_t next = p->first;
-        size_t glen = MIN(next - cur, max);
+        off_t next = p->first;
+        off_t glen = MIN(next - cur, max);
         dout(10) << "map_write gap " << cur << "~" << glen << endl;
-        ObjectCacher::BufferHead *n = new ObjectCacher::BufferHead();
-        n->set_start( cur );
-        n->set_length( glen );
-        add_bh(n);
-        //hits[cur] = n;
+               if (final) {
+                 final->set_length( final->length() + glen );
+               } else {
+                 final = new BufferHead();
+                 final->set_start( cur );
+                 final->set_length( glen );
+                 add_bh(final);
+               }
         
         cur += glen;
         left -= glen;
@@ -292,102 +287,277 @@ int ObjectCacher::Object::map_write(Objecter::OSDWrite *wr)
       }
     }
   }
-  return(0);
+  
+  // set version: one past the highest version seen on the way
+  assert(final);
+  final->set_version(max_version+1);
+  dout(10) << "map_write final is " << *final << endl;
+  return final;   // was `return 0`, which handed writex() a null bh
 }
 
 /*** ObjectCacher ***/
 
 /* private */
 
-bool ObjectCacher::bh_cancel_read(BufferHead *bh)
+void ObjectCacher::bh_read(Object *ob, BufferHead *bh)   // issue an async read of bh's byte range via the objecter
 {
-  assert(0);
-  return(0);
-}
+  dout(7) << "bh_read on " << *bh << endl;
 
-bool ObjectCacher::bh_cancel_write(BufferHead *bh)
-{
-  assert(0);
-  return(0);
+  // finisher: owns the landing bufferlist and later calls bh_read_finish() with this oid/start/length
+  C_ReadFinish *fin = new C_ReadFinish(this, ob->get_oid(), bh->start(), bh->length());
+  // NOTE(review): bh's state is not touched here, yet bh_read_finish() only fills bhs that are is_rx() -- confirm who marks it rx.
+  // read req: reply data lands in fin->bl
+  Objecter::OSDRead *rd = new Objecter::OSDRead(&fin->bl);
+  
+  // object extent: the whole bh range, mapped to offset 0 of the result buffer
+  ObjectExtent ex(ob->get_oid(), bh->start(), bh->length());
+  ex.buffer_extents[0] = bh->length();
+  rd->extents.push_back(ex);
+  
+  // go
+  objecter->readx(rd, fin);
 }
 
-void ObjectCacher::bh_read(Object *ob, BufferHead *bh)
+void ObjectCacher::bh_read_finish(object_t oid, off_t start, size_t length, bufferlist &bl)
 {
-  assert(0);
-  return(0);
+  dout(7) << "bh_read_finish " 
+                 << hex << oid << dec 
+                 << " " << start << "~" << length
+                 << endl;
+  // Completion of bh_read(): bl holds object bytes start~length; hand them to the rx bhs covering that range.
+  if (objects.count(oid) == 0) {
+       dout(7) << "bh_read_finish no object cache" << endl;
+       return;
+  }
+  Object *ob = objects[oid];
+
+  // apply to bh's!  opos is the object offset consumed so far.
+  off_t opos = start;
+  map<off_t, BufferHead*>::iterator p = ob->data.lower_bound(opos);
+  
+  while (p != ob->data.end() &&
+                opos < start+length) {
+       BufferHead *bh = p->second;
+
+       if (bh->start() > opos) {
+         dout(1) << "weirdness: gap when applying read results, " 
+                         << opos << "~" << bh->start() - opos 
+                         << endl;
+         opos = bh->start();   // hop over the hole, then look at this same bh again (no p++)
+         continue;
+       }
+
+       if (bh->is_rx()) {
+         assert(bh->start() == opos);   // we don't merge rx bh's... yet!
+         assert(bh->length() <= start+length-opos);   // <=: the bh may end exactly where the data ends
+         bh->bl.substr_of(bl,
+                                          opos-start,   // bl[0] corresponds to object offset `start`
+                                          bh->length());
+         bh->set_version(1);
+         mark_clean(bh);
+         dout(10) << "bh_read_finish read " << *bh << endl;
+       } else {
+         dout(10) << "bh_read_finish skipping non-rx " << *bh << endl;
+       }
+       opos = bh->end();   // always step past this bh, filled or skipped, or the loop never ends
+       p++;
+  }
+  // NOTE(review): contexts parked on bh->waitfor_read (e.g. C_RetryRead from readx) are not completed here.
 }
 
+
 void ObjectCacher::bh_write(Object *ob, BufferHead *bh)
 {
   assert(0);
-  return(0);
 }
 
 /* public */
 
 int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
 {
+  bool success = true;
+  list<BufferHead*> hit_ls;
+  map<size_t, bufferlist> stripe_map;  // final buffer offset -> substring
+
   for (list<ObjectExtent>::iterator ex_it = rd->extents.begin();
-       ex_it != wr->extents.end();
+       ex_it != rd->extents.end();
        ex_it++) {
+       dout(10) << "readx ex " << *ex_it << endl;
+
+       // get Object cache
+       Object *o;
     if (objects.count(ex_it->oid) == 0) {
-      ObjectCache::Object *o = new ObjectCache::Object(ex_it->oid, ino);
+         // create it.
+      Object *o = new Object(this, ex_it->oid, ino);
       objects[ex_it->oid] = o;
-      objects_by_ino[ino].add(o);
-    }
-    map<block_t, BufferHead*> hits, missing, rx;
+      objects_by_ino[ino].insert(o);
+    } else {
+         // had it.
+         o = objects[ex_it->oid];
+       }
+       
+       // map extent into bufferheads
+       map<off_t, BufferHead*> hits, missing, rx;
+
     o->map_read(rd, hits, missing, rx);
-    for (map<off_t, BufferHead*>::iterator bh_it = hits.begin();
-         bh_it != hits.end();
-         bh_it++) {
-      rd->bl.substr_of(bh_it->second->bl, bh_it->first, bh_it->second->length());
-    }
-    for (map<off_t, BufferHead*>::iterator bh_it = missing.begin();
-         bh_it != missing.end();
-         bh_it++) {
-      bh_read(o, bh_it->second);
-    }
-    for (map<off_t, BufferHead*>::iterator bh_it = rx.begin();
-         bh_it != rx.end();
-         bh_it++) {
-      //FIXME: need to wait here?
-      bh_read(o, bh_it->second);
-    }    
-  }  
-  return(0);
+       
+       if (!missing.empty() && !rx.empty()) {
+         // read missing
+         for (map<off_t, BufferHead*>::iterator bh_it = missing.begin();
+                  bh_it != missing.end();
+                  bh_it++) {
+               bh_read(o, bh_it->second);
+               if (success) {
+                 dout(10) << "readx missed, waiting on " << *bh_it->second 
+                                  << " off " << bh_it->first << endl;
+                 success = false;
+                 bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) );
+               }
+         }
+
+         // bump rx
+         for (map<off_t, BufferHead*>::iterator bh_it = rx.begin();
+                  bh_it != rx.end();
+                  bh_it++) {
+               touch_bh(bh_it->second);                // bump in lru, so we don't lose it.
+               if (success) {
+                 dout(10) << "readx missed, waiting on " << *bh_it->second 
+                                  << " off " << bh_it->first << endl;
+                 success = false;
+                 bh_it->second->waitfor_read[bh_it->first].push_back( new C_RetryRead(this, rd, ino, onfinish) );
+               }
+         }       
+       } else {
+         // create reverse map of buffer offset -> object for the eventual result.
+         // this is over a single ObjectExtent, so we know that
+         //  - the bh's are contiguous
+         //  - the buffer frags need not be (and almost certainly aren't)
+         map<off_t, BufferHead*>::iterator bh_it = hits.begin();
+         size_t bhoff = 0;
+         map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
+         size_t foff = 0;
+         off_t opos = ex_it->start;
+         while (1) {
+               BufferHead *bh = bh_it->second;
+               assert(opos == bh->start() + bhoff);
+               size_t len = MIN(f_it->second - foff,
+                                                bh->length() - bhoff);
+               stripe_map[f_it->first].substr_of(bh->bl,
+                                                                                 opos - bh->start(),
+                                                                                 len);
+               opos += len;
+               bhoff += len;
+               foff += len;
+               if (opos == bh->end()) {
+                 bh_it++;
+                 bhoff = 0;
+                 if (bh_it == hits.end()) break;
+               }
+               if (foff == f_it->second) {
+                 f_it++;
+                 foff = 0;
+                 if (f_it == ex_it->buffer_extents.end()) break;
+               }
+         }
+         assert(f_it == ex_it->buffer_extents.end());
+         assert(bh_it == hits.end());
+         assert(opos == ex_it->start + ex_it->length);
+       }
+  }
+  
+  // bump hits in lru
+  for (list<BufferHead*>::iterator bhit = hit_ls.begin();
+          bhit != hit_ls.end();
+          bhit++) 
+       touch_bh(*bhit);
+  
+  if (!success) 
+       return -1;
+
+  // no misses... success!  do the read.
+  assert(!hit_ls.empty());
+  dout(10) << "readx has all buffers" << endl;
+  
+  // ok, assemble into result buffer.
+  rd->bl->clear();
+  size_t pos = 0;
+  for (map<size_t,bufferlist>::iterator i = stripe_map.begin();
+          i != stripe_map.end();
+          i++) {
+       assert(pos == i->first);
+       pos += i->second.length();
+       rd->bl->claim_append(i->second);
+  }
+  
+  return 0;
 }
 
-int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit)
+
+int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)   // buffer wr's data in cache bhs and mark them dirty
 {
   for (list<ObjectExtent>::iterator ex_it = wr->extents.begin();
-       ex_it != wr->extents.end();
+          ex_it != wr->extents.end();
        ex_it++) {
+       // create object cache?
+       Object *o = 0;
     if (objects.count(ex_it->oid) == 0) {
-      ObjectCache::Object *o = new ObjectCache::Object(ex_it->oid, ino);
+      o = new Object(this, ex_it->oid, ino);
       objects[ex_it->oid] = o;
-      objects_by_ino[ino].add(o);
-    }
-    o->map_write(wr);
-    for (map<off_t, BufferHead*>::iterator bh_it = o->data.begin();
-         bh_it != o->data.end();
-         bh_it++) {
-      bh_it->second->bl.substr_of(wr->bl, ex_it->offset, ex_it->len);
-      bh_set_state(bh_it->second, BufferHead::STATE_DIRTY);
-    }
-    // FIXME: how to set up contexts for eventual writes?
+      objects_by_ino[ino].insert(o);
+    } else {
+         o = objects[ex_it->oid];
+       }
+
+       // map into a single bufferhead.
+    BufferHead *bh = o->map_write(wr);
+       assert(bh);   // map_write must hand back the bh covering this write (a null here is a map_write bug)
+       // adjust buffer pointers (ie "copy" data into my cache)
+       // this is over a single ObjectExtent, so we know that
+       //  - there is one contiguous bh
+       //  - the buffer frags need not be (and almost certainly aren't)
+       // note: i assume striping is monotonic... no jumps backwards, ever!
+       off_t opos = ex_it->start;
+       for (map<size_t,size_t>::iterator f_it = ex_it->buffer_extents.begin();
+                f_it != ex_it->buffer_extents.end();
+                f_it++) {
+         size_t bhoff = opos - bh->start();   // how far into bh we are (the reverse subtraction underflowed size_t)
+         assert(f_it->second <= bh->length() - bhoff);   // <=: the last fragment may fill the bh exactly
+         // f_it->first / f_it->second are the fragment's offset and length within wr->bl
+         bufferlist frag;
+         frag.substr_of(wr->bl, 
+                                        f_it->first, f_it->second);
+         bh->bl.claim_append(frag);
+         opos += f_it->second;
+       }
+
+       mark_dirty(bh);
   }
-  return(0);
+  return 0;
 }
  
+
+// blocking wait for write: stall until len more dirty bytes fit under g_conf.client_oc_max_dirty.
+void ObjectCacher::wait_for_write(size_t len, Mutex& lock)
+{
+  while (g_conf.client_oc_max_dirty < get_stat_dirty() + len) {
+       dout(10) << "wait_for_write waiting" << endl;
+       ++stat_waiter;             // counted as a waiter only while parked on stat_cond
+       stat_cond.Wait(lock);
+       --stat_waiter;
+       dout(10) << "wait_for_write woke up" << endl;
+  }
+}
+
   
 // blocking.  atomic+sync.
-int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
+int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock)
 {
   assert(0);
   return 0;
 }
 
-int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit)
+int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock)
 {
   assert(0);
   return 0;
index 6ac5bdbdec33886c64896baaaa901eb0f1781d67..a7a2117b6eee71d7a60a5609165ae69d45f001ec 100644 (file)
@@ -3,6 +3,7 @@
 
 #include "include/types.h"
 #include "include/lru.h"
+#include "include/Context.h"
 
 #include "common/Cond.h"
 
@@ -13,7 +14,7 @@ class Objecter::OSDRead;
 class Objecter::OSDWrite;
 
 class ObjectCacher {
- private:
+ public:
   // ******* BufferHead *********
   class BufferHead : public LRUObject {
   public:
@@ -91,6 +92,7 @@ class ObjectCacher {
   class Object {
   private:
        // ObjectCacher::Object fields
+       ObjectCacher *oc;
        object_t  oid;
        inodeno_t ino;
        
@@ -104,7 +106,8 @@ class ObjectCacher {
        int lock_state;
 
   public:
-       Object(object_t o, inodeno_t i) : 
+       Object(ObjectCacher *_oc, object_t o, inodeno_t i) : 
+         oc(_oc),
          oid(o), ino(i), 
          lock_state(LOCK_NONE) 
          {}
@@ -144,9 +147,17 @@ class ObjectCacher {
        bool is_empty() { return data.empty(); }
 
        // mid-level
+       int scan_versions(off_t start, off_t len,
+                                         version_t& low, version_t& high);
        BufferHead *split(BufferHead *bh, off_t off);
-       int map_read(Objecter::OSDRead *rd);
-       int map_write(Objecter::OSDWrite *wr);
+       void merge(BufferHead *left, BufferHead *right);
+
+       int map_read(Objecter::OSDRead *rd,
+                                map<off_t, BufferHead*>& hits,
+                                map<off_t, BufferHead*>& missing,
+                                map<off_t, BufferHead*>& rx);
+       BufferHead *map_write(Objecter::OSDWrite *wr);
+
   };
 
   // ******* ObjectCacher *********
@@ -194,6 +205,13 @@ class ObjectCacher {
   off_t get_stat_dirty() { return stat_dirty; }
   off_t get_stat_clean() { return stat_clean; }
 
+  void touch_bh(BufferHead *bh) {   // lru_touch bh in lru_dirty when it is dirty, in lru_rest otherwise
+       if (!bh->is_dirty())
+         lru_rest.lru_touch(bh);
+       else
+         lru_dirty.lru_touch(bh);
+  }
+
   // bh states
   void bh_set_state(BufferHead *bh, int s) {
        // move between lru lists?
@@ -227,12 +245,30 @@ class ObjectCacher {
        //bh->set_dirty_stamp(g_clock.now());
   };
 
+
+
   // io
-  bool bh_cancel_read(BufferHead *bh);
-  bool bh_cancel_write(BufferHead *bh);
   void bh_read(Object *ob, BufferHead *bh);
   void bh_write(Object *ob, BufferHead *bh);
 
+ public:
+  void bh_read_finish(object_t oid, off_t offset, size_t length, bufferlist &bl);
+  void bh_write_finish(object_t oid, off_t offset, size_t length, version_t v);
+
+  class C_ReadFinish : public Context {   // completion for bh_read(): forwards the fetched bytes to bh_read_finish()
+       ObjectCacher *oc;
+       object_t oid;
+       off_t start;
+       size_t length;
+  public:
+       bufferlist bl;   // bh_read() builds its OSDRead on &bl, so the reply data lands here
+       C_ReadFinish(ObjectCacher *c, object_t o, off_t s, size_t l) : oc(c), oid(o), start(s), length(l) {}
+       void finish(int r) {   // r is not looked at
+         oc->bh_read_finish(oid, start, length, bl);
+       }
+  };
+
+
 
  public:
   ObjectCacher(Objecter *o) : 
@@ -241,13 +277,29 @@ class ObjectCacher {
        stat_clean(0), stat_dirty(0), stat_rx(0), stat_tx(0), stat_missing(0)
        {}
 
-  // blocking.  async.
+
+  class C_RetryRead : public Context {   // parked on a bh's waitfor_read by readx(); re-runs the original request
+       ObjectCacher *oc;
+       Objecter::OSDRead *rd;
+       inodeno_t ino;
+       Context *onfinish;   // the original caller's completion
+  public:
+       C_RetryRead(ObjectCacher *_oc, Objecter::OSDRead *r, inodeno_t i, Context *c) : oc(_oc), rd(r), ino(i), onfinish(c) {}
+       void finish(int) {   // readx() == 0: served from cache, so complete the caller; otherwise readx re-parked a retry
+         if (oc->readx(rd, ino, onfinish) == 0 && onfinish) { onfinish->finish(0); delete onfinish; }
+       }
+  };
+
+  // non-blocking.  async.
   int readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish);
-  int writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit);
+  int writex(Objecter::OSDWrite *wr, inodeno_t ino);
+
+  // write blocking
+  void wait_for_write(size_t len, Mutex& lock);
   
   // blocking.  atomic+sync.
-  int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish);
-  int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Context *onack, Context *oncommit);
+  int atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex& lock);
+  int atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mutex& lock);
 
   void flush_set(inodeno_t ino, Context *onfinish=0);
   void flush_all(Context *onfinish=0);
@@ -257,4 +309,18 @@ class ObjectCacher {
 };
 
 
+inline ostream& operator<<(ostream& out, ObjectCacher::BufferHead &bh)   // debug form: bh[start~length v N <state flags>]
+{
+  out << "bh["
+         << bh.start() << "~" << bh.length()   // offset~length, matching every other "a~b" log line in the cacher
+         << " v " << bh.get_version();
+  if (bh.is_tx()) out << " tx";
+  if (bh.is_rx()) out << " rx";
+  if (bh.is_dirty()) out << " dirty";
+  if (bh.is_clean()) out << " clean";
+  if (bh.is_missing()) out << " missing";
+  out << "]";
+  return out;
+}
+
 #endif
index 725b3cd47b34884a045ca637a8fd3565b194a970..af8e7131f13d6541f58d4db7bb0d1550d898d0be 100644 (file)
@@ -113,10 +113,10 @@ int Objecter::readx(OSDRead *rd, Context *onfinish)
        MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
                                                   it->oid, it->pgid, osdmap->get_epoch(), 
                                                   OSD_OP_READ);
-       m->set_length(it->len);
-       m->set_offset(it->offset);
+       m->set_length(it->length);
+       m->set_offset(it->start);
        dout(15) << " read tid " << last_tid << " from osd" << osd 
-                        << " oid " << hex << it->oid << dec  << " off " << it->offset << " len " << it->len 
+                        << " oid " << hex << it->oid << dec  << " " << it->start << "~" << it->length
                         << " (" << it->buffer_extents.size() << " buffer fragments)" << endl;
        messenger->send_message(m, MSG_ADDR_OSD(osd), 0);
        
@@ -174,13 +174,13 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
                bufferlist *ox_buf = rd->read_data[eit->oid];
                unsigned ox_len = ox_buf->length();
                unsigned ox_off = 0;
-               assert(ox_len <= eit->len);           
+               assert(ox_len <= eit->length);           
 
                // for each buffer extent we're mapping into...
                for (map<size_t,size_t>::iterator bit = eit->buffer_extents.begin();
                         bit != eit->buffer_extents.end();
                         bit++) {
-                 dout(21) << " object " << hex << eit->oid << dec << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl;
+                 dout(21) << " object " << hex << eit->oid << dec << " extent " << eit->start << "~" << eit->length << " : ox offset " << ox_off << " -> buffer extent " << bit->first << "~" << bit->second << endl;
                  by_off[bit->first] = new bufferlist;
 
                  if (ox_off + bit->second <= ox_len) {
@@ -210,7 +210,7 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
                  }
                  ox_off += bit->second;
                }
-               assert(ox_off == eit->len);
+               assert(ox_off == eit->length);
          }
 
          // sort and string bits together
@@ -301,8 +301,8 @@ int Objecter::writex(OSDWrite *wr, Context *onack, Context *oncommit)
        MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
                                                   it->oid, it->pgid, osdmap->get_epoch(),
                                                   OSD_OP_WRITE);
-       m->set_length(it->len);
-       m->set_offset(it->offset);
+       m->set_length(it->length);
+       m->set_offset(it->start);
        
        // map buffer segments into this extent
        // (may be fragmented bc of striping)
@@ -314,10 +314,10 @@ int Objecter::writex(OSDWrite *wr, Context *onack, Context *oncommit)
          thisbit.substr_of(wr->bl, bit->first, bit->second);
          cur.claim_append(thisbit);
        }
-       assert(cur.length() == it->len);
+       assert(cur.length() == it->length);
        m->set_data(cur);//.claim(cur);
 
-       off += it->len;
+       off += it->length;
 
        // add to gather set
        if (onack)
@@ -334,7 +334,7 @@ int Objecter::writex(OSDWrite *wr, Context *onack, Context *oncommit)
 
        // send
        dout(10) << " write tid " << last_tid << " osd" << osd 
-                        << "  oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl;
+                        << "  oid " << hex << it->oid << dec << " " << it->start << "~" << it->length << endl;
        messenger->send_message(m, MSG_ADDR_OSD(osd), 0);
   }
 
@@ -431,10 +431,10 @@ int Objecter::zerox(OSDZero *z, Context *onack, Context *oncommit)
        MOSDOp *m = new MOSDOp(last_tid, messenger->get_myaddr(),
                                                   it->oid, it->pgid, osdmap->get_epoch(),
                                                   OSD_OP_ZERO);
-       m->set_length(it->len);
-       m->set_offset(it->offset);
+       m->set_length(it->length);
+       m->set_offset(it->start);
        
-       off += it->len;
+       off += it->length;
 
        // add to gather set
        if (onack)
@@ -451,7 +451,7 @@ int Objecter::zerox(OSDZero *z, Context *onack, Context *oncommit)
 
        // send
        dout(10) << " zero tid " << last_tid << " osd" << osd 
-                        << "  oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl;
+                        << "  oid " << hex << it->oid << dec << " " << it->start << "~" << it->length << endl;
        messenger->send_message(m, MSG_ADDR_OSD(osd), 0);
   }
 
index 9321c8ca924c4f33dc641595d9a681578e9ef68d..b524599da5b3e590c8e55e90705e1265b2b1fbd1 100644 (file)
@@ -17,18 +17,26 @@ class Message;
 
 
 // new types
-typedef __uint64_t tid_t;
+typedef __uint64_t tid_t;   // transaction id
 
 class ObjectExtent {
  public:
   object_t    oid;       // object id
-  pg_t        pgid; 
-  size_t      offset, len;   // extent within the object
+  pg_t        pgid;     
+  off_t       start;     // in object
+  size_t      length;    // in object
   map<size_t, size_t>  buffer_extents;  // off -> len.  extents in buffer being mapped (may be fragmented bc of striping!)
-
-  ObjectExtent(object_t o=0, off_t off=0, size_t l=0) : oid(o), offset(off), len(l) { }
+  
+  ObjectExtent(object_t o=0, off_t s=0, size_t l=0) : oid(o), start(s), length(l) { }
 };
 
+inline ostream& operator<<(ostream& out, ObjectExtent &ex)  // debug dump: extent(<oid> in <pgid> <start>~<length>); returns `out` for chaining
+{
+  return out << "extent(" 
+                        << hex << ex.oid << " in " << ex.pgid << dec   // ids are printed in hex; dec switches the stream back before the offsets
+                        << " " << ex.start << "~" << ex.length << ")";  // ")" closes the "extent(" opened above (it was previously left unbalanced)
+}
+
 class Objecter {
  public:  
   Messenger *messenger;