git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
stabilized new rados logging stuff
author sage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 4 Aug 2006 23:15:35 +0000 (23:15 +0000)
committer sage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Fri, 4 Aug 2006 23:15:35 +0000 (23:15 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@787 29311d96-e01e-0410-9327-a35deaab8ce9

17 files changed:
ceph/Makefile
ceph/TODO
ceph/client/Client.cc
ceph/config.cc
ceph/config.h
ceph/include/types.h
ceph/messages/MOSDOp.h
ceph/msg/NewMessenger.cc
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/PG.cc
ceph/osd/PG.h
ceph/osdc/Filer.h
ceph/osdc/ObjectCacher.cc
ceph/osdc/ObjectCacher.h
ceph/osdc/Objecter.cc
ceph/osdc/Objecter.h

index 762dc9ae97b0f5504eaf6e35f107600123365e1f..60089928dfda4ad5ea0640267c948bb01714557e 100644 (file)
--- a/ceph/Makefile
+++ b/ceph/Makefile
@@ -49,21 +49,24 @@ OSD_OBJS= \
        ebofs.o\
        osd/OSD.o
 
+OSDC_OBJS= \
+       client/Filer.o\
+       client/ObjectCacher.o\
+       client/Objecter.o
+
 COMMON_OBJS= \
        msg/Messenger.o\
        msg/Dispatcher.o\
        msg/HostMonitor.o\
-       client/Filer.o\
-       client/FileCache.o\
-       client/ObjectCacher.o\
-       client/Objecter.o\
        mds/MDCluster.o\
        common/Logger.o\
        common/Clock.o\
        common/Timer.o\
        config.o
 
+
 CLIENT_OBJS= \
+       client/FileCache.o\
        client/Client.o\
        client/SyntheticClient.o\
        client/Trace.o
@@ -87,7 +90,10 @@ obfs: depend obfstest
 cosd: cosd.cc osd.o msg/NewMessenger.o common.o
        ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
 
-cfuse: cfuse.cc client.o client/fuse.o msg/NewMessenger.o common.o
+cmds: cmds.cc mds.o osdc.o msg/NewMessenger.o common.o
+       ${CC} ${CFLAGS} ${MPILIBS} $^ -o $@
+
+cfuse: cfuse.cc client.o osdc.o client/fuse.o msg/NewMessenger.o common.o
        ${CC} ${CFLAGS} ${LIBS} -lfuse $^ -o $@
 
 
@@ -109,16 +115,16 @@ mpifuse: mpifuse.cc mds.o client.o client/fuse.o ${TCP_OBJS} common.o
 
 
 # synthetic workload
-fakesyn: fakesyn.cc mds.o client.o osd.o msg/FakeMessenger.o common.o
+fakesyn: fakesyn.cc mds.o client.o osd.o osdc.o msg/FakeMessenger.o common.o
        ${CC} -pg ${CFLAGS} ${LIBS} $^ -o $@
 
-mpisyn: mpisyn.cc mds.o client.o osd.o msg/MPIMessenger.cc common.o
+mpisyn: mpisyn.cc mds.o client.o osd.o osdc.o msg/MPIMessenger.cc common.o
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
-tcpsyn: tcpsyn.cc mds.o client.o osd.o ${TCP_OBJS} common.o
+tcpsyn: tcpsyn.cc mds.o client.o osd.o osdc.o ${TCP_OBJS} common.o
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
-newsyn: newsyn.cc mds.o client.o osd.o msg/NewMessenger.o common.o
+newsyn: newsyn.cc mds.o client.o osd.o osdc.o msg/NewMessenger.o common.o
        ${MPICC} ${MPICFLAGS} ${MPILIBS} $^ -o $@
 
 # + obfs
@@ -142,7 +148,7 @@ ebofs: mkfs.ebofs test.ebofs
 
 
 # libceph
-libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS}
+libceph.o: client/ldceph.o client/Client.o ${TCP_OBJS} ${COMMON_OBJS} ${SYN_OBJS} ${OSDC_OBJS}
        ld -i $^ -o $@
 
 bench/mdtest/mdtest.o: bench/mdtest/mdtest.c
@@ -173,12 +179,15 @@ common.o: ${COMMON_OBJS}
 ebofs.o: ${EBOFS_OBJS}
        ld -i -o $@ $^
 
-client.o: ${CLIENT_OBJS}
+client.o: ${CLIENT_OBJS} 
        ld -i -o $@ $^
 
 osd.o: ${OSD_OBJS}
        ld -i -o $@ $^
 
+osdc.o: ${OSDC_OBJS}
+       ld -i -o $@ $^
+
 osd_obfs.o: osd/OBFSStore.o osd/OSD.ccosd/PG.o osd/ObjectStore.o osd/FakeStore.o
        ${MPICC} -DUSE_OBFS ${MPICFLAGS} ${MPILIBS} $^ -o $@ ../uofs/uofs.a
 
index 9747f883145b538c1f86fec2fe33f5445a139821..330d962d8276c6a971917fc1806ae4784b68be6f 100644 (file)
--- a/ceph/TODO
+++ b/ceph/TODO
@@ -1,6 +1,6 @@
 
 
-FAST rados paper
+== FAST rados paper
 
 
 cluster map
@@ -31,6 +31,10 @@ ebofs
 
  
 
+
+
+== rados client nodes
+
 why do we want client op ordering?
 - simpler logic in objectcacher
 - can pipeline lock + write + unlock, etc.
@@ -69,7 +73,6 @@ Objecter:
 
 - accept acks from current|prior primary.
 - only accept commits from current primary.
-
 - need pg map
   - to detect primary changes,
   - pg crashes
@@ -77,26 +80,26 @@ Objecter:
 
 
 
+== todo
+
 
 
-- gzip in messenger?
 
-remaining hard problems
-- how to cope with file size changes and read/write sharing
-- how to choose osd monitor peer sets
-- mds failure recovery (of course)
 
+- gzip in messenger?
 
 osd/rados
 /- add caller to log
 /- make modify ops idempotent
 - rdlocks
+- test wrnoop
 - pg_bit changes
 - use pg->info.same_role_since wrt replication ops.
 - report crashed pgs?
 - recover period for crashed pgs
 
 osdmonitor
+- monitor pgs
 - watch osd utilization; adjust overload in cluster map
 
 objecter
@@ -104,12 +107,11 @@ objecter
 /- detect, cope with crashed PGs, wait for map updates, etc.
 /- filer read/write: flip offset,size args
 - replay ops (in order) after pg crash
-- sequential lock acquisition
 
 objectcacher
-- locking
-- atomic read/write ops
-- flusher
+- ocacher caps transitions vs locks
+- ocacher flushing, pin Objects* while locks held
+- test read locks
 
 
 reliability
@@ -126,6 +128,10 @@ bugs/stability
 general
 - timer needs cancel sets, schedulers need to cancel outstanding events on shutdown
 
+remaining hard problems
+- how to cope with file size changes and read/write sharing
+- mds failure recovery (of course)
+
 
 crush
 - more efficient failure when all/too many osds are down
index 64b3e4b698cfba90266bd9c882be24815239fdd0..0d22030847f783adae27d4f376b806c78cdacb8f 100644 (file)
--- a/ceph/client/Client.cc
+++ b/ceph/client/Client.cc
@@ -1770,15 +1770,11 @@ int Client::read(fh_t fh, char *buf, off_t size, off_t offset)
 
     r = filer->read(in->inode, offset, size, &blist, onfinish);
 
-       if (r == 0) {
-         // wait!
-         while (!done)
-               cond.Wait(client_lock);
-       } else {
-         // had it cached, apparently!
-         assert(r > 0);
-         delete onfinish;
-       }
+       assert(r >= 0);
+
+       // wait!
+       while (!done)
+         cond.Wait(client_lock);
   }
   
   // copy data into caller's char* buf
index 0464d26670536b6a09e5a7c65ee93685be7d7c41..d1c0bf18fcaa8866a53dbb09b75057c169741612 100644 (file)
--- a/ceph/config.cc
+++ b/ceph/config.cc
@@ -174,9 +174,9 @@ md_config_t g_conf = {
   osd_maxthreads: 2,    // 0 == no threading
   osd_max_opq: 10,
   osd_mkfs: false,
-  osd_fake_sync: false,
   osd_age: .8,
   osd_age_time: 0,
+  osd_heartbeat_interval: 10,
   
   // --- fakestore ---
   fakestore_fake_sync: 2,    // 2 seconds
index 620053eebb9eab69dcffd1d8fa85204cfe592826..8b7f533ee648bad0b996eeadc4df219b2023086a 100644 (file)
--- a/ceph/config.h
+++ b/ceph/config.h
@@ -147,10 +147,9 @@ struct md_config_t {
   int   osd_maxthreads;
   int   osd_max_opq;
   bool  osd_mkfs;
-  bool  osd_fake_sync;
   float   osd_age;
   int   osd_age_time;
-
+  int   osd_heartbeat_interval;
 
   int   fakestore_fake_sync;
   bool  fakestore_fsync;
index 1a4e60835566cb09c996bc877f529c6c3dc01dee..829242b7aa1a924c1e68543db726ffff34627549 100644 (file)
--- a/ceph/include/types.h
+++ b/ceph/include/types.h
@@ -263,7 +263,7 @@ inline bool operator>=(const eversion_t& l, const eversion_t& r) {
   return (l.epoch == r.epoch) ? (l.version >= r.version):(l.epoch >= r.epoch);
 }
 inline ostream& operator<<(ostream& out, const eversion_t e) {
-  return out << e.epoch << "." << e.version;
+  return out << e.epoch << "'" << e.version;
 }
 
 
index fd9ae0669d7bf382ea699e5499998fb5c442f167..8d73570c68289611995c37eaad8899361a9976dd 100644 (file)
--- a/ceph/messages/MOSDOp.h
+++ b/ceph/messages/MOSDOp.h
@@ -58,6 +58,8 @@
 #define OSD_OP_REP_WRUNLOCK (100+OSD_OP_WRUNLOCK)
 #define OSD_OP_REP_RDLOCK   (100+OSD_OP_RDLOCK)
 #define OSD_OP_REP_RDUNLOCK (100+OSD_OP_RDUNLOCK)
+#define OSD_OP_REP_UPLOCK   (100+OSD_OP_UPLOCK)
+#define OSD_OP_REP_DNLOCK   (100+OSD_OP_DNLOCK)
 
 #define OSD_OP_REP_PULL     30   // whole object read
 //#define OSD_OP_REP_PUSH     31   // whole object write
@@ -103,6 +105,10 @@ public:
        case OSD_OP_TRUNCATE: return "truncate"; 
        case OSD_OP_WRLOCK: return "wrlock"; 
        case OSD_OP_WRUNLOCK: return "wrunlock"; 
+       case OSD_OP_RDLOCK: return "rdlock"; 
+       case OSD_OP_RDUNLOCK: return "rdunlock"; 
+       case OSD_OP_UPLOCK: return "uplock"; 
+       case OSD_OP_DNLOCK: return "dnlock"; 
        case OSD_OP_WRNOOP: return "wrnoop"; 
        default: assert(0);
        }
index 6a6c11f958e574b26afccdad9aa381d4a6c2100d..9bca8c6ef2c423e10fa3d0f5a3f8a3ed39baf030 100644 (file)
--- a/ceph/msg/NewMessenger.cc
+++ b/ceph/msg/NewMessenger.cc
@@ -401,7 +401,7 @@ void *Rank::Receiver::entry()
                                  << " inst " << m->get_source_inst() 
                                  << " > " << rank.entity_map[m->get_source()] 
                                  << ", WATCH OUT " << *m << endl;
-                 //rank.entity_map[m->get_source()] = m->get_source_inst();
+                 rank.entity_map[m->get_source()] = m->get_source_inst();
                }
 
                if (m->get_dest().type() == MSG_ADDR_RANK_BASE) {
@@ -1512,9 +1512,17 @@ void Rank::mark_up(msg_addr_t a, entity_inst_t& i)
 
        assert(i.rank != my_rank);     // hrm?
        
-       assert(entity_map.count(a) == 0);
-       entity_map[a] = i;
-       connect_rank(i);
+       if (entity_map.count(a) == 0 ||
+               entity_map[a] < i) {
+         entity_map[a] = i;
+         connect_rank(i);
+       } else if (entity_map[a] == i) {
+         dout(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl;
+         derr(10) << "mark_up " << a << " inst " << i << " ... knew it" << endl;
+       } else {
+         dout(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl;
+         derr(-10) << "mark_up " << a << " inst " << i << " < " << entity_map[a] << endl;
+       }
 
        //if (waiting_for_lookup.count(a))
        //lookup(a);
index 55ebd25e61a5bac3344473d22d4ddc1d0ec96405..dc0f13c041017603db9183bdd1e70628f72b1426 100644 (file)
--- a/ceph/osd/OSD.cc
+++ b/ceph/osd/OSD.cc
 #include "msg/Messenger.h"
 #include "msg/Message.h"
 
-#include "msg/HostMonitor.h"
+//#include "msg/HostMonitor.h"
 
 #include "messages/MGenericMessage.h"
 #include "messages/MPing.h"
 #include "messages/MPingAck.h"
+#include "messages/MOSDPing.h"
 #include "messages/MOSDOp.h"
 #include "messages/MOSDOpReply.h"
 #include "messages/MOSDBoot.h"
@@ -118,6 +119,9 @@ OSD::OSD(int id, Messenger *m, char *dev)
   if (g_conf.osd_remount_at) 
        g_timer.add_event_after(g_conf.osd_remount_at, new C_Remount(this));
 
+  next_heartbeat = new C_Heartbeat(this);
+  g_timer.add_event_after(g_conf.osd_heartbeat_interval, next_heartbeat);
+                                                                                  
 
   // init object store
   // try in this order:
@@ -159,7 +163,7 @@ OSD::~OSD()
 {
   if (threadpool) { delete threadpool; threadpool = 0; }
   if (osdmap) { delete osdmap; osdmap = 0; }
-  if (monitor) { delete monitor; monitor = 0; }
+  //if (monitor) { delete monitor; monitor = 0; }
   if (messenger) { delete messenger; messenger = 0; }
   if (logger) { delete logger; logger = 0; }
   if (store) { delete store; store = 0; }
@@ -206,7 +210,8 @@ int OSD::init()
        }
 
        // monitor
-       char s[80];
+       /*
+    char s[80];
        sprintf(s, "osd%d", whoami);
        string st = s;
        monitor = new HostMonitor(messenger, st);
@@ -223,6 +228,7 @@ int OSD::init()
        
        monitor->get_notify().insert(MSG_ADDR_MON(0));
        // </hack>
+       */
        
        // log
        char name[80];
@@ -270,6 +276,8 @@ int OSD::shutdown()
 {
   dout(1) << "shutdown, timer has " << g_timer.num_event << endl;
 
+  if (next_heartbeat) g_timer.cancel_event(next_heartbeat);
+
   // finish ops
   wait_for_no_ops();
 
@@ -286,7 +294,7 @@ int OSD::shutdown()
   pg_map.clear();
 
   // shut everything else down
-  monitor->shutdown();
+  //monitor->shutdown();
   messenger->shutdown();
 
   osd_lock.Unlock();
@@ -418,28 +426,108 @@ void OSD::_remove_pg(pg_t pgid)
 }
 
 
+// -------------------------------------
+
+void OSD::heartbeat()
+{
+  osd_lock.Lock();
+
+  utime_t now = g_clock.now();
+  utime_t since = now;
+  since.sec_ref() -= g_conf.osd_heartbeat_interval;
+
+  dout(15) << "heartbeat " << now << endl;
+
+  // send pings
+  set<int> pingset;
+  for (hash_map<pg_t, PG*>::iterator i = pg_map.begin();
+          i != pg_map.end();
+          i++) {
+       PG *pg = i->second;
+
+       // we want to ping the primary.
+       if (pg->get_role() <= 0) continue;   
+       if (pg->acting.size() < 1) continue; 
+
+       if (pg->last_heartbeat < since) {
+         pg->last_heartbeat = now;
+         pingset.insert(pg->acting[0]);
+       }
+  }
+  for (set<int>::iterator i = pingset.begin();
+          i != pingset.end();
+          i++)
+       messenger->send_message(new MOSDPing(osdmap->get_epoch()), 
+                                                       MSG_ADDR_OSD(*i));
+  
+
+  // schedule next!
+  next_heartbeat = new C_Heartbeat(this);
+  g_timer.add_event_after(g_conf.osd_heartbeat_interval, next_heartbeat);
+
+  osd_lock.Unlock();  
+}
+
 
 
 // --------------------------------------
 // dispatch
 
+void OSD::share_map(msg_addr_t who, epoch_t epoch)
+{
+  // does client have old map?
+  if (who.is_client()) {
+       if (epoch < osdmap->get_epoch()) {
+         dout(10) << who << " has old map " << epoch << " < " << osdmap->get_epoch() << endl;
+         send_incremental_map(epoch, who, true);
+       }
+  }
+
+  // does peer have old map?
+  if (who.is_osd()) {
+       // remember
+       if (peer_map_epoch[who] < epoch)
+         peer_map_epoch[who] = epoch;
+       
+       // older?
+       if (peer_map_epoch[who] < osdmap->get_epoch()) {
+         dout(10) << who << " has old map " << epoch << " < " << osdmap->get_epoch() << endl;
+         send_incremental_map(epoch, who, true);
+         peer_map_epoch[who] = osdmap->get_epoch();  // so we don't send it again.
+       }
+  }
+}
+
 void OSD::dispatch(Message *m) 
 {
   // check clock regularly
-  utime_t now = g_clock.now();
+  //utime_t now = g_clock.now();
   //dout(-20) << now << endl;
 
+  // -- don't need lock --
+  switch (m->get_type()) {
+  case MSG_PING:
+       dout(10) << "ping from " << m->get_source() << endl;
+       delete m;
+       return;
+
+  }
+
+
+  // lock!
   osd_lock.Lock();
 
   switch (m->get_type()) {
 
        // -- don't need OSDMap --
 
+       /*
        // host monitor
   case MSG_PING_ACK:
   case MSG_FAILURE_ACK:
        monitor->proc_message(m);
        break;
+       */
 
        // map and replication
   case MSG_OSD_MAP:
@@ -453,11 +541,6 @@ void OSD::dispatch(Message *m)
        break;
        
        
-  case MSG_PING:
-       // take note.
-       monitor->host_is_alive(m->get_source());
-       handle_ping((MPing*)m);
-       break;
 
        // -- need OSDMap --
 
@@ -481,8 +564,18 @@ void OSD::dispatch(Message *m)
                break;
          }
 
+
+         
+
+
          // need OSDMap
          switch (m->get_type()) {
+
+         case MSG_OSD_PING:
+               // take note.
+               dout(20) << "osdping from " << m->get_source() << endl;
+               share_map(m->get_source(), ((MOSDPing*)m)->map_epoch);
+               break;
                
          case MSG_OSD_PG_NOTIFY:
                handle_pg_notify((MOSDPGNotify*)m);
@@ -498,13 +591,11 @@ void OSD::dispatch(Message *m)
                break;
 
          case MSG_OSD_OP:
-               monitor->host_is_alive(m->get_source());
                handle_op((MOSDOp*)m);
                break;
                
                // for replication etc.
          case MSG_OSD_OPREPLY:
-               monitor->host_is_alive(m->get_source());
                handle_op_reply((MOSDOpReply*)m);
                break;
                
@@ -557,6 +648,7 @@ void OSD::handle_op_reply(MOSDOpReply *m)
        op_rep_pull_reply(m);
        break;
 
+  case OSD_OP_REP_WRNOOP:
   case OSD_OP_REP_WRITE:
   case OSD_OP_REP_TRUNCATE:
   case OSD_OP_REP_DELETE:
@@ -564,6 +656,8 @@ void OSD::handle_op_reply(MOSDOpReply *m)
   case OSD_OP_REP_WRUNLOCK:
   case OSD_OP_REP_RDLOCK:
   case OSD_OP_REP_RDUNLOCK:
+  case OSD_OP_REP_UPLOCK:
+  case OSD_OP_REP_DNLOCK:
        {
          const pg_t pgid = m->get_pg();
          if (pg_map.count(pgid)) {
@@ -649,6 +743,7 @@ void OSD::handle_rep_op_ack(PG *pg, __uint64_t tid, int result, bool commit,
 
 
 
+/*
 void OSD::handle_ping(MPing *m)
 {
   dout(7) << "got ping, replying" << endl;
@@ -656,6 +751,7 @@ void OSD::handle_ping(MPing *m)
                                                  m->get_source(), m->get_source_port(), 0);
   delete m;
 }
+*/
 
 
 
@@ -807,7 +903,7 @@ void OSD::handle_osd_map(MOSDMap *m)
   // all the way?
   if (advanced && cur == superblock.newest_map) {
        // yay!
-       activate_map();
+       activate_map(t);
        
        // process waiters
        take_waiters(waiting_for_osdmap);
@@ -864,7 +960,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
                  pg->info.last_epoch_started = 
                  pg->info.same_primary_since = 
                  pg->info.same_role_since = osdmap->get_epoch();
-               pg->state_set(PG::STATE_ACTIVE);  
+               pg->activate(t);
                
                dout(7) << "created " << *pg << endl;
          }
@@ -881,8 +977,8 @@ void OSD::advance_map(ObjectStore::Transaction& t)
                pg->info.last_epoch_started = 
                pg->info.same_primary_since = 
                pg->info.same_role_since = osdmap->get_epoch();
-         pg->state_set(PG::STATE_ACTIVE);  
-         
+         pg->activate(t);
+               
          dout(7) << "created " << *pg << endl;
        }
 
@@ -1040,7 +1136,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
   }
 }
 
-void OSD::activate_map()
+void OSD::activate_map(ObjectStore::Transaction& t)
 {
   dout(7) << "activate_map version " << osdmap->get_epoch() << endl;
 
@@ -1061,7 +1157,7 @@ void OSD::activate_map()
        else if (pg->get_role() == 0 && !pg->is_active()) {
          // i am (inactive) primary
          pg->build_prior();
-         pg->peer(query_map);
+         pg->peer(t, query_map);
        }
        else if (pg->is_stray()) {
          // i am residual|replica
@@ -1141,29 +1237,20 @@ bool OSD::get_inc_map(epoch_t e, OSDMap::Incremental &inc)
 
 bool OSD::require_current_map(Message *m, epoch_t ep) 
 {
-  int from = MSG_ADDR_NUM(m->get_source());
-
   // older map?
   if (ep < osdmap->get_epoch()) {
-       dout(7) << "  from old map epoch " << ep << " < " << osdmap->get_epoch() << endl;
+       dout(7) << "require_current_map epoch " << ep << " < " << osdmap->get_epoch() << endl;
        delete m;   // discard and ignore.
        return false;
   }
 
   // newer map?
   if (ep > osdmap->get_epoch()) {
-       dout(7) << "  from newer map epoch " << ep << " > " << osdmap->get_epoch() << endl;
+       dout(7) << "require_current_map epoch " << ep << " > " << osdmap->get_epoch() << endl;
        wait_for_new_map(m);
        return false;
   }
 
-  // down?
-  if (osdmap->is_down(from)) {
-       dout(7) << "  from down OSD osd" << from << ", dropping" << endl;
-       // FIXME
-       return false;
-  }
-
   assert(ep == osdmap->get_epoch());
   return true;
 }
@@ -1175,6 +1262,8 @@ bool OSD::require_current_map(Message *m, epoch_t ep)
  */
 bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch)
 {
+  dout(10) << "require_same_or_newer_map " << epoch << " (i am " << osdmap->get_epoch() << ")" << endl;
+
   // newer map?
   if (epoch > osdmap->get_epoch()) {
        dout(7) << "  from newer map epoch " << epoch << " > " << osdmap->get_epoch() << endl;
@@ -1188,21 +1277,6 @@ bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch)
        return false;
   }
 
-  // down osd?
-  if (m->get_source().is_osd() &&
-         osdmap->is_down(m->get_source().num())) {
-       if (epoch > osdmap->get_epoch()) {
-         dout(7) << "msg from down " << m->get_source()
-                         << ", waiting for new map" << endl;
-         wait_for_new_map(m);
-       } else {
-         dout(7) << "msg from down " << m->get_source()
-                         << ", dropping" << endl;
-         delete m;
-       }
-       return false;
-  }
-
   return true;
 }
 
@@ -1301,7 +1375,7 @@ void OSD::load_pgs()
          if (pg->acting[i] == whoami) role = i>0 ? 1:0;
        pg->set_role(role);
 
-       dout(10) << "load_pgs loaded " << *pg << endl;
+       dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl;
   }
 }
  
@@ -1494,7 +1568,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
                pg->adjust_prior();
          
          // peer
-         pg->peer(query_map);
+         pg->peer(t, query_map);
        }
 
        _unlock_pg(pgid);
@@ -1550,12 +1624,10 @@ void OSD::handle_pg_log(MOSDPGLog *m)
 
        // merge into our own log
        pg->merge_log(m->log, m->missing, from);
-
-       pg->clean_up_local();
        
        // peer
        map< int, map<pg_t,PG::Query> > query_map;
-       pg->peer(query_map);
+       pg->peer(t, query_map);
        do_queries(query_map);
 
   } else {
@@ -1566,22 +1638,14 @@ void OSD::handle_pg_log(MOSDPGLog *m)
        pg->merge_log(m->log, m->missing, from);
        assert(pg->missing.num_lost() == 0);
 
-       // clean up any stray objects
-       pg->clean_up_local();
-
        // ok active!
-       pg->info.last_epoch_started = osdmap->get_epoch();
        pg->info.same_primary_since = m->info.same_primary_since;
-       pg->state_set(PG::STATE_ACTIVE);
+       pg->activate(t);
 
        // take any waiters
        take_waiters(pg->waiting_for_active);
-
-       // initiate any recovery?
-       pg->start_recovery();
   }
 
-  pg->write_log(t);
   unsigned tr = store->apply_transaction(t);
   assert(tr == 0);
 
@@ -1732,7 +1796,7 @@ void OSD::handle_pg_remove(MOSDPGRemove *m)
        
        _remove_pg(pgid);
 
-       // unlock.  there shouldn't be any waiters, since we're a stray, and pg is presumably clean.
+       // unlock.  there shouldn't be any waiters, since we're a stray, and pg is presumably clean0.
        assert(pg_lock_waiters.count(pgid) == 0);
        _unlock_pg(pgid);
   }
@@ -1760,10 +1824,9 @@ void OSD::pull(PG *pg, object_t oid, eversion_t v)
   assert(pg->missing.loc.count(oid));
   int osd = pg->missing.loc[oid];
   
-  dout(7) << "pull " << hex << oid << dec 
+  dout(7) << *pg << " pull " << hex << oid << dec 
                  << " v " << v 
                  << " from osd" << osd
-                 << " in " << *pg
                  << endl;
 
   // send op
@@ -1878,8 +1941,6 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
   t.write(oid, 0, op->get_length(), op->get_data());
   t.setattrs(oid, op->get_attrset());
   t.collection_add(pgid, oid);
-  unsigned r = store->apply_transaction(t);
-  assert(r == 0);
 
   // close out pull op.
   pg->objects_pulling.erase(oid);
@@ -1890,37 +1951,23 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
        take_waiters(pg->waiting_for_missing_object[oid]);
 
   // raise last_complete?
+  assert(pg->log.complete_to != pg->log.log.end());
   while (pg->log.complete_to != pg->log.log.end()) {
        if (pg->missing.missing.count(pg->log.complete_to->oid)) break;
-       pg->info.last_complete = pg->log.complete_to->version;
-       pg++;
+       if (pg->info.last_complete < pg->log.complete_to->version)
+         pg->info.last_complete = pg->log.complete_to->version;
+       pg->log.complete_to++;
   }
   dout(10) << *pg << " last_complete now " << pg->info.last_complete << endl;
   
-  if (pg->missing.num_missing() == 0) {
-       assert(pg->info.last_complete == pg->info.last_update);
-       
-       if (pg->is_primary()) {
-         // i am primary
-         pg->clean_set.insert(whoami);
-         if (pg->is_all_clean()) {
-               pg->state_set(PG::STATE_CLEAN);
-               pg->clean_replicas();
-         }
-       } else {
-         // tell primary
-         dout(7) << *pg << " recovery complete, telling primary" << endl;
-         list<PG::Info> ls;
-         ls.push_back(pg->info);
-         messenger->send_message(new MOSDPGNotify(osdmap->get_epoch(),
-                                                                                          ls),
-                                                         MSG_ADDR_OSD(pg->get_primary()));
-       }
-  } else {
-       // continue
-       pg->do_recovery();
-  }
+  // continue
+  pg->do_recovery();
+
+  // apply to disk!
+  t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
+  unsigned r = store->apply_transaction(t);
+  assert(r == 0);
+
   _unlock_pg(pgid);
 
   delete op;
@@ -2063,6 +2110,8 @@ void OSD::op_rep_modify(MOSDOp *op, PG *pg)
        MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap->get_epoch(), false);
        messenger->send_message(ack, op->get_asker());
        oncommit->ack(); 
+
+       pg->last_heartbeat = g_clock.now();
   }
 }
 
@@ -2079,27 +2128,7 @@ void OSD::handle_op(MOSDOp *op)
   // require same or newer map
   if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
 
-  // does client have old map?
-  if (op->get_source().is_client()) {
-       if (op->get_map_epoch() < osdmap->get_epoch()) {
-         dout(10) << "handle_op client has old map " << op->get_map_epoch() << " < " << osdmap->get_epoch() << endl;
-         send_incremental_map(op->get_map_epoch(), op->get_source(), false);
-       }
-  }
-
-  // does peer have old map?
-  if (op->get_source().is_osd()) {
-       // remember
-       if (peer_map_epoch[op->get_source()] < op->get_map_epoch())
-         peer_map_epoch[op->get_source()] = op->get_map_epoch();
-         
-       // older?
-       if (peer_map_epoch[op->get_source()] < osdmap->get_epoch()) {
-         dout(10) << "handle_op osd has old map " << op->get_map_epoch() << " < " << osdmap->get_epoch() << endl;
-         send_incremental_map(op->get_map_epoch(), op->get_source(), true);
-         peer_map_epoch[op->get_source()] = osdmap->get_epoch();  // so we don't send it again.
-       }
-  }
+  share_map(op->get_source(), op->get_map_epoch());
 
   // crashed?
   if (acting_primary < 0) {
@@ -2293,6 +2322,7 @@ void OSD::do_op(MOSDOp *op, PG *pg)
          break;
 
          // replicated ops
+       case OSD_OP_REP_WRNOOP:
        case OSD_OP_REP_WRITE:
        case OSD_OP_REP_TRUNCATE:
        case OSD_OP_REP_DELETE:
@@ -2300,6 +2330,8 @@ void OSD::do_op(MOSDOp *op, PG *pg)
        case OSD_OP_REP_WRUNLOCK:
        case OSD_OP_REP_RDLOCK:
        case OSD_OP_REP_RDUNLOCK:
+       case OSD_OP_REP_UPLOCK:
+       case OSD_OP_REP_DNLOCK:
          op_rep_modify(op, pg);
          break;
 
@@ -2315,6 +2347,7 @@ void OSD::do_op(MOSDOp *op, PG *pg)
        case OSD_OP_STAT:
          op_stat(op, pg);
          break;
+       case OSD_OP_WRNOOP:
        case OSD_OP_WRITE:
        case OSD_OP_ZERO:
        case OSD_OP_DELETE:
@@ -2323,6 +2356,8 @@ void OSD::do_op(MOSDOp *op, PG *pg)
        case OSD_OP_WRUNLOCK:
        case OSD_OP_RDLOCK:
        case OSD_OP_RDUNLOCK:
+       case OSD_OP_UPLOCK:
+       case OSD_OP_DNLOCK:
          op_modify(op, pg);
          break;
        default:
@@ -2570,20 +2605,18 @@ void OSD::put_repop(OSDReplicaOp *repop)
   if (repop->can_delete()) {
        // adjust peers_complete_thru
        if (!repop->pg_complete_thru.empty()) {
-         map<int,eversion_t>::iterator p = repop->pg_complete_thru.begin();
-         eversion_t min = p->second;
-         p++;
-         while (p != repop->pg_complete_thru.end()) {
-               if (p->second < min) min = p->second;
-               p++;
-         }
-         
          pg_t pgid = repop->op->get_pg();
          osd_lock.Lock();
          PG *pg = get_pg(pgid);
          if (pg) {
+               eversion_t min = pg->info.last_complete;  // hrm....
+               for (unsigned i=1; i<pg->acting.size(); i++) {
+                 if (repop->pg_complete_thru[i] < min)      // note: if we haven't heard, it'll be zero, which is what we want.
+                       min = repop->pg_complete_thru[i];
+               }
+
                if (min > pg->peers_complete_thru) {
-                 //dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
+                 dout(10) << *pg << "put_repop peers_complete_thru " << pg->peers_complete_thru << " -> " << min << endl;
                  pg->peers_complete_thru = min;
                }
                //_unlock_pg(pgid);
index 5796bb200e6d2586d200f7a20c01af738962945a..dde3a6be60f516eef745e59ce1c3a6e3bfd974dd 100644 (file)
--- a/ceph/osd/OSD.h
+++ b/ceph/osd/OSD.h
@@ -105,8 +105,17 @@ public:
   char dev_path[100];
   class ObjectStore *store;
 
-  // failure monitoring
-  class HostMonitor *monitor;
+  // heartbeat
+  void heartbeat();
+
+  class C_Heartbeat : public Context {
+       OSD *osd;
+  public:
+       C_Heartbeat(OSD *o) : osd(o) {}
+       void finish(int r) {
+         osd->heartbeat();
+       }
+  } *next_heartbeat;
 
   // global lock
   Mutex osd_lock;                          
@@ -161,15 +170,15 @@ public:
   // -- osd map --
   class OSDMap  *osdmap;
   list<class Message*> waiting_for_osdmap;
-  //map<version_t, OSDMap*> osdmaps;
 
   hash_map<msg_addr_t, epoch_t>  peer_map_epoch;
-  
+  void share_map(msg_addr_t who, epoch_t epoch);
+
   void wait_for_new_map(Message *m);
   void handle_osd_map(class MOSDMap *m);
   
   void advance_map(ObjectStore::Transaction& t);
-  void activate_map();
+  void activate_map(ObjectStore::Transaction& t);
 
   bool get_map_bl(epoch_t e, bufferlist& bl);
   bool get_map(epoch_t e, OSDMap &m);
@@ -238,7 +247,7 @@ public:
   // messages
   virtual void dispatch(Message *m);
 
-  void handle_ping(class MPing *m);
+  //void handle_ping(class MPing *m);
   void handle_op(class MOSDOp *m);
 
   void op_read(class MOSDOp *m, PG *pg);
index 83b00a43ac0aa0c3c39f3d3b5fe7b17ba49e994d..ac431ff83066124a2311d7b0cfeeb68c5ddb7ea9 100644 (file)
--- a/ceph/osd/PG.cc
+++ b/ceph/osd/PG.cc
@@ -18,6 +18,7 @@
 #include "OSD.h"
 
 
+#include "messages/MOSDPGNotify.h"
 #include "messages/MOSDPGLog.h"
 #include "messages/MOSDPGRemove.h"
 
@@ -38,8 +39,8 @@ void PG::Log::copy_after(const Log &other, eversion_t v)
           i++) {
        if (i->version <= v) break;
        log.push_front(*i);
-       bottom = i->version;
   }
+  bottom = v;
 }
 
 
@@ -51,8 +52,8 @@ void PG::IndexedLog::trim(ObjectStore::Transaction& t, eversion_t s)
   while (!log.empty()) {
        Entry &e = *log.begin();
        
-       assert(requested_to != log.begin());
        assert(complete_to != log.begin());
+       assert(requested_to != log.begin());
 
        // remove from index,
        unindex(e);
@@ -72,6 +73,11 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd)
   dout(10) << "merge_log " << olog << " from osd" << fromosd
                   << " into " << log << endl;
 
+  cout << "log" << endl;
+  log.print(cout);
+  cout << "olog" << endl;
+  olog.print(cout);
+
   if (log.empty() ||
          (olog.bottom > log.top && olog.backlog)) { // e.g. log=(0,20] olog=(40,50]+backlog) 
        // i'm missing everything after old log.top.
@@ -98,13 +104,16 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd)
        
        // take the whole thing.
        log.log.swap(olog.log);
-       log.top = olog.top;
-       log.bottom = olog.bottom;
-       log.backlog = olog.backlog;
+
+       info.last_update = log.top = olog.top;
+       info.log_bottom = log.bottom = olog.bottom;
+       info.log_backlog = log.backlog = olog.backlog;
        
        log.index();
 
        dout(10) << "merge_log  result " << log << " " << missing << endl;
+       log.print(cout);
+       dout(10) << "missing " << hex << missing.missing << dec << endl;
        return;
   }
 
@@ -162,8 +171,9 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd)
        list<Log::Entry>::iterator to = olog.log.end();
        list<Log::Entry>::iterator from = olog.log.end();
        while (1) {
+         if (from == olog.log.begin()) break;
          from--;
-         if (from->version > log.top) {
+         if (from->version < log.top) {
                from++;
                break;
          }
@@ -185,12 +195,14 @@ void PG::merge_log(Log &olog, Missing &omissing, int fromosd)
        
        // splice
        log.log.splice(log.log.end(), 
-                                  log.log, from, to);
+                                  olog.log, from, to);
        
-       log.top = olog.top;
+       info.last_update = log.top = olog.top;
   }
   
   dout(10) << "merge_log  result " << log << " " << missing << endl;
+  log.print(cout);
+  dout(10) << "missing " << hex << missing.missing << dec << endl;
 }
 
 
@@ -205,6 +217,7 @@ void PG::generate_backlog()
   osd->store->collection_list(info.pgid, olist);
   
   int local = 0;
+  map<eversion_t,Log::Entry> add;
   for (list<object_t>::iterator it = olist.begin();
           it != olist.end();
           it++) {
@@ -219,28 +232,34 @@ void PG::generate_backlog()
        osd->store->getattr(*it, 
                                                "version",
                                                &e.version, sizeof(e.version));
-       log.log.push_front(e);
+       add[e.version] = e;
+  }
 
-       // index
-       log.index( *log.log.begin() );
+  for (map<eversion_t,Log::Entry>::iterator i = add.begin();
+          i != add.end();
+          i++) {
+       log.log.push_front(i->second);
+       log.index( *log.log.begin() );  // index
   }
 
   dout(10) << local << " local objects, "
+                  << add.size() << " objects added to backlog, " 
                   << log.objects.size() << " in pg" << endl;
 }
 
 void PG::drop_backlog()
 {
-  dout(10) << "dropping backlog for " << log << endl;
+  dout(10) << "drop_backlog for " << log << endl;
+  log.print(cout);
 
   assert(log.backlog);
   log.backlog = false;
   
   while (!log.log.empty()) {
        Log::Entry &e = *log.log.begin();
-       if (e.version == log.bottom) break;
-       assert(e.version < log.bottom);
+       if (e.version > log.bottom) break;
 
+       dout(10) << "drop_backlog trimming " << e.version << endl;
        log.unindex(e);
        log.log.pop_front();
   }
@@ -322,7 +341,8 @@ void PG::adjust_prior()
 }
 
 
-void PG::peer(map< int, map<pg_t,Query> >& query_map)
+void PG::peer(ObjectStore::Transaction& t, 
+                         map< int, map<pg_t,Query> >& query_map)
 {
   dout(10) << "peer.  acting is " << acting 
                   << ", prior_set is " << prior_set << endl;
@@ -357,7 +377,7 @@ void PG::peer(map< int, map<pg_t,Query> >& query_map)
   eversion_t newest_update = info.last_update;
   int        newest_update_osd = osd->whoami;
   eversion_t oldest_update_needed = info.last_update;  // only of acting (current) osd set
-  eversion_t peers_complete_thru = info.last_complete;
+  peers_complete_thru = info.last_complete;
   
   for (map<int,Info>::iterator it = peer_info.begin();
           it != peer_info.end();
@@ -488,9 +508,7 @@ void PG::peer(map< int, map<pg_t,Query> >& query_map)
 
 
   // -- ok, activate!
-  info.last_epoch_started = osd->osdmap->get_epoch();
-  state_set(PG::STATE_ACTIVE);  // i am active!
-  dout(10) << "marking active" << endl;
+  activate(t);
 
   clean_set.clear();
   if (info.is_clean()) 
@@ -543,6 +561,8 @@ void PG::peer(map< int, map<pg_t,Query> >& query_map)
        dout(10) << "sending " << m->log << " " << m->missing
                         << " to osd" << peer << endl;
 
+       m->log.print(cout);
+
        osd->messenger->send_message(m, MSG_ADDR_OSD(peer));
   }
 
@@ -553,10 +573,122 @@ void PG::peer(map< int, map<pg_t,Query> >& query_map)
        clean_replicas();       
   }
   
+  // waiters
   osd->take_waiters(waiting_for_active);
 }
 
 
+void PG::activate(ObjectStore::Transaction& t)
+{
+  // activate: twiddle pg state, record pg info + log via t, clean up local objects, init recovery pointers
+  state_set(STATE_ACTIVE);
+  state_clear(PG::STATE_STRAY);
+  info.last_epoch_started = osd->osdmap->get_epoch();
+
+  if (role == 0) {     // primary state
+       peers_complete_thru = 0;  // we don't know (yet)!
+  }
+
+  assert(info.last_complete >= log.bottom || log.backlog);
+
+  // write pg info
+  t.collection_setattr(info.pgid, "info", (char*)&info, sizeof(info));
+  
+  // write log
+  write_log(t);
+
+  // clean up stray objects
+  clean_up_local(t); 
+
+  // init complete pointer
+  if (info.last_complete == info.last_update) {
+       dout(10) << "activate - complete" << endl;
+       log.complete_to = log.log.end();  // assign (not compare): nothing left to complete
+       log.requested_to = log.log.end();
+  } else {
+       dout(10) << "activate - not complete, starting recovery" << endl;
+       // NOTE(review): assumes log.log is non-empty here; begin() is dereferenced below
+       // init complete_to: advance past entries older than last_complete
+       log.complete_to = log.log.begin();
+       while (log.complete_to->version < info.last_complete) {
+         log.complete_to++;
+         assert(log.complete_to != log.log.end());
+       }
+
+       // start recovery
+       log.requested_to = log.log.begin();
+    do_recovery();
+  }
+}
+
+/** clean_up_local
+ * remove any objects that we're storing but shouldn't, as determined
+ * by log.  removals are issued on the supplied transaction t.
+ */
+void PG::clean_up_local(ObjectStore::Transaction& t)
+{
+  dout(10) << "clean_up_local" << endl;
+
+  assert(info.last_update >= log.bottom);  // otherwise we need some help!
+
+  if (log.backlog) {
+       // be thorough: compare the log against everything listed in the pg collection.
+       list<object_t> ls;
+       osd->store->collection_list(info.pgid, ls);
+       set<object_t> s;
+       // s = objects listed for this pg that the log has not accounted for yet
+       for (list<object_t>::iterator i = ls.begin();
+                i != ls.end();
+                i++) 
+         s.insert(*i);
+       // walk the log back to front; only the first entry seen per object counts
+       set<object_t> did;
+       for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
+                p != log.log.rend();
+                p++) {
+         if (did.count(p->oid)) continue;
+         did.insert(p->oid);
+         // either way the object is accounted for, so it is dropped from s
+         if (p->is_delete()) {
+               if (s.count(p->oid)) {
+                 dout(10) << " deleting " << hex << p->oid << dec
+                                  << " when " << p->version << endl;
+                 t.remove(p->oid);
+               }
+               s.erase(p->oid);
+         } else {
+               // just leave old objects.. they're missing or whatever
+               s.erase(p->oid);
+         }
+       }
+       // whatever is left in s was listed in the collection but never appears in the log
+       for (set<object_t>::iterator i = s.begin(); 
+                i != s.end();
+                i++) {
+         dout(10) << " deleting stray " << hex << *i << dec << endl;
+         t.remove(*i);
+       }
+
+  } else {
+       // just scan the log (no collection listing, so no existence check before remove).
+    set<object_t> did;
+       for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
+                p != log.log.rend();
+                p++) {
+         if (did.count(p->oid)) continue;
+         did.insert(p->oid);
+         // only the first entry seen per object (scanning from the back) decides its fate
+         if (p->is_delete()) {
+               dout(10) << " deleting " << hex << p->oid << dec
+                                << " when " << p->version << endl;
+               t.remove(p->oid);
+         } else {
+               // keep old(+missing) objects, just for kicks.
+         }
+       }
+  }
+}
+
 
 /**
  * do one recovery op.
@@ -569,59 +701,52 @@ bool PG::do_recovery()
   // look at log!
   Log::Entry *latest = 0;
 
-  while (1) {
-       if (log.requested_to == log.log.end()) {
-         dout(10) << "already requested everything in log" << endl;
-         return false;
-       }
+  while (log.requested_to != log.log.end()) {
+       dout(10) << "do_recovery "
+                        << log.requested_to->version
+                        << (log.requested_to->is_update() ? " update":" delete")
+                        << " " << hex << log.requested_to->oid << dec 
+                        << endl;
 
+       assert(log.objects.count(log.requested_to->oid));
        latest = log.objects[log.requested_to->oid];
+
        if (latest->is_update() &&
                !objects_pulling.count(latest->oid) &&
-               missing.is_missing(latest->oid))
-         break;
+               missing.is_missing(latest->oid)) {
+         osd->pull(this, latest->oid, latest->version);
+         return true;
+       }
        
        log.requested_to++;
   }
 
-  osd->pull(this, latest->oid, latest->version);
+  // done!
+  assert(missing.num_missing() == 0);
+  assert(info.last_complete == info.last_update);
   
-  return true;
-}
+  if (is_primary()) {
+       // i am primary
+       clean_set.insert(osd->whoami);
+       if (is_all_clean()) {
+         state_set(PG::STATE_CLEAN);
+         clean_replicas();
+       }
+  } else {
+       // tell primary
+       dout(7) << "do_recovery complete, telling primary" << endl;
+       list<PG::Info> ls;
+       ls.push_back(info);
+       osd->messenger->send_message(new MOSDPGNotify(osd->osdmap->get_epoch(),
+                                                                                                 ls),
+                                                                MSG_ADDR_OSD(get_primary()));
+  }
 
+  return false;
+}
 
-/** clean_up_local
- * remove any objects that we're storing but shouldn't.
- * as determined by log.
- */
-void PG::clean_up_local()
-{
-  dout(10) << "clean_up_local" << endl;
 
-  assert(info.last_update >= log.bottom);  // otherwise we need some help!
 
-  set<object_t> did;
-  for (list<Log::Entry>::reverse_iterator p = log.log.rbegin();
-          p != log.log.rend();
-          p++) {
-       if (!p->is_delete()) continue;
-       if (did.count(p->oid)) continue;
-       did.insert(p->oid);
-
-       if (osd->store->exists(p->oid)) {
-         eversion_t ov = 0;
-         osd->store->getattr(p->oid, "version", &ov, sizeof(ov));
-         if (ov < p->version) {
-               dout(10) << " removing " << hex << p->oid << dec
-                                << " v " << ov << " < " << p->version << endl;
-               osd->store->remove(p->oid);
-         } else {
-               dout(10) << "  keeping " << hex << p->oid << dec
-                                << " v " << ov << " >= " << p->version << endl;
-         }
-       }
-  }
-}
 
 void PG::clean_replicas()
 {
@@ -704,7 +829,7 @@ void PG::trim_ondisklog_to(ObjectStore::Transaction& t, eversion_t v)
 void PG::append_log(ObjectStore::Transaction& t, PG::Log::Entry& logentry, 
                                        eversion_t trim_to)
 {
-  // write entry
+  // write entry on disk
   bufferlist bl;
   bl.append( (char*)&logentry, sizeof(logentry) );
   t.write( info.pgid, ondisklog.top, bl.length(), bl );
@@ -756,5 +881,23 @@ void PG::read_log(ObjectStore *store)
        }
   }
   log.top = info.last_update;
+  log.index();
+
+  // build missing
+  set<object_t> did;
+  for (list<Log::Entry>::reverse_iterator i = log.log.rbegin();
+          i != log.log.rend();
+          i++) {
+       if (i->version <= info.last_complete) break;
+       if (did.count(i->oid)) continue;
+       did.insert(i->oid);
+
+       if (i->is_delete()) continue;
+
+       eversion_t v;
+       int r = osd->store->getattr(i->oid, "version", &v, sizeof(v));
+       if (r < 0 || v < i->version) 
+         missing.add(i->oid, i->version);
+  }
 }
 
index 10b4dfc62e445d272b4f3edb2cffe7098f69a112..f3a8aefc5503d36a8acd944f1298e36ee7eecf1b 100644 (file)
@@ -268,12 +268,11 @@ public:
        hash_set<reqid_t>         caller_ops;
 
        // recovery pointers
-       bool recovery_valid;
        list<Entry>::iterator requested_to; // not inclusive of referenced item
        list<Entry>::iterator complete_to;  // not inclusive of referenced item
        
        /****/
-       IndexedLog() : recovery_valid(false) {}
+       IndexedLog() {}
 
        bool logged_object(object_t oid) {
          return objects.count(oid);
@@ -284,6 +283,7 @@ public:
 
        void index() {
          objects.clear();
+         caller_ops.clear();
          for (list<Entry>::iterator i = log.begin();
                   i != log.end();
                   i++) {
@@ -303,6 +303,7 @@ public:
          assert(objects.count(e.oid));
          if (objects[e.oid]->version == e.version)
                objects.erase(e.oid);
+         caller_ops.erase(e.reqid);
        }
 
        // accessors
@@ -367,7 +368,7 @@ public:
   IndexedLog  log;
   OndiskLog   ondisklog;
   Missing     missing;
-
+  utime_t     last_heartbeat;  // 
 
 protected:
   int         role;    // 0 = primary, 1 = replica, -1=none.
@@ -379,9 +380,9 @@ protected:
   epoch_t     last_epoch_started_any;
   eversion_t  last_complete_commit;
 
- protected:
   // [primary only] content recovery state
   eversion_t  peers_complete_thru;
+ protected:
   set<int>    prior_set;   // current+prior OSDs, as defined by last_epoch_started_any.
   set<int>    stray_set;   // non-acting osds that have PG data.
   set<int>    clean_set;   // current OSDs that are clean
@@ -403,7 +404,6 @@ protected:
                   list<class Message*> > waiting_for_missing_object;   
   
   // recovery
-  //eversion_t                requested_thru;
   map<object_t, eversion_t> objects_pulling;  // which objects are currently being pulled
   
 public:
@@ -418,7 +418,6 @@ public:
        peer_info_requested.clear();
        peer_log_requested.clear();
        clear_primary_recovery_state();
-       peers_complete_thru = 0;
   }
 
  public:
@@ -451,18 +450,12 @@ public:
   void generate_backlog();
   void drop_backlog();
 
-  void peer(map< int, map<pg_t,Query> >& query_map);
+  void peer(ObjectStore::Transaction& t, map< int, map<pg_t,Query> >& query_map);
 
-  bool do_recovery();
-  void start_recovery() {
-       log.recovery_valid = true;
-       log.requested_to = log.log.begin();
-       log.complete_to = log.log.begin();
+  void activate(ObjectStore::Transaction& t);
 
-       do_recovery();
-  }
+  bool do_recovery();
 
-  void clean_up_local();
   void clean_replicas();
 
   off_t get_log_write_pos() {
@@ -513,6 +506,9 @@ public:
   }
 
 
+  // pg on-disk content
+  void clean_up_local(ObjectStore::Transaction& t);
+
   // pg on-disk state
   void write_log(ObjectStore::Transaction& t);
   void append_log(ObjectStore::Transaction& t, 
@@ -553,6 +549,7 @@ inline ostream& operator<<(ostream& out, const PG& pg)
 {
   out << "pg[" << pg.info 
          << " r=" << pg.get_role();
+  if (pg.get_role() == 0) out << " pct " << pg.peers_complete_thru;
   if (pg.is_active()) out << " active";
   if (pg.is_clean()) out << " clean";
   if (pg.is_stray()) out << " stray";
index 9db25df73db36bad69bf9d397e1fcc028efb6b07..b59e60712953a9af79fc9c7edacf98a07f21db27 100644 (file)
@@ -68,7 +68,7 @@ class Filer {
        Objecter::OSDRead *rd = new Objecter::OSDRead(bl);
        file_to_extents(inode, offset, len, rd->extents);
 
-       return objecter->readx(rd, onfinish);
+       return objecter->readx(rd, onfinish) > 0 ? 0:-1;
   }
 
   int write(inode_t& inode,
@@ -81,7 +81,7 @@ class Filer {
        Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
        file_to_extents(inode, offset, len, wr->extents);
 
-       return objecter->modifyx(wr, onack, oncommit);
+       return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
   }
 
   int zero(inode_t& inode,
@@ -92,7 +92,7 @@ class Filer {
        Objecter::OSDModify *z = new Objecter::OSDModify(OSD_OP_ZERO);
        file_to_extents(inode, offset, len, z->extents);
 
-       return objecter->modifyx(z, onack, oncommit);
+       return objecter->modifyx(z, onack, oncommit) > 0 ? 0:-1;
   }
 
 
index 9d498804a07be96d308563e13a417e4b1aecc91e..be67513002058d3ad9102534b54890ae600a567f 100644 (file)
@@ -794,9 +794,9 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute
        // single object.
        
        // make sure we aren't already locking/locked...
-       object_t oid = wr->extents.front()->oid;
+       object_t oid = wr->extents.front().oid;
        Object *o = 0;
-       if (objects.count(oid)) o = get_object(iod, ino);
+       if (objects.count(oid)) o = get_object(oid, ino);
        if (!o || 
                (o->lock_state != Object::LOCK_WRLOCK &&
                 o->lock_state != Object::LOCK_WRLOCKING &&
@@ -1195,3 +1195,50 @@ off_t ObjectCacher::release_set(inodeno_t ino)
 
   return unclean;
 }
+
+
+void ObjectCacher::kick_sync_writers(inodeno_t ino)
+{
+  if (objects_by_ino.count(ino) == 0) {
+       dout(10) << "kick_sync_writers on " << hex << ino << dec << " dne" << endl;
+       return;
+  }
+  // gather the waitfor_wr contexts of every object cached for ino and finish them in one batch
+  dout(10) << "kick_sync_writers on " << hex << ino << dec << endl;
+
+  list<Context*> ls;
+
+  set<Object*>& s = objects_by_ino[ino];
+  for (set<Object*>::iterator i = s.begin();
+          i != s.end();
+          i++) {
+       Object *ob = *i;
+       // splice moves the waiters into ls, leaving ob->waitfor_wr empty
+       ls.splice(ls.begin(), ob->waitfor_wr);
+  }
+  // NOTE(review): finish_contexts presumably completes each collected Context -- confirm
+  finish_contexts(ls);
+}
+
+void ObjectCacher::kick_sync_readers(inodeno_t ino)
+{
+  if (objects_by_ino.count(ino) == 0) {
+       dout(10) << "kick_sync_readers on " << hex << ino << dec << " dne" << endl;
+       return;
+  }
+  // gather the waitfor_rd contexts of every object cached for ino and finish them in one batch
+  dout(10) << "kick_sync_readers on " << hex << ino << dec << endl;
+
+  list<Context*> ls;
+
+  set<Object*>& s = objects_by_ino[ino];
+  for (set<Object*>::iterator i = s.begin();
+          i != s.end();
+          i++) {
+       Object *ob = *i;
+       // splice moves the waiters into ls, leaving ob->waitfor_rd empty
+       ls.splice(ls.begin(), ob->waitfor_rd);
+  }
+  // NOTE(review): finish_contexts presumably completes each collected Context -- confirm
+  finish_contexts(ls);
+}
index fd93b6f2342cd743d2c4d1905d0534f04262c783..20614c44faa0750613f33b83aa7f77f4c70d29a3 100644 (file)
@@ -406,6 +406,9 @@ class ObjectCacher {
 
   off_t release_set(inodeno_t ino);  // returns # of bytes not released (ie non-clean)
 
+  void kick_sync_writers(inodeno_t ino);
+  void kick_sync_readers(inodeno_t ino);
+
 
   // file functions
 
index b6f45dbf38c8e6fc6b5f6f54a06a86a568f825fc..45cdca4290aecaf93d7fd545f5de8b0a95d8ae2e 100644 (file)
@@ -100,6 +100,8 @@ void Objecter::handle_osd_map(MOSDMap *m)
 
 void Objecter::scan_pgs(set<pg_t>& changed_pgs, set<pg_t>& down_pgs)
 {
+  dout(10) << "scan_pgs" << endl;
+
   for (hash_map<pg_t,PG>::iterator i = pg_map.begin();
           i != pg_map.end();
           i++) {
@@ -107,12 +109,22 @@ void Objecter::scan_pgs(set<pg_t>& changed_pgs, set<pg_t>& down_pgs)
        PG& pg = i->second;
 
        int old = pg.primary;
-       if (pg.calc_primary(pgid, osdmap)) {
+       pg.calc_primary(pgid, osdmap);
+
+       if (old != pg.primary) {
          if (osdmap->is_down(old)) {
-               dout(10) << "scan_pgs pg " << hex << pgid << dec << " went down" << endl;
+               dout(10) << "scan_pgs pg " << hex << pgid << dec 
+                                << " (" << pg.active_tids << ")"
+                                << " primary " << old << " -> " << pg.primary
+                                << " (went down)"
+                                << endl;
                down_pgs.insert(pgid);
          } else {
-               dout(10) << "scan_pgs pg " << hex << pgid << dec << " changed" << endl;
+               dout(10) << "scan_pgs pg " << hex << pgid << dec 
+                                << " (" << pg.active_tids << ")"
+                                << " primary " << old << " -> " << pg.primary
+                                << " (changed)"
+                                << endl;
          }
          changed_pgs.insert(pgid);
        }
@@ -121,6 +133,10 @@ void Objecter::scan_pgs(set<pg_t>& changed_pgs, set<pg_t>& down_pgs)
 
 void Objecter::kick_requests(set<pg_t>& changed_pgs, set<pg_t>& down_pgs) 
 {
+  dout(10) << "kick_requests changed " << hex << changed_pgs
+                  << ", down " << down_pgs << dec
+                  << endl;
+
   for (set<pg_t>::iterator i = changed_pgs.begin();
           i != changed_pgs.end();
           i++) {
@@ -143,34 +159,20 @@ void Objecter::kick_requests(set<pg_t>& changed_pgs, set<pg_t>& down_pgs)
                
                // WRITE
                if (wr->waitfor_ack.count(tid)) {
-                 // resubmit
                  if (down_pgs.count(pgid) == 0) {
-                       dout(0) << "kick_requests resub WRNOOP " << tid << endl;
+                       dout(0) << "kick_requests missing ack, resub WRNOOP " << tid << endl;
                        modifyx_submit(wr, wr->waitfor_ack[tid], true);
                  } else {
-                       dout(0) << "kick_requests resub write " << tid << endl;
+                       dout(0) << "kick_requests missing ack, resub write " << tid << endl;
                        modifyx_submit(wr, wr->waitfor_ack[tid]);
                  }
                  wr->waitfor_ack.erase(tid);
                  wr->waitfor_commit.erase(tid);
                }
                else if (wr->waitfor_commit.count(tid)) {
-                 // fake it.  FIXME.
-                 dout(0) << "kick_requests faking commit on write " << tid << endl;
-                 wr->waitfor_ack.erase(tid);
-                 if (wr->waitfor_ack.empty() && wr->onack) {
-                       wr->onack->finish(0);
-                       delete wr->onack;
-                       wr->onack = 0;
-                 }
+                 dout(0) << "kick_requests missing commit, resub WRNOOP " << tid << endl;
+                 modifyx_submit(wr, wr->waitfor_commit[tid], true);
                  wr->waitfor_commit.erase(tid);
-                 if (wr->waitfor_commit.empty()) {
-                       if (wr->oncommit) {
-                         wr->oncommit->finish(0);
-                         delete wr->oncommit;
-                       }
-                       delete wr;
-                 }
                }
          }
 
@@ -201,10 +203,15 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
        handle_osd_read_reply(m);
        break;
        
+  case OSD_OP_WRNOOP:
   case OSD_OP_WRITE:
   case OSD_OP_ZERO:
   case OSD_OP_WRUNLOCK:
   case OSD_OP_WRLOCK:
+  case OSD_OP_RDLOCK:
+  case OSD_OP_RDUNLOCK:
+  case OSD_OP_UPLOCK:
+  case OSD_OP_DNLOCK:
        handle_osd_modify_reply(m);
        break;
 
@@ -218,7 +225,7 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
 
 tid_t Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl, 
-                                  Context *onfinish)
+                                        Context *onfinish)
 {
   OSDRead *rd = new OSDRead(bl);
   rd->extents.push_back(ObjectExtent(oid, off, len));
@@ -252,9 +259,12 @@ void Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex)
                                                 OSD_OP_READ);
   m->set_length(ex.length);
   m->set_offset(ex.start);
-  dout(10) << "readx_submit tid " << last_tid << " to osd" << pg.primary
+  dout(10) << "readx_submit " << rd << " tid " << last_tid
                   << " oid " << hex << ex.oid << dec  << " " << ex.start << "~" << ex.length
-                  << " (" << ex.buffer_extents.size() << " buffer fragments)" << endl;
+                  << " (" << ex.buffer_extents.size() << " buffer fragments)" 
+                  << " pg " << hex << ex.pgid << dec
+                  << " osd" << pg.primary 
+                  << endl;
 
   if (pg.primary >= 0) 
        messenger->send_message(m, MSG_ADDR_OSD(pg.primary), 0);
@@ -271,9 +281,14 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
 {
   // get pio
   tid_t tid = m->get_tid();
+
+  if (op_read.count(tid) == 0) {
+       dout(7) << "handle_osd_read_reply " << tid << " ... stray" << endl;
+       delete m;
+       return;
+  }
+
   dout(7) << "handle_osd_read_reply " << tid << endl;
-  
-  assert(op_read.count(tid));
   OSDRead *rd = op_read[ tid ];
   op_read.erase( tid );
 
@@ -303,7 +318,6 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
   if (rd->ops.empty()) {
        // all done
        size_t bytes_read = 0;
-       rd->bl->clear();
        
        if (rd->read_data.size()) {
          dout(15) << " assembling frags" << endl;
@@ -409,7 +423,7 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
        Context *onfinish = rd->onfinish;
 
        dout(7) << " " << bytes_read << " bytes " 
-         //<< rd->bl->length()
+                       << rd->bl->length()
                        << endl;
        
        // done
@@ -536,9 +550,12 @@ void Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, bool wrnoop)
   pg.active_tids.insert(last_tid);
   
   // send
-  dout(10) << "modifyx_submit op " << wr->op << " tid " << last_tid
-                  << " osd" << pg.primary << "  oid " << hex << ex.oid << dec 
-                  << " " << ex.start << "~" << ex.length << endl;
+  dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << last_tid
+                  << "  oid " << hex << ex.oid << dec 
+                  << " " << ex.start << "~" << ex.length 
+                  << " pg " << hex << ex.pgid << dec 
+                  << " osd" << pg.primary 
+                  << endl;
   if (pg.primary >= 0)
        messenger->send_message(m, MSG_ADDR_OSD(pg.primary), 0);
 }
@@ -549,8 +566,14 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
 {
   // get pio
   tid_t tid = m->get_tid();
+
+  if (op_modify.count(tid) == 0) {
+       dout(7) << "handle_osd_modify_reply " << tid << " commit " << m->get_commit() << " ... stray" << endl;
+       delete m;
+       return;
+  }
+
   dout(7) << "handle_osd_modify_reply " << tid << " commit " << m->get_commit() << endl;
-  assert(op_modify.count(tid));
   OSDModify *wr = op_modify[ tid ];
 
   Context *onack = 0;
index 72cc1cb90cce8b50180487571c40876955553982..abcd46f95d29d3cef53f4755ac487e0a240d2388 100644 (file)
@@ -40,7 +40,9 @@ class Objecter {
        map<tid_t, ObjectExtent> ops;
        map<object_t, bufferlist*> read_data;  // bits of data as they come back
 
-       OSDRead(bufferlist *b) : bl(b), onfinish(0) {}
+       OSDRead(bufferlist *b) : bl(b), onfinish(0) {
+         bl->clear();  // NOTE(review): b is dereferenced unconditionally; callers must pass a non-null bufferlist
+       }
   };
 
   // generic modify
@@ -89,12 +91,8 @@ class Objecter {
 
        PG() : primary(-1) {}
 
-       bool calc_primary(pg_t pgid, OSDMap *osdmap) {  // return true if change
-         int n = osdmap->get_pg_acting_primary(pgid);
-         if (n == primary) 
-               return false;
-         primary = n;
-         return true;    
+       void calc_primary(pg_t pgid, OSDMap *osdmap) {  // refresh primary from osdmap; returns nothing, compare old vs new primary to detect a change
+         primary = osdmap->get_pg_acting_primary(pgid);
        }
   };