]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 25 Oct 2005 17:11:43 +0000 (17:11 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 25 Oct 2005 17:11:43 +0000 (17:11 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@506 29311d96-e01e-0410-9327-a35deaab8ce9

24 files changed:
ceph/common/Cond.h
ceph/common/ThreadPool.h
ceph/common/Timer.cc
ceph/common/Timer.h
ceph/config.cc
ceph/config.h
ceph/include/types.h
ceph/mds/MDS.cc
ceph/messages/MOSDOp.h
ceph/messages/MOSDPGPeer.h
ceph/messages/MOSDPGUpdate.h
ceph/msg/FakeMessenger.cc
ceph/msg/Messenger.cc
ceph/msg/Messenger.h
ceph/msg/TCPMessenger.cc
ceph/osd/FakeStore.cc
ceph/osd/FakeStore.h
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/ObjectStore.h
ceph/osd/PG.cc
ceph/osd/PG.h
ceph/osdc/Filer.cc
ceph/osdc/Filer.h

index db6cea095c40f55355f241d6654b11e0f3cd1da0..3a376108d5a4c04e4b97b861b42d573fdefcab9b 100644 (file)
@@ -20,7 +20,8 @@ class Cond
  public:
 
   Cond() {
-    pthread_cond_init(&C,NULL);
+    int r = pthread_cond_init(&C,NULL);
+       assert(r == 0);
   }
 
   virtual ~Cond() { 
@@ -53,7 +54,8 @@ class Cond
   }
 
   int Signal() { 
-       int r = pthread_cond_signal(&C);
+       //int r = pthread_cond_signal(&C);
+       int r = pthread_cond_broadcast(&C);
        return r;
   }
 };
index 17670acfbe141f78d9651ad8094ccd34f180f3ec..beeefa4a53927dc04072f101404bac94a1de97c5 100644 (file)
@@ -53,7 +53,7 @@ class ThreadPool {
                //pthread_exit(0);
                return 0;   // like this, i think!
       }
-      //tpdout(DBLVL) << "Thread "<< pthread_self() << " calling the function\n";
+      tpdout(DBLVL) << "Thread "<< pthread_self() << " calling the function on " << op << endl;
       func(u, op);
     }
        return 0;
@@ -108,6 +108,7 @@ class ThreadPool {
 
   void put_op(T* op)
   {
+    tpdout(DBLVL) << "put_op " << op << endl;
     q_lock.Lock();
     q.push(op);
     num_ops++;
index 324e11a59377d80ae571986d79ec7eed4d7189cb..070e6c682abe1dbcd49452486ae89a340be437c9 100644 (file)
@@ -11,7 +11,7 @@
 #undef dout
 #define dout(x)  if (x <= g_conf.debug) cout << "Timer: "
 
-#define DBL 20
+#define DBL 10
 
 #include <signal.h>
 #include <sys/time.h>
@@ -20,7 +20,8 @@
 // single global instance
 Timer      g_timer;
 
-Context *messenger_kicker = 0;
+//Context *messenger_kicker = 0;
+Messenger *messenger = 0;
 
 
 
@@ -55,28 +56,41 @@ void Timer::timer_thread()
                utime_t t = it->first;
                dout(DBL) << "queuing event(s) scheduled at " << t << endl;
 
-               pending[t] = it->second;
+               if (messenger) {
+                 for (set<Context*>::iterator cit = it->second.begin();
+                          cit != it->second.end();
+                          cit++)
+                       messenger->queue_callback(*cit);
+               }
+
+               //pending[t] = it->second;
                it++;
                scheduled.erase(t);
          }
 
-         if (messenger_kicker) {
-               dout(DBL) << "kicking messenger" << endl;
-               messenger_kicker->finish(0);
-         } else {
-               dout(DBL) << "no messenger ot kick!" << endl;
-         }
-
        }
 
        else {
          // sleep
          if (event) {
                dout(DBL) << "sleeping until " << next << endl;
-               cond.Wait(lock, next);  // wait for waker or time
+               timed_sleep = true;
+               timeout_cond.Wait(lock, next);  // wait for waker or time
+               utime_t now = g_clock.now();
+               dout(DBL) << "kicked or timed out at " << now << endl;
          } else {
                dout(DBL) << "sleeping" << endl;
-               cond.Wait(lock);         // wait for waker
+
+               //wtf this isn't waking up! 
+               timed_sleep = false;
+               sleep_cond.Wait(lock);         // wait for waker
+               // setting a 1s limit works tho
+               //utime_t next = g_clock.now();
+               //next.sec_ref() += 10;
+               //cond.Wait(lock, next);         // wait for waker
+
+               utime_t now = g_clock.now();
+               dout(DBL) << "kicked at " << now << endl;
          }
        }
   }
@@ -90,7 +104,7 @@ void Timer::timer_thread()
  * Timer bits
  */
 
-
+/*
 void Timer::set_messenger_kicker(Context *c)
 {
   dout(10) << "messenger kicker is " << c << endl;
@@ -106,12 +120,27 @@ void Timer::unset_messenger_kicker()
   }
   cancel_timer();
 }
+*/
+
+void Timer::set_messenger(Messenger *m)
+{
+  dout(10) << "set messenger " << m << endl;
+  messenger = m;
+}
+void Timer::unset_messenger()
+{
+  dout(10) << "unset messenger" << endl;
+  messenger = 0;
+}
 
 void Timer::register_timer()
 {
   if (thread_id) {
        dout(DBL) << "register_timer kicking thread" << endl;
-       cond.Signal();
+       if (timed_sleep)
+         timeout_cond.Signal();
+       else
+         sleep_cond.Signal();
   } else {
        dout(DBL) << "register_timer starting thread" << endl;
        pthread_create(&thread_id, NULL, timer_thread_entrypoint, (void*)this);
@@ -125,7 +154,10 @@ void Timer::cancel_timer()
        dout(10) << "setting thread_stop flag" << endl;
        lock.Lock();
        thread_stop = true;
-       cond.Signal();
+       if (timed_sleep)
+         timeout_cond.Signal();
+       else
+         sleep_cond.Signal();
        lock.Unlock();
        
        dout(10) << "waiting for thread to finish" << endl;
@@ -159,10 +191,11 @@ void Timer::add_event_at(utime_t when,
   lock.Lock();
   scheduled[ when ].insert(callback);
   event_times[callback] = when;
-  lock.Unlock();
   
   // make sure i wake up
   register_timer();
+
+  lock.Unlock();
 }
 
 bool Timer::cancel_event(Context *callback) 
@@ -194,6 +227,7 @@ bool Timer::cancel_event(Context *callback)
  * this should be called by the Messenger in the proper thread (usually same as incoming messages)
  */
 
+/*
 void Timer::execute_pending()
 {
   lock.Lock();
@@ -215,3 +249,5 @@ void Timer::execute_pending()
 
   lock.Unlock();
 }
+
+*/
index 1c8f535980e09d35598a6c5256740d6cb2bb82f4..28c777e41d8f7600d89e5a8e842eb413a6c32251 100644 (file)
@@ -57,7 +57,9 @@ class Timer {
   pthread_t thread_id;
   bool      thread_stop;
   Mutex     lock;
-  Cond      cond;
+  bool      timed_sleep;
+  Cond      sleep_cond;
+  Cond      timeout_cond;
  public:
   void timer_thread();    // waiter thread (that wakes us up)
 
@@ -100,6 +102,10 @@ class Timer {
   void set_messenger_kicker(Context *c);
   void unset_messenger_kicker();
 
+  void set_messenger(Messenger *m);
+  void unset_messenger();
+
+
   // schedule events
   void add_event_after(float seconds,
                                           Context *callback);
index 92144965ce233ec11211798e0253b2ac778c1638..f65a73e201b5b28412efda7f2d5a70d82b35fa61 100644 (file)
@@ -20,14 +20,14 @@ Mutex bufferlock;
 
 
 
-FileLayout g_OSD_FileLayout( 1<<20, 1, 1<<20 );   // stripe files over whole objects
+FileLayout g_OSD_FileLayout( 1<<20, 1, 1<<20, 3 );   // stripe files over whole objects
 //FileLayout g_OSD_FileLayout( 1<<17, 4, 1<<20 );   // 128k stripes over sets of 4
 
 // ??
-FileLayout g_OSD_MDDirLayout( 1<<14, 1<<2, 1<<19 );
+FileLayout g_OSD_MDDirLayout( 1<<14, 1<<2, 1<<19, 3 );
 
 // stripe mds log over 128 byte bits (see mds_log_pad_entry below to match!)
-FileLayout g_OSD_MDLogLayout( 1<<7, 32, 1<<20 );  // new (good?) way
+FileLayout g_OSD_MDLogLayout( 1<<7, 32, 1<<20, 3 );  // new (good?) way
 //FileLayout g_OSD_MDLogLayout( 57, 32, 1<<20 );  // pathological case to test striping buffer mapping
 //FileLayout g_OSD_MDLogLayout( 1<<20, 1, 1<<20 );  // old way
 
@@ -48,6 +48,7 @@ md_config_t g_conf = {
   fake_clock: false,
   fakemessenger_serialize: true,
   fake_osdmap_expand: 0,
+  fake_osd_sync: true,
 
   debug: 1,
   debug_mds_balancer: 1,
@@ -109,6 +110,7 @@ md_config_t g_conf = {
   osd_writesync: false,
   osd_maxthreads: 0,   // 0 == no threading!
   
+  osd_fakestore_syncthreads: 4,
 
 
   // --- fakeclient (mds regression testing) ---
index fc0e5f1135f91191fb4815166491647c442722a0..acc17857b18b045bdbc203df4e73f01033f10ff0 100644 (file)
@@ -26,6 +26,7 @@ struct md_config_t {
   bool fakemessenger_serialize;
 
   int fake_osdmap_expand;
+  bool fake_osd_sync;
 
   int debug;
   int debug_mds_balancer;
@@ -84,6 +85,8 @@ struct md_config_t {
   bool  osd_writesync;
   int   osd_maxthreads;
 
+  int osd_fakestore_syncthreads;
+
   // fake client
   int      num_fakeclient;
   unsigned fakeclient_requests;
index 6198267846341974a0b8941606c97b6d460714c0..d0f9da762251cb4b9156946e079ec32249638bb8 100644 (file)
@@ -6,6 +6,7 @@
 
 #include <string>
 #include <set>
+#include <map>
 #include <vector>
 #include <iostream>
 using namespace std;
@@ -241,8 +242,9 @@ inline ostream& operator<<(ostream& out, set<int>& iset) {
   return out;
 }
 
-inline ostream& operator<<(ostream& out, set<__uint64_t>& iset) {
-  for (set<__uint64_t>::iterator it = iset.begin();
+template<class A>
+inline ostream& operator<<(ostream& out, set<A>& iset) {
+  for (typename set<A>::iterator it = iset.begin();
           it != iset.end();
           it++) {
        if (it != iset.begin()) out << ",";
@@ -251,8 +253,9 @@ inline ostream& operator<<(ostream& out, set<__uint64_t>& iset) {
   return out;
 }
 
-inline ostream& operator<<(ostream& out, multiset<int>& iset) {
-  for (multiset<int>::iterator it = iset.begin();
+template<class A>
+inline ostream& operator<<(ostream& out, multiset<A>& iset) {
+  for (typename multiset<A>::iterator it = iset.begin();
           it != iset.end();
           it++) {
        if (it != iset.begin()) out << ",";
@@ -261,6 +264,20 @@ inline ostream& operator<<(ostream& out, multiset<int>& iset) {
   return out;
 }
 
+template<class A,class B>
+inline ostream& operator<<(ostream& out, map<A,B>& m) 
+{
+  out << "{";
+  for (typename map<A,B>::const_iterator it = m.begin();
+          it != m.end();
+          it++) {
+       if (it != m.begin()) out << ",";
+       out << it->first << "=" << it->second;
+  }
+  out << "}";
+  return out;
+}
+
 
 
 
index ceab7adb0948f96b2245d8133ee95ad9a414030f..6bf9902577394b8e47c36ccfad2ada5aef7cf899 100644 (file)
@@ -98,6 +98,7 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
   // <HACK set up OSDMap from g_conf>
   osdmap = new OSDMap();
   osdmap->set_pg_bits(g_conf.osd_pg_bits);
+  osdmap->inc_version();
 
   Bucket *b = new UniformBucket(1, 0);
   for (int i=0; i<g_conf.num_osd; i++) {
@@ -112,14 +113,6 @@ MDS::MDS(MDCluster *mdc, int whoami, Messenger *m) {
        osdmap->crush.rules[i].steps.push_back(RuleStep(CRUSH_RULE_EMIT));
   }
 
-  /*OSDGroup osdg;
-  osdg.num_osds = g_conf.num_osd;
-  for (int i=0; i<osdg.num_osds; i++) osdg.osds.push_back(i);
-  osdg.weight = 100;
-  osdg.osd_size = 100;  // not used yet?
-  osdmap->add_group(osdg);
-  */
-
   // </HACK>
 
   filer = new Filer(messenger, osdmap);
index 211953c1615197adcb53cb862f79b43ff6436e30..ba3feb32ab60bba3de34878b5fa8aa1342036500 100644 (file)
@@ -59,20 +59,20 @@ class MOSDOp : public Message {
   friend class MOSDOpReply;
 
  public:
-  long       get_tid() { return st.tid; }
-  msg_addr_t get_asker() { return st.asker; }
+  const long       get_tid() { return st.tid; }
+  const msg_addr_t get_asker() { return st.asker; }
 
-  object_t   get_oid() { return st.oid; }
-  pg_t get_pg() { return st.pg; }
-  version_t  get_map_version() { return st.map_version; }
+  const object_t   get_oid() { return st.oid; }
+  const pg_t get_pg() { return st.pg; }
+  const version_t  get_map_version() { return st.map_version; }
 
-  int        get_pg_role() { return st.pg_role; }  // who am i asking for?
-  version_t  get_version() { return st.version; }
-  version_t  get_old_version() { return st.old_version; }
+  const int        get_pg_role() { return st.pg_role; }  // who am i asking for?
+  const version_t  get_version() { return st.version; }
+  const version_t  get_old_version() { return st.old_version; }
 
-  int    get_op() { return st.op; }
-  size_t get_length() { return st.length; }
-  size_t get_offset() { return st.offset; }
+  const int    get_op() { return st.op; }
+  const size_t get_length() { return st.length; }
+  const size_t get_offset() { return st.offset; }
 
   void set_data(bufferlist &d) {
        data.claim(d);
index 674f64914fc68143c2771d41d75ad874eff40841..6830490f75ff36bf2017e34b2aaea1cee3dd861a 100644 (file)
@@ -8,14 +8,18 @@ class MOSDPGPeer : public Message {
   __uint64_t       map_version;
   list<pg_t> pg_list;
 
+  bool complete;
+
  public:
   __uint64_t get_version() { return map_version; }
   list<pg_t>& get_pg_list() { return pg_list; }
+  bool get_complete() { return complete; }
 
   MOSDPGPeer() {}
-  MOSDPGPeer(__uint64_t v, list<pg_t>& l) :
+  MOSDPGPeer(__uint64_t v, list<pg_t>& l, bool c=false) :
        Message(MSG_OSD_PG_PEER) {
        this->map_version = v;
+       this->complete = c;
        pg_list.splice(pg_list.begin(), l);
   }
   
@@ -23,12 +27,15 @@ class MOSDPGPeer : public Message {
 
   void encode_payload() {
        payload.append((char*)&map_version, sizeof(map_version));
+       payload.append((char*)&complete, sizeof(complete));
        _encode(pg_list, payload);
   }
   void decode_payload() {
        int off = 0;
        payload.copy(off, sizeof(map_version), (char*)&map_version);
        off += sizeof(map_version);
+       payload.copy(off, sizeof(complete), (char*)&complete);
+       off += sizeof(complete);
        _decode(pg_list, payload, off);
   }
 };
index 7e9172fe7b57a9484060e504d525e8156abc8a42..146f77c2aba63135d3f2a05ad8d75cecb926f28b 100644 (file)
@@ -7,20 +7,23 @@ class MOSDPGUpdate : public Message {
   version_t   map_version;
   pg_t        pgid;
   //pginfo_t    info;
-  bool complete;
+  bool        complete;
+  version_t   last_any_complete;
 
  public:
   version_t get_version() { return map_version; }
   pg_t get_pgid() { return pgid; }
   //pginfo_t& get_pginfo() { return info; }
   bool is_complete() { return complete; }
+  version_t get_last_any_complete() { return last_any_complete; }
 
   MOSDPGUpdate() {}
-  MOSDPGUpdate(version_t mv, pg_t pgid, bool complete) :
+  MOSDPGUpdate(version_t mv, pg_t pgid, bool complete, version_t last_any_complete) :
        Message(MSG_OSD_PG_UPDATE) {
        this->map_version = mv;
        this->pgid = pgid;
        this->complete = complete;
+       this->last_any_complete = last_any_complete;
   }
   
   char *get_type_name() { return "PGUp"; }
@@ -29,6 +32,7 @@ class MOSDPGUpdate : public Message {
        payload.append((char*)&map_version, sizeof(map_version));
        payload.append((char*)&pgid, sizeof(pgid));
        payload.append((char*)&complete, sizeof(complete));
+       payload.append((char*)&last_any_complete, sizeof(last_any_complete));
   }
   void decode_payload() {
        int off = 0;
@@ -38,6 +42,8 @@ class MOSDPGUpdate : public Message {
        off += sizeof(pgid);
        payload.copy(off, sizeof(complete), (char*)&complete);
        off += sizeof(complete);
+       payload.copy(off, sizeof(last_any_complete), (char*)&last_any_complete);
+       off += sizeof(last_any_complete);
   }
 };
 
index 33d0c07fd02e621bfae123731057c30330a819fa..9d94be382a40f2a9b9d8a8f26d586ed8ea474de3 100644 (file)
@@ -33,6 +33,8 @@ map<int, FakeMessenger*>      directory;
 hash_map<int, Logger*>        loggers;
 LogType fakemsg_logtype;
 
+set<FakeMessenger*>           shutdown_set;
+
 Mutex lock;
 Cond  cond;
 
@@ -47,15 +49,17 @@ class C_FakeKicker : public Context {
   void finish(int r) {
        dout(18) << "timer kick" << endl;
        pending_timer = true;
+       lock.Lock();
        cond.Signal();  // why not
+       lock.Unlock();
   }
 };
 
-
 void *fakemessenger_thread(void *ptr) 
 {
-  dout(1) << "thread start, setting timer kicker" << endl;
-  g_timer.set_messenger_kicker(new C_FakeKicker());
+  //dout(1) << "thread start, setting timer kicker" << endl;
+  //g_timer.set_messenger_kicker(new C_FakeKicker());
+  msgr_callback_kicker = new C_FakeKicker();
 
   lock.Lock();
   while (1) {
@@ -73,8 +77,10 @@ void *fakemessenger_thread(void *ptr)
   }
   lock.Unlock();
 
-  cout << "unsetting messenger kicker" << endl;
-  g_timer.unset_messenger_kicker();
+  cout << "unsetting messenger" << endl;
+  //g_timer.unset_messenger_kicker();
+  g_timer.unset_messenger();
+  msgr_callback_kicker = 0;
 
   dout(1) << "thread finish (i woke up but no messages, bye)" << endl;
   return 0;
@@ -128,12 +134,18 @@ int fakemessenger_do_loop_2()
        
        dout(18) << "do_loop top" << endl;
 
-       // timer?
+       /*// timer?
        if (pending_timer) {
          pending_timer = false;
          dout(5) << "pending timer" << endl;
          g_timer.execute_pending();
        }
+       */
+
+       // callbacks
+       lock.Unlock();
+       messenger_do_callbacks();
+       lock.Lock();
 
        // messages
        map<int, FakeMessenger*>::iterator it = directory.begin();
@@ -176,11 +188,27 @@ int fakemessenger_do_loop_2()
          }
        }
        
+       // deal with shutdowns.. dleayed to avoid concurrent directory modification
+       if (!shutdown_set.empty()) {
+         for (set<FakeMessenger*>::iterator it = shutdown_set.begin();
+                  it != shutdown_set.end();
+                  it++) {
+               dout(7) << "fakemessenger: removing " << MSG_ADDR_NICE((*it)->get_myaddr()) << " from directory" << endl;
+               assert(directory.count((*it)->get_myaddr()));
+               directory.erase((*it)->get_myaddr());
+               if (directory.empty()) {
+                 dout(1) << "fakemessenger: last shutdown" << endl;
+                 ::shutdown = true;
+               }
+         }
+         shutdown_set.clear();
+       }
        
        if (!didone)
          break;
   }
 
+
   dout(18) << "do_loop end (no more messages)." << endl;
   //lock.Unlock();
   return 0;
@@ -196,6 +224,8 @@ FakeMessenger::FakeMessenger(long me)  : Messenger(me)
 
   cout << "fakemessenger " << whoami << " messenger is " << this << endl;
 
+  g_timer.set_messenger(this);
+
   /*
   string name;
   name = "m.";
@@ -220,14 +250,17 @@ int FakeMessenger::shutdown()
 {
   //cout << "shutdown on messenger " << this << " has " << num_incoming() << " queued" << endl;
   lock.Lock();
-
   assert(directory.count(whoami) == 1);
+  shutdown_set.insert(this);
+  
+  /*
   directory.erase(whoami);
   if (directory.empty()) {
        dout(1) << "fakemessenger: last shutdown" << endl;
        ::shutdown = true;
        cond.Signal();  // why not
   } 
+  */
 
   /*
   if (loggers[whoami]) {
@@ -275,7 +308,10 @@ int FakeMessenger::send_message(Message *m, msg_addr_t dest, int port, int fromp
 
        // queue
        FakeMessenger *dm = directory[dest];
-       assert(dm);
+       if (!dm) {
+         dout(1) << "** destination " << MSG_ADDR_NICE(dest) << " (" << dest << ") dne" << endl;
+         assert(dm);
+       }
        dm->queue_incoming(m);
 
        dout(5) << "--> sending " << m << " to " << MSG_ADDR_NICE(dest) << endl;//" m " << dm << " has " << dm->num_incoming() << " queued" << endl;
index a753830cb35d479b6e7b79241614082db215efe3..52042cd849828de200ca57a1c45ef50e9759e642 100644 (file)
@@ -111,6 +111,38 @@ ostream& operator<<(ostream& out, Message& m)
 }
 
 
+// -------- 
+// callbacks
+
+Mutex                msgr_callback_lock;
+list<Context*>       msgr_callback_queue;
+Context*             msgr_callback_kicker = 0;
+
+void Messenger::queue_callback(Context *c) {
+  msgr_callback_lock.Lock();
+  msgr_callback_queue.push_back(c);
+  msgr_callback_lock.Unlock();
+
+  msgr_callback_kicker->finish(0);
+}
+
+void messenger_do_callbacks() {
+  // take list
+  msgr_callback_lock.Lock();
+  list<Context*> ls;
+  ls.splice(ls.begin(), msgr_callback_queue);
+  msgr_callback_lock.Unlock();
+
+  // do them
+  for (list<Context*>::iterator it = ls.begin();
+          it != ls.end();
+          it++) {
+       dout(10) << "--- doing callback " << *it << endl;
+       (*it)->finish(0);
+       delete *it;
+  }
+}
+
 // ---------
 // incoming messages
 
index 1ea91e4a7b7be9578a6094171dc1539cfa7a4913..78fdd58681a4e2ddbc3978cba581bc7bd61a9098 100644 (file)
@@ -9,6 +9,7 @@ using namespace std;
 #include "Dispatcher.h"
 #include "common/Mutex.h"
 #include "common/Cond.h"
+#include "include/Context.h"
 
 
 typedef __uint64_t lamport_t;
@@ -23,6 +24,8 @@ class Messenger {
   msg_addr_t           _myaddr;
   lamport_t            lamport_clock;
 
+  // callbacks
+
   // procedure call fun
   long                   _last_pcid;
   Mutex                  _lock;      // protect call_sem, call_reply
@@ -45,6 +48,8 @@ class Messenger {
 
   virtual int shutdown() = 0;
   
+  void queue_callback(Context *c);
+
   // setup
   void set_dispatcher(Dispatcher *d) { dispatcher = d; ready(); }
   Dispatcher *get_dispatcher() { return dispatcher; }
@@ -60,6 +65,10 @@ class Messenger {
   virtual Message* sendrecv(Message *m, msg_addr_t dest, int port=0);
 };
 
+// callbacks
+void messenger_do_callbacks();
+extern Context *msgr_callback_kicker;
+
 
 extern Message *decode_message(msg_envelope_t &env, bufferlist& bl);
 
index abc307f63cf578da729c1f62a29fe9712fa0f339..37d22ef090b8845dcd9c410598b9807f6a8d342f 100644 (file)
@@ -698,6 +698,9 @@ void* tcp_dispatchthread(void*)
 
   while (1) {
 
+       // callbacks?
+       Messenger::do_callbacks();
+
        // timer events?
        if (pending_timer) {
          pending_timer = false;
index 95f6346d3de569e89c55328e9b9a6e8feaa7196d..252addf5ae384c66de7db66b42b33e6852ba9f32 100644 (file)
@@ -56,6 +56,14 @@ int FakeStore::init()
        return r;
   }
 
+  {
+       char name[80];
+       sprintf(name,"osd%d.fakestore.threadpool", whoami);
+       fsync_threadpool = new ThreadPool<FakeStore, pair<int,Context*> >(name, g_conf.osd_fakestore_syncthreads, 
+                                                                                                                                         (void (*)(FakeStore*, pair<int,Context*>*))dofsync, 
+                                                                                                                                         this);
+  }
+
   // all okay.
   return 0;
 }
@@ -67,12 +75,25 @@ int FakeStore::finalize()
   // close collections db files
   close_collections();
 
+  delete fsync_threadpool;
+
   // nothing
   return 0;
 }
 
 
 
+///////////
+
+void FakeStore::do_fsync(int fd, Context *c)
+{
+  ::fsync(fd);
+  ::close(fd);
+  dout(10) << "do_fsync finished on " << fd << " context " << c << endl;
+  c->finish(0);
+  delete c;
+}
+
 
 ////
 
@@ -205,7 +226,7 @@ int FakeStore::truncate(object_t oid, off_t size)
 
 int FakeStore::read(object_t oid, 
                                        size_t len, off_t offset,
-                                       char *buffer) {
+                                       bufferlist& bl) {
   dout(20) << "read " << oid << " len " << len << " off " << offset << endl;
 
   string fn;
@@ -221,7 +242,10 @@ int FakeStore::read(object_t oid,
   off_t actual = lseek(fd, offset, SEEK_SET);
   size_t got = 0;
   if (actual == offset) {
-       got = ::read(fd, buffer, len);
+       bufferptr bptr = new buffer(len);  // prealloc space for entire read
+       got = ::read(fd, bptr.c_str(), len);
+       bptr.set_length(got);   // properly size the buffer
+       bl.push_back( bptr );   // put it in the target bufferlist
   }
   ::flock(fd, LOCK_UN);
   ::close(fd);
@@ -230,7 +254,7 @@ int FakeStore::read(object_t oid,
 
 int FakeStore::write(object_t oid,
                                         size_t len, off_t offset,
-                                        char *buffer,
+                                        bufferlist& bl,
                                         bool do_fsync) {
   dout(20) << "write " << oid << " len " << len << " off " << offset << endl;
 
@@ -249,10 +273,23 @@ int FakeStore::write(object_t oid,
   ::flock(fd, LOCK_EX);    // lock for safety
   //::fchmod(fd, 0664);
   
+  // seek
   off_t actual = lseek(fd, offset, SEEK_SET);
   int did = 0;
   assert(actual == offset);
-  did = ::write(fd, buffer, len);
+
+  // write buffers
+  for (list<bufferptr>::iterator it = bl.buffers().begin();
+          it != bl.buffers().end();
+          it++) {
+       int r = ::write(fd, (*it).c_str(), (*it).length());
+       if (r > 0)
+         did += r;
+       else {
+         dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+       }
+  }
+  
   if (did < 0) {
        dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
   }
@@ -266,6 +303,55 @@ int FakeStore::write(object_t oid,
   return did;
 }
 
+int FakeStore::write(object_t oid, 
+                                        size_t len, off_t offset, 
+                                        bufferlist& bl, 
+                                        Context *onsafe)
+{
+  dout(20) << "write " << oid << " len " << len << " off " << offset << endl;
+
+  string fn;
+  get_oname(oid,fn);
+  
+  ::mknod(fn.c_str(), 0644, 0);  // in case it doesn't exist yet.
+
+  int flags = O_WRONLY;//|O_CREAT;
+  int fd = ::open(fn.c_str(), flags);
+  if (fd < 0) {
+       dout(1) << "write couldn't open " << fn.c_str() << " flags " << flags << " errno " << errno << " " << strerror(errno) << endl;
+       return fd;
+  }
+  ::flock(fd, LOCK_EX);    // lock for safety
+  //::fchmod(fd, 0664);
+  
+  // seek
+  off_t actual = lseek(fd, offset, SEEK_SET);
+  int did = 0;
+  assert(actual == offset);
+
+  // write buffers
+  for (list<bufferptr>::iterator it = bl.buffers().begin();
+          it != bl.buffers().end();
+          it++) {
+       int r = ::write(fd, (*it).c_str(), (*it).length());
+       if (r > 0)
+         did += r;
+       else {
+         dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+       }
+  }
+  
+  if (did < 0) {
+       dout(1) << "couldn't write to " << fn.c_str() << " len " << len << " off " << offset << " errno " << errno << " " << strerror(errno) << endl;
+  }
+
+  ::flock(fd, LOCK_UN);
+
+  // schedule sync
+  queue_fsync(fd, onsafe);
+  
+  return did;
+}
 
 
 
index 4e5594c073c0a2143c153325d164c050b2b67907..384b01abfc264e2f7264b0cc23d007bed38f2127 100644 (file)
@@ -3,10 +3,12 @@
 
 #include "ObjectStore.h"
 #include "BDBMap.h"
+#include "common/ThreadPool.h"
 
 #include <map>
 using namespace std;
 
+
 class FakeStore : public ObjectStore {
   string basedir;
   int whoami;
@@ -17,6 +19,19 @@ class FakeStore : public ObjectStore {
   void wipe_dir(string mydir);
 
 
+  // async fsync
+  class ThreadPool<class FakeStore, pair<int, class Context*> >  *fsync_threadpool;
+  void queue_fsync(int fd, class Context *c) {
+       fsync_threadpool->put_op(new pair<int, class Context*>(fd,c));
+  }
+ public:
+  void do_fsync(int fd, class Context *c);
+  static void dofsync(class FakeStore *f, pair<int, class Context*> *af) {
+       f->do_fsync(af->first, af->second);
+       delete af;
+  }
+
+
  public:
   FakeStore(char *base, int whoami);
 
@@ -33,11 +48,15 @@ class FakeStore : public ObjectStore {
   int truncate(object_t oid, off_t size);
   int read(object_t oid, 
                   size_t len, off_t offset,
-                  char *buffer);
+                  bufferlist& bl);
   int write(object_t oid,
                        size_t len, off_t offset,
-                       char *buffer,
+                       bufferlist& bl,
                        bool fsync);
+  int write(object_t oid, 
+                       size_t len, off_t offset, 
+                       bufferlist& bl, 
+                       Context *onsafe);
 
   int setattr(object_t oid, const char *name,
                                void *value, size_t size);
index dd8372a368cca7a690d09649ebfe6caa91e7fced..8cb193b7b08f660f7671f43b3710fc5ed63f8bea 100644 (file)
 #include "messages/MOSDPGPeerAck.h"
 #include "messages/MOSDPGUpdate.h"
 
-//#include "messages/MOSDPGQuery.h"
-//#include "messages/MOSDPGQueryReply.h"
-
 #include "common/Logger.h"
 #include "common/LogType.h"
-
+#include "common/Timer.h"
 #include "common/ThreadPool.h"
 
 #include <iostream>
@@ -54,6 +51,7 @@ char *osd_base_path = "./osddata";
 #define ROLE_TYPE(x)   ((x)>0 ? 1:(x))
 
 
+
 // cons/des
 
 LogType osd_logtype;
@@ -272,12 +270,6 @@ void OSD::dispatch(Message *m)
                handle_pg_update((MOSDPGUpdate*)m);
                break;
 
-               /*
-         case MSG_OSD_PG_QUERYREPLY:
-               handle_pg_query_reply((MOSDPGQueryReply*)m);
-               break;
-               */
-
          case MSG_OSD_OP:
                monitor->host_is_alive(m->get_source());
                handle_op((MOSDOp*)m);
@@ -313,6 +305,13 @@ void OSD::dispatch(Message *m)
 
 void OSD::handle_op_reply(MOSDOpReply *m)
 {
+  // did i get a new osdmap?
+  if (m->get_map_version() > osdmap->get_version()) {
+       dout(3) << "replica op reply includes a new osd map" << endl;
+       update_map(m->get_osdmap());
+  }
+
+  // handle op
   switch (m->get_op()) {
   case OSD_OP_REP_PULL:
        op_rep_pull_reply(m);
@@ -327,7 +326,7 @@ void OSD::handle_op_reply(MOSDOpReply *m)
   case OSD_OP_REP_WRITE:
   case OSD_OP_REP_TRUNCATE:
   case OSD_OP_REP_DELETE:
-       ack_replica_op(m->get_tid(), m->get_result(), MSG_ADDR_NUM(m->get_source()));
+       ack_replica_op(m->get_tid(), m->get_result(), m->get_safe(), MSG_ADDR_NUM(m->get_source()));
        delete m;
        break;
 
@@ -336,53 +335,139 @@ void OSD::handle_op_reply(MOSDOpReply *m)
   }
 }
 
-void OSD::ack_replica_op(__uint64_t tid, int result, int fromosd)
+void OSD::ack_replica_op(__uint64_t tid, int result, bool safe, int fromosd)
 {
-  replica_write_lock.Lock();
+  //replica_write_lock.Lock();
+
+  if (!replica_ops.count(tid)) {
+       dout(7) << "not waiting for tid " << tid << " replica op reply, map must have changed, dropping." << endl;
+       return;
+  }
   
-  if (replica_writes.count(tid)) {
-       MOSDOp *op = replica_writes[tid];
-       dout(7) << "rep_write_reply ack tid " << tid << " orig op " << op << endl;
+  
+  OSDReplicaOp *repop = replica_ops[tid];
+  MOSDOp *op = repop->op;
+  pg_t pgid = op->get_pg();
+
+  dout(7) << "ack_replica_op " << tid << " op " << op << " result " << result << " safe " << safe << " from osd" << fromosd << endl;
+  dout(15) << " repop was: op " << repop->op << " waitfor ack=" << repop->waitfor_ack << " sync=" << repop->waitfor_sync << " localsync=" << repop->local_sync << " cancel=" << repop->cancel << "  osd=" << repop->osds << endl;
+
+
+  if (result >= 0) {
+       // success
        
-       replica_writes.erase(tid);
-       replica_write_tids[op].erase(tid);
-               
-       pg_t pgid = op->get_pg();
+       if (safe) {
+         // sync
+         repop->waitfor_sync.erase(tid);
+         repop->waitfor_ack.erase(tid);
+         replica_ops.erase(tid);
+         
+         replica_pg_osd_tids[pgid][fromosd].erase(tid);
+         if (replica_pg_osd_tids[pgid][fromosd].empty()) replica_pg_osd_tids[pgid].erase(fromosd);
+         if (replica_pg_osd_tids[pgid].empty()) replica_pg_osd_tids.erase(pgid);
+
+         // send 'safe' to client?
+         if (repop->can_send_sync()) {
+               MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+               messenger->send_message(reply, op->get_asker());
+               delete op;
+               delete repop;
+         }
+       } else {
+         // ack
+         repop->waitfor_ack.erase(tid);
+
+         // send 'ack' to client?
+         if (repop->can_send_ack()) {
+               MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+               messenger->send_message(reply, op->get_asker());
+         }
+       }
+
+  } else {
+       // failure
+       
+       // forget about this failed attempt..
+       repop->osds.erase(fromosd);
+       repop->waitfor_ack.erase(tid);
+       repop->waitfor_sync.erase(tid);
+
+       replica_ops.erase(tid);
 
        replica_pg_osd_tids[pgid][fromosd].erase(tid);
        if (replica_pg_osd_tids[pgid][fromosd].empty()) replica_pg_osd_tids[pgid].erase(fromosd);
        if (replica_pg_osd_tids[pgid].empty()) replica_pg_osd_tids.erase(pgid);
-       
-       if (replica_write_tids[op].empty()) {
-         // reply?
-         if (replica_write_local.count(op)) {
-               replica_write_local.erase(op);
-               
-               if (result >= 0) {
-                 dout(7) << "last one, replying to write op" << endl;
-                 
-                 // written locally too, reply
-                 MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
-                 messenger->send_message(reply, op->get_asker());
-                 delete op;
-               } else {
-                 dout(7) << "last one, but replica write failed, resubmit" << endl;
-                 finished.push_back(op);  // handle_op will fw this to new primary, probably!
-               }
-               replica_write_result.erase(op);
-         } else {
-               // not yet written locally.
-               dout(9) << "not yet written locally, still waiting for that" << endl;
-               replica_write_result[op] = -1;
+
+       bool did = false;
+       PG *pg = get_pg(pgid);
+
+       // am i no longer the primary?
+       if (pg->get_primary() != whoami) {
+         // oh, it wasn't a replica.. primary must have changed
+         dout(4) << "i'm no longer the primary for " << *pg << endl;
+
+         // retry the whole thing
+         finished.push_back(repop->op);
+
+         // clean up
+         for (map<__uint64_t,int>::iterator it = repop->waitfor_ack.begin();
+                  it != repop->waitfor_ack.end();
+                  it++) {
+               replica_ops.erase(it->first);
+               replica_pg_osd_tids[pgid][it->second].erase(it->first);
+               if (replica_pg_osd_tids[pgid][it->second].empty()) replica_pg_osd_tids[pgid].erase(it->second);
+         }
+         for (map<__uint64_t,int>::iterator it = repop->waitfor_sync.begin();
+                  it != repop->waitfor_sync.end();
+                  it++) {
+               replica_ops.erase(it->first);
+               replica_pg_osd_tids[pgid][it->second].erase(it->first);
+               if (replica_pg_osd_tids[pgid][it->second].empty()) replica_pg_osd_tids[pgid].erase(it->second);
          }
-         replica_write_tids.erase(op);
+         if (replica_pg_osd_tids[pgid].empty()) replica_pg_osd_tids.erase(pgid);
+
+         if (repop->local_sync)
+               delete repop;
+         else {
+               repop->op = 0;      // we're forwarding it
+               repop->cancel = true;     // will get deleted by local sync callback
+         }
+         did = true;
        }
-       
-  } else {
-       dout(7) << "not waiting for tid " << tid << " rep_write reply, map must have changed, dropping." << endl;
+
+       /* no!  don't do this, not without checking complete/clean-ness
+       else {
+         // i am still primary.
+         // re-issue replica op to a moved replica?
+         for (unsigned i=1; i<pg->acting.size(); i++) {
+               if (repop->osds.count(pg->acting[i])) continue;
+               issue_replica_op(pg, repop, pg->acting[i]);
+               did = true;
+         }     
+       }
+       */
+
+       if (!did) {
+         // an osd musta just gone down or somethin.  are we "done" now?
+         
+         // send 'safe' to client?
+         if (repop->can_send_sync()) {
+               MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
+               messenger->send_message(reply, op->get_asker());
+               delete op;
+               delete repop;
+         }
+         
+         // send 'ack' to client?
+         else if (repop->can_send_ack()) {
+               MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+               messenger->send_message(reply, op->get_asker());
+         }       
+       }
+
   }
-  
-  replica_write_lock.Unlock();
+
+
 }
 
 
@@ -421,26 +506,69 @@ void OSD::wait_for_new_map(Message *m)
 /** update_map
  * assimilate a new OSDMap.  scan pgs.
  */
-/*
-void OSD::update_map(bufferlist& state)
+void OSD::update_map(bufferlist& state, bool mkfs)
 {
   // decode new map
-  if (!osdmap) osdmap = new OSDMap();
+  osdmap = new OSDMap();
   osdmap->decode(state);
-  dout(7) << "update_map version " << osdmap->get_version() << endl;
-
   osdmaps[osdmap->get_version()] = osdmap;
+  dout(7) << "got osd map version " << osdmap->get_version() << endl;
+       
+  // pg list
+  list<pg_t> pg_list;
+  
+  if (mkfs) {
+       // create PGs
+       for (int nrep = 2; nrep <= g_conf.osd_max_rep; nrep++) {
+         ps_t maxps = 1LL << osdmap->get_pg_bits();
+         for (pg_t ps = 0; ps < maxps; ps++) {
+               pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
+               vector<int> acting;
+               osdmap->pg_to_acting_osds(pgid, acting);
+               
+               
+               if (acting[0] == whoami) {
+                 PG *pg = create_pg(pgid);
+                 pg->acting = acting;
+                 pg->set_role(0);
+                 pg->set_primary_since(osdmap->get_version());
+                 pg->mark_complete( osdmap->get_version() );
+                 
+                 dout(7) << "created " << *pg << endl;
+                 
+                 pg_list.push_back(pgid);
+               }
+         }
+       }
+  } else {
+       // get pg list
+       get_pg_list(pg_list);
+  }
+  
+  // use our new map(s)
+  advance_map(pg_list);
+  activate_map(pg_list);
+  
+  if (mkfs) {
+       // mark all peers complete
+       for (list<pg_t>::iterator pgid = pg_list.begin();
+                pgid != pg_list.end();
+                pgid++) {
+         PG *pg = get_pg(*pgid);
+         for (map<int,PGPeer*>::iterator it = pg->peers.begin();
+                  it != pg->peers.end();
+                  it++) {
+               PGPeer *p = it->second;
+               //dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl;
+               messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
+                                                               MSG_ADDR_OSD(p->get_peer()));
+         }
+       }
+  }
 
-  // FIXME mutliple maps?
-
-  // scan PGs
-  list<pg_t> ls;
-  get_pg_list(ls);
-
-  advance_map(ls);
-  activate_map(ls);
+  // process waiters
+  take_waiters(waiting_for_osdmap);
 }
-*/
 
 void OSD::handle_osd_map(MOSDMap *m)
 {
@@ -462,49 +590,7 @@ void OSD::handle_osd_map(MOSDMap *m)
          dout(3) << "handle_osd_map got osd map version " << m->get_version() << endl;
        }
 
-       // decode new map
-       osdmap = new OSDMap();
-       osdmap->decode(m->get_osdmap());
-       osdmaps[osdmap->get_version()] = osdmap;
-       dout(7) << "got osd map version " << osdmap->get_version() << endl;
-       
-       // pg list
-       list<pg_t> pg_list;
-
-       if (m->is_mkfs()) {
-         // create PGs
-         for (int nrep = 2; nrep <= g_conf.osd_max_rep; nrep++) {
-               ps_t maxps = 1LL << osdmap->get_pg_bits();
-               for (pg_t ps = 0; ps < maxps; ps++) {
-                 pg_t pgid = osdmap->ps_nrep_to_pg(ps, nrep);
-                 vector<int> acting;
-                 osdmap->pg_to_acting_osds(pgid, acting);
-                 
-                 
-                 if (acting[0] == whoami) {
-                       PG *pg = create_pg(pgid);
-                       pg->acting = acting;
-                       pg->set_role(0);
-                       pg->set_primary_since(osdmap->get_version());
-                       pg->state_set(PG_STATE_COMPLETE);
-                       
-                       dout(7) << "created " << *pg << endl;
-                       
-                       pg_list.push_back(pgid);
-                 }
-               }
-         }
-       } else {
-         // get pg list
-         get_pg_list(pg_list);
-       }
-
-       // use our new map(s)
-       advance_map(pg_list);
-       activate_map(pg_list);
-
-       // process waiters
-       take_waiters(waiting_for_osdmap);
+       update_map(m->get_osdmap(), m->is_mkfs());
 
   } else {
        dout(3) << "handle_osd_map ignoring osd map version " << m->get_version() << " <= " << osdmap->get_version() << endl;
@@ -626,7 +712,6 @@ PG *OSD::create_pg(pg_t pgid)
 
   PG *pg = new PG(whoami, pgid);
   //pg->info.created = osdmap->get_version();
-  pg->last_complete = osdmap->get_version();
   
   pg->store(store);
   pg_map[pgid] = pg;
@@ -694,6 +779,18 @@ void OSD::advance_map(list<pg_t>& ls)
          if (pg->get_role() == 0) {
                // drop peers
                take_waiters(pg->waiting_for_peered);
+               for (hash_map<object_t, list<Message*> >::iterator it = pg->waiting_for_missing_object.begin();
+                        it != pg->waiting_for_missing_object.end();
+                        it++)
+                 take_waiters(it->second);
+               pg->waiting_for_missing_object.clear();
+
+               for (hash_map<object_t, list<Message*> >::iterator it = pg->waiting_for_clean_object.begin();
+                        it != pg->waiting_for_clean_object.end();
+                        it++)
+                 take_waiters(it->second);
+               pg->waiting_for_clean_object.clear();
+
                pg->drop_peers();
                pg->state_clear(PG_STATE_CLEAN);
                pg->discard_recovery_plan();
@@ -753,7 +850,7 @@ void OSD::advance_map(list<pg_t>& ls)
                for (set<__uint64_t>::iterator tid = s.begin();
                         tid != s.end();
                         tid++)
-                 ack_replica_op(*tid, -1, *down);
+                 ack_replica_op(*tid, -1, false, *down);
          }
        }
 
@@ -765,7 +862,7 @@ void OSD::activate_map(list<pg_t>& ls)
 {
   dout(7) << "activate_map version " << osdmap->get_version() << endl;
 
-  map< int, map<pg_t, version_t> > notify_list;  // primary -> pgid -> last_complete
+  map< int, map<pg_t, version_t> > notify_list;  // primary -> pgid -> last_any_complete
   map< int, map<PG*,int> >   start_map;    // peer -> PG -> peer_role
 
   // scan pg's
@@ -782,7 +879,7 @@ void OSD::activate_map(list<pg_t>& ls)
        } 
        else if (pg->is_stray()) {
          // i am residual|replica
-         notify_list[pg->get_primary()][pgid] = pg->get_last_complete();
+         notify_list[pg->get_primary()][pgid] = pg->get_last_any_complete();
        }
   }  
 
@@ -816,13 +913,13 @@ void OSD::peer_notify(int primary, map<pg_t, version_t>& pg_list)
 
 void OSD::start_peers(PG *pg, map< int, map<PG*,int> >& start_map) 
 {
-  dout(10) << " " << *pg << " was last_complete " << pg->get_last_complete() << endl;
+  dout(10) << " " << *pg << " last_any_complete " << pg->get_last_any_complete() << endl;
 
   // determine initial peer set
   map<int,int> peerset;  // peer -> role
   
   // prior map(s), if OSDs are still up
-  for (version_t epoch = pg->get_last_complete();
+  for (version_t epoch = pg->get_last_any_complete();
           epoch < osdmap->get_version();
           epoch++) {
        OSDMap *omap = get_osd_map(epoch);
@@ -870,7 +967,9 @@ void OSD::start_peers(PG *pg, map< int, map<PG*,int> >& start_map)
          !pg->is_peered()) {
        dout(10) << " " << *pg << " already has necessary peers, analyzing" << endl;
        pg->mark_peered();
-       pg->plan_recovery(store, osdmap->get_version());
+       take_waiters(pg->waiting_for_peered);
+
+       plan_recovery(pg);
        do_recovery(pg);
   }
 }
@@ -928,7 +1027,7 @@ bool OSD::require_current_map(Message *m, version_t v)
 
   // down?
   if (osdmap->is_down(from)) {
-       dout(7) << "  from down OSD osd" << from << ", pinging" << endl;
+       dout(7) << "  from down OSD osd" << from << ", dropping" << endl;
        // FIXME
        return false;
   }
@@ -1001,7 +1100,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
          assert(pg->acting[0] == whoami);
          pg->set_role(0);
          pg->set_primary_since( osdmap->get_version() );  // FIXME: this may miss a few epochs!
-         pg->set_last_complete( 0 );
+         pg->mark_any_complete( it->second );
 
          dout(10) << " " << *pg << " is new, nrep=" << nrep << endl;     
 
@@ -1020,6 +1119,9 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
          assert(0);
        }
 
+       if (it->second > pg->get_last_any_complete())
+         pg->mark_any_complete( it->second );
+
        // peered with this guy specifically?
        PGPeer *pgp = pg->get_peer(from);
        if (!pgp && 
@@ -1081,6 +1183,8 @@ void OSD::handle_pg_peer(MOSDPGPeer *m)
          pg->acting = acting;
          pg->set_role(role);
 
+         //if (m->get_version() == 1) pg->mark_complete();   // hack... need a more elegant solution
+
          dout(10) << " " << *pg << " dne (before), but i am role " << role << endl;
 
          // take any waiters
@@ -1095,6 +1199,7 @@ void OSD::handle_pg_peer(MOSDPGPeer *m)
        // report back state and pg content
        ack->pg_state[pgid].state = pg->get_state();
        ack->pg_state[pgid].last_complete = pg->get_last_complete();
+       ack->pg_state[pgid].last_any_complete = pg->get_last_any_complete();
        pg->scan_local_objects(ack->pg_state[pgid].objects, store);     // list my objects
        
        // i am now peered
@@ -1145,11 +1250,16 @@ void OSD::handle_pg_peer_ack(MOSDPGPeerAck *m)
        assert(pg);
 
        dout(10) << " " << *pg << " osd" << from << " remote state " << it->second.state 
-                        << " w/ " << it->second.objects.size() << " objects" << endl;
+                        << " w/ " << it->second.objects.size() << " objects"
+                        << ", last_complete " << it->second.last_complete 
+                        << ", last_any_complete " << it->second.last_any_complete 
+                        << endl;
 
        PGPeer *pgp = pg->get_peer(from);
        assert(pgp);
        
+       pg->mark_any_complete( it->second.last_any_complete );
+
        pgp->last_complete = it->second.last_complete;
        pgp->objects = it->second.objects;
        pgp->state_set(PG_PEER_STATE_ACTIVE);
@@ -1172,7 +1282,7 @@ void OSD::handle_pg_peer_ack(MOSDPGPeerAck *m)
                take_waiters(pg->waiting_for_peered);
 
                dout(10) << " " << *pg << " fully peered, analyzing" << endl;
-               pg->plan_recovery(store, osdmap->get_version());
+               plan_recovery(pg);
                do_recovery(pg);
          } else {
                // we're already peered.
@@ -1190,7 +1300,10 @@ void OSD::handle_pg_peer_ack(MOSDPGPeerAck *m)
 void OSD::handle_pg_update(MOSDPGUpdate *m)
 {
   int from = MSG_ADDR_NUM(m->get_source());
-  dout(7) << "handle_pg_update on " << hex << m->get_pgid() << dec << " from osd" << from << endl;
+  dout(7) << "handle_pg_update on " << hex << m->get_pgid() << dec << " from osd" << from 
+                 << " complete=" << m->is_complete() 
+                 << " last_any_complete=" << m->get_last_any_complete()
+                 << endl;
   
   PG *pg = get_pg(m->get_pgid());
   if (!require_current_pg_primary(m, m->get_version(), pg)) return;
@@ -1203,8 +1316,12 @@ void OSD::handle_pg_update(MOSDPGUpdate *m)
        //pg->assim_info( m->get_pginfo() );
        
        // complete?
-       if (m->is_complete()) 
-         pg->mark_complete();
+       if (m->is_complete()) {
+         pg->mark_complete( osdmap->get_version() );
+       }
+
+       if (m->get_last_any_complete()) 
+         pg->mark_any_complete( m->get_last_any_complete() );
 
        pg->store(store);
   }
@@ -1216,15 +1333,32 @@ void OSD::handle_pg_update(MOSDPGUpdate *m)
 
 // RECOVERY
 
+void OSD::plan_recovery(PG *pg) 
+{
+  version_t current_version = osdmap->get_version();
+  
+  list<PGPeer*> complete_peers;
+  pg->plan_recovery(store, current_version, complete_peers);
+
+  for (list<PGPeer*>::iterator it = complete_peers.begin();
+          it != complete_peers.end();
+          it++) {
+       PGPeer *p = *it;
+       dout(7) << " " << *pg << " telling peer osd" << p->get_peer() << " they are complete" << endl;
+       messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
+                                                       MSG_ADDR_OSD(p->get_peer()));
+  }
+}
+
 void OSD::do_recovery(PG *pg)
 {
   // recover
-  if (!pg->is_complete()) {
+  if (!pg->is_complete(osdmap->get_version())) {
        pg_pull(pg, max_recovery_ops);
   }
   
   // replicate
-  if (pg->is_complete()) {
+  if (pg->is_complete( osdmap->get_version() )) {
        if (!pg->objects_unrep.empty()) 
          pg_push(pg, max_recovery_ops);
        if (!pg->objects_stray.empty()) 
@@ -1262,8 +1396,7 @@ void OSD::pull_replica(PG *pg, object_t oid)
   dout(7) << "pull_replica " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
 
   // add to fetching list
-  p->pull(oid, v);
-  pg->objects_pulling[oid] = p;
+  pg->pulling(oid, v, p);
 
   // send op
   __uint64_t tid = ++last_tid;
@@ -1295,17 +1428,14 @@ void OSD::op_rep_pull(MOSDOp *op)
   assert(v == op->get_version());
   
   // read
-  bufferptr bptr = new buffer(st.st_size);   // prealloc space for entire read
+  bufferlist bl;
   long got = store->read(op->get_oid(), 
                                                 st.st_size, 0,
-                                                bptr.c_str());
+                                                bl);
   assert(got == st.st_size);
   
   // reply
   MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); 
-  bptr.set_length(got);   // properly size the buffer
-  bufferlist bl;
-  bl.push_back( bptr );
   reply->set_result(0);
   reply->set_data(bl);
   reply->set_length(got);
@@ -1333,7 +1463,7 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
   osd_lock.Unlock();
 
   // write it and add it to the PG
-  store->write(o, op->get_length(), 0, op->get_data().c_str());
+  store->write(o, op->get_length(), 0, op->get_data());
   p->pg->add_object(store, o);
 
   store->setattr(o, "version", &v, sizeof(v));
@@ -1343,7 +1473,22 @@ void OSD::op_rep_pull_reply(MOSDOpReply *op)
   pull_ops.erase(op->get_tid());
 
   pg->pulled(o, v, p);
-  
+
+  // now complete?
+  if (pg->objects_missing.empty()) {
+       pg->mark_complete(osdmap->get_version());
+
+       // distribute new last_any_complete
+       dout(7) << " " << *pg << " now complete, updating last_any_complete on peers" << endl;
+       for (map<int,PGPeer*>::iterator it = pg->peers.begin();
+                it != pg->peers.end();
+                it++) {
+         PGPeer *p = it->second;
+         messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), false, osdmap->get_version()),
+                                                         MSG_ADDR_OSD(p->get_peer()));
+       }
+  }
+
   // finish waiters
   if (pg->waiting_for_missing_object.count(o)) 
        take_waiters(pg->waiting_for_missing_object[o]);
@@ -1375,7 +1520,6 @@ void OSD::pg_push(PG *pg, int maxops)
        push_replica(pg, oid);
        ops++;
   }  
-  
 }
 
 void OSD::push_replica(PG *pg, object_t oid)
@@ -1386,13 +1530,14 @@ void OSD::push_replica(PG *pg, object_t oid)
 
   set<int>& peers = pg->objects_unrep[oid];
 
-  dout(7) << "push_replica " << hex << oid << dec << " v " << v << " to osds " << peers << endl;
-  
   // load object content
   struct stat st;
   store->stat(oid, &st);
-  bufferptr b = new buffer(st.st_size);
-  store->read(oid, st.st_size, 0, b.c_str());
+  bufferlist bl;
+  store->read(oid, st.st_size, 0, bl);
+  assert(bl.length() == st.st_size);
+
+  dout(7) << "push_replica " << hex << oid << dec << " v " << v << " to osds " << peers << "  size " << st.st_size << endl;
 
   for (set<int>::iterator pit = peers.begin();
           pit != peers.end();
@@ -1401,8 +1546,7 @@ void OSD::push_replica(PG *pg, object_t oid)
        assert(p);
        
        // add to list
-       p->push(oid, v);
-       pg->objects_pushing[oid].insert(p);
+       pg->pushing(oid, v, p);
 
        // send op
        __uint64_t tid = ++last_tid;
@@ -1414,7 +1558,8 @@ void OSD::push_replica(PG *pg, object_t oid)
        op->set_pg_role(-1);  // whatever, not 0
        
        // include object content
-       op->get_data().append(b);
+       //op->set_data(bl);  // no no bad, will modify bl
+       op->get_data() = bl;  // _copy_ bufferlist, we may have multiple destinations!
        op->set_length(st.st_size);
        op->set_offset(0);
        
@@ -1431,6 +1576,9 @@ void OSD::op_rep_push(MOSDOp *op)
   dout(7) << "rep_push on " << hex << op->get_oid() << dec << " v " << op->get_version() <<  endl;
   lock_object(op->get_oid());
 
+  PG *pg = get_pg(op->get_pg());
+  assert(pg);
+
   // exists?
   if (store->exists(op->get_oid())) {
        store->truncate(op->get_oid(), 0);
@@ -1441,20 +1589,12 @@ void OSD::op_rep_push(MOSDOp *op)
   }
 
   // write out buffers
-  bufferlist bl;
-  bl.claim( op->get_data() );
-
-  off_t off = 0;
-  for (list<bufferptr>::iterator it = bl.buffers().begin();
-          it != bl.buffers().end();
-          it++) {
-       int r = store->write(op->get_oid(),
-                                                (*it).length(), off,
-                                                (*it).c_str(), 
-                                                false);  // write async, no rush
-       assert((unsigned)r == (*it).length());
-       off += (*it).length();
-  }
+  int r = store->write(op->get_oid(),
+                                          op->get_length(), 0,
+                                          op->get_data(),
+                                          false);       // FIXME
+  pg->add_object(store, op->get_oid());
+  assert(r >= 0);
 
   // set version
   version_t v = op->get_version();
@@ -1488,7 +1628,7 @@ void OSD::op_rep_push_reply(MOSDOpReply *op)
 
   if (p->is_complete()) {
        dout(7) << " telling replica they are complete" << endl;
-       messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true),
+       messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
                                                        MSG_ADDR_OSD(p->get_peer()));
   }
 
@@ -1541,9 +1681,9 @@ void OSD::remove_replica(PG *pg, object_t oid)
        assert(p);
        const version_t v = it->second;
 
-       p->remove(oid, v);
-       pg->objects_removing[oid][p] = v;
-  
+       // add to list
+       pg->removing(oid, v, p);
+
        // send op
        __uint64_t tid = ++last_tid;
        MOSDOp *op = new MOSDOp(tid, messenger->get_myaddr(),
@@ -1603,7 +1743,7 @@ void OSD::op_rep_remove_reply(MOSDOpReply *op)
   
   if (p->is_complete()) {
        dout(7) << " telling replica they are complete" << endl;
-       messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true),
+       messenger->send_message(new MOSDPGUpdate(osdmap->get_version(), pg->get_pgid(), true, osdmap->get_version()),
                                                        MSG_ADDR_OSD(p->get_peer()));
   }
 
@@ -1616,6 +1756,26 @@ void OSD::op_rep_remove_reply(MOSDOpReply *op)
 }
 
 
+class C_OSD_RepModifySync : public Context {
+public:
+  OSD *osd;
+  MOSDOp *op;
+  C_OSD_RepModifySync(OSD *o, MOSDOp *oo) : osd(o), op(oo) { }
+  void finish(int r) {
+       osd->op_rep_modify_sync(op);
+  }
+};
+
+void OSD::op_rep_modify_sync(MOSDOp *op)
+{
+  osd_lock.Lock();
+  dout(2) << "rep_modify_sync on op " << op << endl;
+  MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap, true);
+  messenger->send_message(ack2, op->get_asker());
+  delete op;
+  osd_lock.Unlock();
+}
+
 void OSD::op_rep_modify(MOSDOp *op)
 { 
   // when we introduce unordered messaging.. FIXME
@@ -1629,25 +1789,16 @@ void OSD::op_rep_modify(MOSDOp *op)
   // PG
   PG *pg = get_pg(op->get_pg());
   assert(pg);
-
   
-  dout(12) << "rep_modify " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
-
-  // pre-ack
-  //MOSDOpReply *ack1 = new MOSDOpReply(op, 0, osdmap);
-  //messenger->send_message(ack1, op->get_asker());
-
-  /*
-  // update pg stamp(s)
-  pg->last_modify_stamp = op->get_version();
-  if (pg->is_complete()) 
-       pg->last_complete_stamp = pg->last_modify_stamp;
-  */
+  dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
 
   int r = 0;
+  Context *onsync = 0;
   if (op->get_op() == OSD_OP_REP_WRITE) {
        // write
-       r = apply_write(op, false, op->get_version());
+       assert(op->get_data().length() == op->get_length());
+       onsync = new C_OSD_RepModifySync(this, op);
+       r = apply_write(op, op->get_version(), onsync);
        if (ov == 0) pg->add_object(store, oid);
   } else if (op->get_op() == OSD_OP_REP_DELETE) {
        // delete
@@ -1658,11 +1809,16 @@ void OSD::op_rep_modify(MOSDOp *op)
        r = store->truncate(oid, op->get_offset());
   } else assert(0);
   
-  // ack
-  MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap, true);
-  messenger->send_message(ack2, op->get_asker());
-
-  delete op;
+  if (onsync) {
+       // ack
+       MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
+       messenger->send_message(ack, op->get_asker());
+  } else {
+       // sync, safe
+       MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
+       messenger->send_message(ack, op->get_asker());
+       delete op;
+  }
 }
 
 
@@ -1673,6 +1829,8 @@ void OSD::op_rep_modify(MOSDOp *op)
 
 void OSD::handle_op(MOSDOp *op)
 {
+  osd_lock.Lock();
+
   pg_t pgid = op->get_pg();
   PG *pg = get_pg(pgid);
 
@@ -1685,6 +1843,7 @@ void OSD::handle_op(MOSDOp *op)
          // op's is newer
          dout(7) << "op map " << op->get_map_version() << " > " << osdmap->get_version() << endl;
          wait_for_new_map(op);
+         osd_lock.Unlock();
          return;
        }
 
@@ -1702,6 +1861,7 @@ void OSD::handle_op(MOSDOp *op)
                                                                op->get_asker());
                delete op;
          }
+         osd_lock.Unlock();
          return;
        }
 
@@ -1709,26 +1869,71 @@ void OSD::handle_op(MOSDOp *op)
        if (!pg) {
          dout(7) << "hit non-existent pg " << hex << op->get_pg() << dec << ", waiting" << endl;
          waiting_for_pg[pgid].push_back(op);
+         osd_lock.Unlock();
          return;
        }
        else {
-         if (!pg->is_complete()) {
+         dout(7) << "handle_op " << op << " in " << *pg << endl;
+
+         // must be peered.
+         if (!pg->is_peered()) {
+               dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
+               pg->waiting_for_peered.push_back(op);
+               osd_lock.Unlock();
+               return;
+         }
+
+         const object_t oid = op->get_oid();
+
+         if (!pg->is_complete( osdmap->get_version() )) {
                // consult PG object map
-               if (pg->objects_missing.count(op->get_oid())) {
+               if (pg->objects_missing.count(oid)) {
                  // need to pull
-                 dout(7) << "need to pull object " << hex << op->get_oid() << dec << endl;
-                 if (!pg->objects_pulling.count(op->get_oid())) 
-                       pull_replica(pg, op->get_oid());
-                 pg->waiting_for_missing_object[op->get_oid()].push_back(op);
+                 dout(7) << "need to pull object " << hex << oid << dec << endl;
+                 if (!pg->objects_pulling.count(oid)) 
+                       pull_replica(pg, oid);
+                 pg->waiting_for_missing_object[oid].push_back(op);
+                 osd_lock.Unlock();
+                 return;
+               }
+         }       
+
+         if (!pg->is_clean() &&
+                 (op->get_op() == OSD_OP_WRITE ||
+                  op->get_op() == OSD_OP_TRUNCATE ||
+                  op->get_op() == OSD_OP_DELETE)) {
+               // exists but not replicated?
+               if (pg->objects_unrep.count(oid)) {
+                 dout(7) << "object " << hex << oid << dec << " in " << *pg 
+                                 << " exists but not clean" << endl;
+                 pg->waiting_for_clean_object[oid].push_back(op);
+                 if (pg->objects_pushing.count(oid) == 0)
+                       push_replica(pg, oid);
+                 osd_lock.Unlock();
+                 return;
+               }
+
+               // just stray?
+               //  FIXME: this is a bit to aggressive; includes inactive peers
+               if (pg->objects_stray.count(oid)) {  
+                 dout(7) << "object " << hex << oid << dec << " in " << *pg 
+                                 << " dne but is not clean" << endl;
+                 pg->waiting_for_clean_object[oid].push_back(op);
+                 if (pg->objects_removing.count(oid) == 0)
+                       remove_replica(pg, oid);
+                 osd_lock.Unlock();
                  return;
                }
          }
-         
        }       
        
   } else {
        // REPLICATION OP
-       
+       if (pg) {
+         dout(7) << "handle_rep_op " << op << " in " << *pg << endl;
+       } else {
+         dout(7) << "handle_rep_op " << op << " in pgid " << op->get_pg() << endl;
+       }
     // check osd map
        if (op->get_map_version() != osdmap->get_version()) {
          // make sure source is still primary
@@ -1740,6 +1945,7 @@ void OSD::handle_op(MOSDOp *op)
                dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary changed on pg " << hex << op->get_pg() << dec << endl;
                MOSDOpReply *fail = new MOSDOpReply(op, -1, osdmap, false);
                messenger->send_message(fail, op->get_asker());
+               osd_lock.Unlock();
                return;
          } else {
                dout(5) << "op map " << op->get_map_version() << " != " << osdmap->get_version() << ", primary same on pg " << hex << op->get_pg() << dec << endl;
@@ -1753,6 +1959,8 @@ void OSD::handle_op(MOSDOp *op)
        do_op(op);
   } else
        queue_op(op);
+
+  osd_lock.Unlock();
 }
 
 void OSD::queue_op(MOSDOp *op) {
@@ -1801,35 +2009,20 @@ void OSD::do_op(MOSDOp *op)
        }
   } else {
        // regular op
-       pg_t pgid = op->get_pg();
-       PG *pg = get_pg(pgid);
-
-       // PG must be peered for all client ops.
-       if (!pg) {
-         dout(7) << "op_write pg " << hex << pgid << dec << " dne (yet)" << endl;
-         waiting_for_pg[pgid].push_back(op);
-       }       
-       else if (!pg->is_peered()) {
-         dout(7) << "op_write " << *pg << " not peered (yet)" << endl;
-         pg->waiting_for_peered.push_back(op);
-       }
-       else {
-         // do op
-         switch (op->get_op()) {
-         case OSD_OP_READ:
-               op_read(op, pg);
-               break;
-         case OSD_OP_STAT:
-               op_stat(op, pg);
-               break;
-         case OSD_OP_WRITE:
-         case OSD_OP_DELETE:
-         case OSD_OP_TRUNCATE:
-               op_modify(op, pg);
-               break;
-         default:
-               assert(0);
-         }
+       switch (op->get_op()) {
+       case OSD_OP_READ:
+         op_read(op);
+         break;
+       case OSD_OP_STAT:
+         op_stat(op);
+         break;
+       case OSD_OP_WRITE:
+       case OSD_OP_DELETE:
+       case OSD_OP_TRUNCATE:
+         op_modify(op);
+         break;
+       default:
+         assert(0);
        }
   }
 
@@ -1869,49 +2062,23 @@ void OSD::wait_for_no_ops()
 
 // READ OPS
 
-bool OSD::object_complete(PG *pg, object_t oid, Message *op)
-{
-  if (!pg->is_complete()) {
-       if (pg->objects_missing.count(oid)) {
-         dout(7) << "object " << hex << oid << dec << /*" v " << v << */" in " << *pg 
-                         << " exists but not local (yet)" << endl;
-         pg->waiting_for_missing_object[oid].push_back(op);
-         return false;
-       }
-  }
-  return true;
-}
-
-void OSD::op_read(MOSDOp *op, PG *pg)
+void OSD::op_read(MOSDOp *op)
 {
   object_t oid = op->get_oid();
   lock_object(oid);
   
-  // version?  clean?
-  if (!object_complete(pg, oid, op)) {
-       unlock_object(oid);
-       return;
-  }
-
   // read into a buffer
-  bufferptr bptr = new buffer(op->get_length());   // prealloc space for entire read
+  bufferlist bl;
   long got = store->read(oid, 
                                                 op->get_length(), op->get_offset(),
-                                                bptr.c_str());
+                                                bl);
   // set up reply
   MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); 
   if (got >= 0) {
-       bptr.set_length(got);   // properly size the buffer
-
-       // give it to the reply in a bufferlist
-       bufferlist bl;
-       bl.push_back( bptr );
-       
        reply->set_result(0);
        reply->set_data(bl);
        reply->set_length(got);
   } else {
-       bptr.set_length(0);
        reply->set_result(got);   // error
        reply->set_length(0);
   }
@@ -1929,16 +2096,10 @@ void OSD::op_read(MOSDOp *op, PG *pg)
   unlock_object(oid);
 }
 
-void OSD::op_stat(MOSDOp *op, PG *pg)
+void OSD::op_stat(MOSDOp *op)
 {
   object_t oid = op->get_oid();
   lock_object(oid);
-  
-  // version?  clean?
-  if (!object_complete(pg, oid, op)) {
-       unlock_object(oid);
-       return;
-  }
 
   struct stat st;
   memset(&st, sizeof(st), 0);
@@ -1960,27 +2121,39 @@ void OSD::op_stat(MOSDOp *op, PG *pg)
 
 // WRITE OPS
 
-int OSD::apply_write(MOSDOp *op, bool write_sync, version_t v)
+int OSD::apply_write(MOSDOp *op, version_t v, Context *onsync)
 {
   // take buffers from the message
   bufferlist bl;
-  bl.claim( op->get_data() );
+  bl = op->get_data();
+  //bl.claim( op->get_data() );
   
-  // write out buffers
-  off_t off = op->get_offset();
-  for (list<bufferptr>::iterator it = bl.buffers().begin();
-          it != bl.buffers().end();
-          it++) {
-
-       int r = store->write(op->get_oid(),
-                                                (*it).length(), off,
-                                                (*it).c_str(),
-                                                write_sync);  // write synchronously
-       off += (*it).length();
-       if (r < 0) {
-         dout(1) << "write error on " << hex << op->get_oid() << dec << " len " << (*it).length() << "  off " << off << "  r = " << r << endl;
-         assert(r >= 0);
+  // write 
+  if (onsync) {
+       if (g_conf.fake_osd_sync) {
+         // fake a delayed sync
+         store->write(op->get_oid(),
+                                  op->get_length(),
+                                  op->get_offset(),
+                                  bl,
+                                  false);
+         g_timer.add_event_after(1.0,
+                                                         onsync);
+       } else {
+         // for real
+         store->write(op->get_oid(),
+                                  op->get_length(),
+                                  op->get_offset(),
+                                  bl,
+                                  onsync);
        }
+  } else {
+       // normal business
+       store->write(op->get_oid(),
+                                op->get_length(),
+                                op->get_offset(),
+                                bl,
+                                false);
   }
 
   // set version
@@ -1990,61 +2163,70 @@ int OSD::apply_write(MOSDOp *op, bool write_sync, version_t v)
 }
 
 
-bool OSD::object_clean(PG *pg, object_t oid, version_t& v, Message *op)
+
+void OSD::issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd)
 {
-  v = 0;
+  MOSDOp *op = repop->op;
+  object_t oid = op->get_oid();
+
+  dout(7) << " issue_replica_op in " << *pg << " o " << hex << oid << dec << " to osd" << osd << endl;
   
-  if (pg->is_complete() && pg->is_clean()) {
-       // PG is complete+clean, easy shmeasy!
-       if (store->exists(oid)) {
-         store->getattr(oid, "version", &v, sizeof(v));
-         assert(v>0);
-       } 
-  } else {
-       // PG is recovering, blech.
-
-       // does oid exist, and what version.
-       if (pg->is_complete()) {
-         // pg !clean, complete
-         if (store->exists(oid)) {
-               store->getattr(oid, "version", &v, sizeof(v));
-               assert(v > 0);
-         }
-       } else {
-         // pg !clean, !complete
-         if (pg->objects_missing.count(oid)) 
-               v = pg->objects_missing_v[oid];
-       }
+  // forward the write
+  __uint64_t tid = ++last_tid;
+  MOSDOp *wr = new MOSDOp(tid,
+                                                 messenger->get_myaddr(),
+                                                 oid,
+                                                 pg->get_pgid(),
+                                                 osdmap->get_version(),
+                                                 100+op->get_op());
+  wr->get_data() = op->get_data();   // copy bufferlist
+  wr->set_length(op->get_length());
+  wr->set_offset(op->get_offset());
+  wr->set_version(repop->new_version);
+  wr->set_old_version(repop->old_version);
+  wr->set_pg_role(1); // replica
+  messenger->send_message(wr, MSG_ADDR_OSD(osd));
+  
+  repop->osds.insert(osd);
+  repop->waitfor_ack[tid] = osd;
+  repop->waitfor_sync[tid] = osd;
 
-       if (v > 0) {
-         dout(10) << " pg not clean, checking if " << hex << oid << dec << " v " << v << " is specifically clean yet! " << *pg << endl;
-         // object (logically) exists
-         if (!pg->existant_object_is_clean(oid, v)) {
-               dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg 
-                               << " exists but is not clean" << endl;
-               pg->waiting_for_clean_object[oid].push_back(op);
-               if (pg->objects_pushing.count(oid) == 0)
-                 push_replica(pg, oid);
-               return false;
-         }
-       } else {
-         // object (logically) dne
-         if (store->exists(oid) ||
-                 !pg->nonexistant_object_is_clean(oid)) {
-               dout(7) << "object " << hex << oid << dec << " v " << v << " in " << *pg 
-                               << " dne but is not clean" << endl;
-               pg->waiting_for_clean_object[oid].push_back(op);
-               if (pg->objects_removing.count(oid) == 0)
-                 remove_replica(pg, oid);
-               return false;
-         }
-       }
+  replica_ops[tid] = repop;
+  replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
+}
+
+
+class C_OSD_WriteSync : public Context {
+public:
+  OSD *osd;
+  OSDReplicaOp *repop;
+  C_OSD_WriteSync(OSD *o, OSDReplicaOp *op) : osd(o), repop(op) {}
+  void finish(int r) {
+       osd->op_modify_sync(repop);
   }
+};
 
-  return true;
+void OSD::op_modify_sync(OSDReplicaOp *repop)
+{
+  dout(2) << "op_modify_sync on op " << repop->op << endl;
+
+  osd_lock.Lock();
+  {
+       repop->local_sync = true;
+       if (repop->can_send_sync()) {
+         dout(2) << "op_modify_sync on " << hex << repop->op->get_oid() << dec << " op " << repop->op << endl;
+         MOSDOpReply *reply = new MOSDOpReply(repop->op, 0, osdmap, true);
+         messenger->send_message(reply, repop->op->get_asker());
+         delete repop->op;
+       }
+       if (repop->can_delete()) {
+         delete repop;
+       }
+  }
+  osd_lock.Unlock();
 }
 
-void OSD::op_modify(MOSDOp *op, PG *pg)
+void OSD::op_modify(MOSDOp *op)
 {
   object_t oid = op->get_oid();
 
@@ -2057,98 +2239,69 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
 
   // version?  clean?
   version_t ov = 0;  // 0 == dne (yet)
-  if (!object_clean(pg, oid, ov, op)) {
-       unlock_object(oid);
-       return;
-  }
+  store->getattr(oid, "version", &ov, sizeof(ov));
+  version_t nv = messenger->peek_lamport();
+  assert(nv > ov);
 
-  version_t v = messenger->peek_lamport();
-
-  dout(12) << opname << " " << hex << oid << dec << " v " << v << endl;  
+  dout(12) << opname << " " << hex << oid << dec << " v " << nv << "  off " << op->get_offset() << " len " << op->get_length() << endl;  
 
   // issue replica writes
-  replica_write_lock.Lock();
-  assert(replica_write_tids.count(op) == 0);
-  assert(replica_write_local.count(op) == 0);
+  OSDReplicaOp *repop = new OSDReplicaOp(op, nv, ov);
 
+  osd_lock.Lock();
+  PG *pg = get_pg(op->get_pg());
   for (unsigned i=1; i<pg->acting.size(); i++) {
-       int osd = pg->acting[i];
-       dout(7) << "  replica write in " << *pg << " o " << hex << oid << dec << " to osd" << osd << endl;
-       
-       // forward the write
-       __uint64_t tid = ++last_tid;
-       MOSDOp *wr = new MOSDOp(tid,
-                                                       messenger->get_myaddr(),
-                                                       oid,
-                                                       op->get_pg(),
-                                                       osdmap->get_version(),
-                                                       100+op->get_op());
-       wr->get_data() = op->get_data();   // copy bufferlist
-       wr->set_length(op->get_length());
-       wr->set_offset(op->get_offset());
-       wr->set_version(v);
-       wr->set_old_version(ov);
-       wr->set_pg_role(1); // replica
-       messenger->send_message(wr, MSG_ADDR_OSD(osd));
-       
-       replica_write_tids[op].insert(tid);
-       replica_writes[tid] = op;
-       replica_pg_osd_tids[pg->get_pgid()][osd].insert(tid);
+       issue_replica_op(pg, repop, pg->acting[i]);
   }
-  replica_write_lock.Unlock();
+  osd_lock.Unlock();
 
   // pre-ack
-  MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
-  messenger->send_message(reply, op->get_asker());
+  //MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+  //messenger->send_message(reply, op->get_asker());
   
-  /*
-  // update pg stamp(s)
-  pg->last_modify_stamp = v;
-  if (pg->is_complete())
-       pg->last_complete_stamp = v;
-  */
-
   // do it
   int r;
   if (op->get_op() == OSD_OP_WRITE) {
        // write
-       r = apply_write(op, true, v);
+       assert(op->get_data().length() == op->get_length());
+       Context *onsync = new C_OSD_WriteSync(this, repop);
+       r = apply_write(op, nv, onsync);
        
        // put new object in proper collection
        if (ov == 0) pg->add_object(store, oid);
+
+       repop->local_ack = true;
   } 
   else if (op->get_op() == OSD_OP_TRUNCATE) {
        // truncate
        r = store->truncate(oid, op->get_offset());
+       repop->local_ack = true;
+       repop->local_sync = true;
   }
   else if (op->get_op() == OSD_OP_DELETE) {
        // delete
-       store->collection_remove(pg->get_pgid(), op->get_oid());
+       pg->remove_object(store, op->get_oid());
        r = store->remove(oid);
+       repop->local_ack = true;
+       repop->local_sync = true;
   }
   else assert(0);
 
-  // reply?
-  replica_write_lock.Lock();
-  if (replica_write_tids.count(op) == 0) {
-       // all replica writes completed.
-       if (replica_write_result[op] == 0) {
-         dout(10) << opname << " wrote locally: rep writes already finished, replying" << endl;
+  // can we reply yet?
+  osd_lock.Lock();
+  {
+       if (repop->can_send_sync()) {
+         dout(10) << opname << " sending sync on " << op << endl;
          MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true);
          messenger->send_message(reply, op->get_asker());
-         delete op;
-       } else {
-         dout(10) << opname << " wrote locally, but rep writes failed, fw to new primary" << endl;
-         //finished.push_back(op);
-         messenger->send_message(op, MSG_ADDR_OSD(pg->get_primary()));
        }
-       replica_write_result.erase(op);
-  } else {
-       // note that it's written locally
-       dout(10) << opname << " wrote locally: rep writes not yet finished, waiting" << endl;
-       replica_write_local.insert(op);
+       else if (repop->can_send_ack()) {
+         dout(10) << opname << " sending ack on " << op << endl;
+         MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, false);
+         messenger->send_message(reply, op->get_asker());
+       }
   }
-  replica_write_lock.Unlock();
+  osd_lock.Unlock();
 
   unlock_object(oid);
 }
index 3ef1d4591d2bca6948de9e262378dff19a26a501..397f1b778cb1999c01a9f851090bf5b590300927 100644 (file)
@@ -22,6 +22,27 @@ class Messenger;
 class Message;
 
 
+class OSDReplicaOp {
+ public:
+  class MOSDOp        *op;
+  map<__uint64_t,int>  waitfor_ack;
+  map<__uint64_t,int>  waitfor_sync;
+  bool                 local_ack;
+  bool                 local_sync;
+  bool                 cancel;
+
+  set<int>         osds;
+  version_t        new_version, old_version;
+  
+  OSDReplicaOp(class MOSDOp *o, version_t nv, version_t ov) : 
+       op(o), 
+       local_ack(false), local_sync(false), cancel(false),
+       new_version(nv), old_version(ov)
+       { }
+  bool can_send_ack() { return !cancel && local_ack && waitfor_ack.empty(); }
+  bool can_send_sync() { return !cancel && local_sync && waitfor_sync.empty(); }
+  bool can_delete() { return cancel || can_send_sync(); }
+};
 
 
 class OSD : public Dispatcher {
@@ -54,9 +75,6 @@ class OSD : public Dispatcher {
   //int read_onode(onode_t& onode);
   //int write_onode(onode_t& onode);
 
-  bool object_complete(PG *pg, object_t oid, Message *op);
-  bool object_clean(PG *pg, object_t oid, version_t& v, Message *op);
-
 
   // -- ops --
   class ThreadPool<class OSD, class MOSDOp>  *threadpool;
@@ -67,7 +85,9 @@ class OSD : public Dispatcher {
   void queue_op(class MOSDOp *m);
   void wait_for_no_ops();
 
-  int apply_write(MOSDOp *op, bool write_sync, version_t v); // for op_write and op_rep_write
+  int apply_write(MOSDOp *op, version_t v,
+                                 Context *onsync = 0); 
+
 
   
  public:
@@ -83,48 +103,50 @@ class OSD : public Dispatcher {
   list<class Message*> waiting_for_osdmap;
   map<version_t, OSDMap*> osdmaps;
 
-  void update_map(bufferlist& state);
+  void update_map(bufferlist& state, bool mkfs=false);
   void wait_for_new_map(Message *m);
   void handle_osd_map(class MOSDMap *m);
   OSDMap *get_osd_map(version_t v);
   
-  // <old replica hack>
-  Mutex                          replica_write_lock;
-  map<__uint64_t, MOSDOp*>       replica_writes;
-  map<MOSDOp*, set<__uint64_t> > replica_write_tids;
-  set<MOSDOp*>                   replica_write_local;
-  map<MOSDOp*, int>              replica_write_result;
-  map<pg_t, map<int, set<__uint64_t> > > replica_pg_osd_tids; // pg -> osd -> tid
-  // </hack>
+  void advance_map(list<pg_t>& ls);
+  void activate_map(list<pg_t>& ls);
+
 
 
   // -- replication --
 
   // PG
   hash_map<pg_t, PG*>      pg_map;
-  void get_pg_list(list<pg_t>& ls);
-  bool pg_exists(pg_t pg);
-  PG *create_pg(pg_t pg);             // create new PG
-  PG *get_pg(pg_t pg);             // return existing PG, load state from store (if needed)
-  void close_pg(pg_t pg);          // close in-memory state
-  void remove_pg(pg_t pg);         // remove state from store
+
+  void  get_pg_list(list<pg_t>& ls);
+  bool  pg_exists(pg_t pg);
+  PG   *create_pg(pg_t pg);          // create new PG
+  PG   *get_pg(pg_t pg);             // return existing PG, load state from store (if needed)
+  void  close_pg(pg_t pg);          // close in-memory state
+  void  remove_pg(pg_t pg);         // remove state from store
 
   __uint64_t               last_tid;
-  map<__uint64_t,PGPeer*>  pull_ops;   // tid -> PGPeer*
-  map<__uint64_t,PGPeer*>  push_ops;   // tid -> PGPeer*
-  map<__uint64_t,PGPeer*>  remove_ops; // tid -> PGPeer*
 
-  hash_map<pg_t, list<Message*> >      waiting_for_pg;
+  hash_map<pg_t, list<Message*> >        waiting_for_pg;
 
+  // replica ops
+  map<__uint64_t, OSDReplicaOp*>         replica_ops;
+  map<pg_t, map<int, set<__uint64_t> > > replica_pg_osd_tids; // pg -> osd -> tid
+  
+  void issue_replica_op(PG *pg, OSDReplicaOp *repop, int osd);
+  void ack_replica_op(__uint64_t tid, int result, bool safe, int fromosd);
 
-  void advance_map(list<pg_t>& ls);
-  void activate_map(list<pg_t>& ls);
+  // recovery
+  map<__uint64_t,PGPeer*>  pull_ops;   // tid -> PGPeer*
+  map<__uint64_t,PGPeer*>  push_ops;   // tid -> PGPeer*
+  map<__uint64_t,PGPeer*>  remove_ops; // tid -> PGPeer*
 
   void start_peers(PG *pg, map< int, map<PG*,int> >& start_map);
 
   void peer_notify(int primary, map<pg_t,version_t>& pg_list);
   void peer_start(int replica, map<PG*,int>& pg_map);
 
+  void plan_recovery(PG *pg);
   void do_recovery(PG *pg);
   void pg_pull(PG *pg, int maxops);
   void pg_push(PG *pg, int maxops);
@@ -150,7 +172,8 @@ class OSD : public Dispatcher {
   void op_rep_remove_reply(class MOSDOpReply *op);
   
   void op_rep_modify(class MOSDOp *op);   // write, trucnate, delete
-  void ack_replica_op(__uint64_t tid, int result, int fromosd);
+  void op_rep_modify_sync(class MOSDOp *op);
+  friend class C_OSD_RepModifySync;
 
  public:
   OSD(int id, Messenger *m);
@@ -166,13 +189,10 @@ class OSD : public Dispatcher {
   void handle_ping(class MPing *m);
   void handle_op(class MOSDOp *m);
 
-  void op_read(class MOSDOp *m, PG *pg);
-  void op_stat(class MOSDOp *m, PG *pg);
-  void op_modify(class MOSDOp *m, PG *pg);
-  //void op_delete(class MOSDOp *m, PG *pg);
-  //void op_truncate(class MOSDOp *m, PG *pg);
-
-  //void op_mkfs(class MOSDOp *m);
+  void op_read(class MOSDOp *m);
+  void op_stat(class MOSDOp *m);
+  void op_modify(class MOSDOp *m);
+  void op_modify_sync(class OSDReplicaOp *repop);
 
   // for replication
   void handle_op_reply(class MOSDOpReply *m);
index 7af46cd9b1196924e0efcb021e2565b1d021637b..39b0af5fa582239add9c15fb840392346d174a7a 100644 (file)
@@ -2,6 +2,8 @@
 #define __OBJECTSTORE_H
 
 #include "include/types.h"
+#include "include/Context.h"
+#include "include/bufferlist.h"
 
 #include <sys/stat.h>
 
@@ -31,12 +33,17 @@ class ObjectStore {
 
   virtual int read(object_t oid, 
                                   size_t len, off_t offset,
-                                  char *buffer) = 0;
+                                  bufferlist& bl) = 0;
+
   virtual int write(object_t oid,
                                        size_t len, off_t offset,
-                                       char *buffer,
-                                       bool fsync=true) = 0;
-  
+                                       bufferlist& bl,
+                                       bool fsync=true) = 0;     
+  virtual int write(object_t oid, 
+                                       size_t len, off_t offset, 
+                                       bufferlist& bl, 
+                                       Context *onsafe) { return -1; }
+
   virtual int setattr(object_t oid, const char *name,
                                          void *value, size_t size) {return 0;} //= 0;
   virtual int getattr(object_t oid, const char *name,
@@ -62,6 +69,8 @@ class ObjectStore {
                                                                 void *value, size_t size) {return 0;} //= 0;
   virtual int collection_listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
   
+  
+  
 };
 
 #endif
index 09b00d57d99926ac014030cc25b09f42e502eb74..698c5dfcbcaff1715291ea6d9b03cab48a3bb974 100644 (file)
@@ -6,12 +6,6 @@
 #define  dout(l)    if (l<=g_conf.debug || l<=g_conf.debug_osd) cout << "osd" << whoami << "  " << *this << " "
 
 
-void PG::mark_peered()
-{
-  dout(10) << "mark_peered" << endl;
-  state_set(PG_STATE_PEERED);
-}
-
 void PG::pulled(object_t oid, version_t v, PGPeer *p)
 {
   dout(10) << "pulled o " << hex << oid << dec << " v " << v << " from osd" << p->get_peer() << endl;
@@ -24,31 +18,8 @@ void PG::pulled(object_t oid, version_t v, PGPeer *p)
   // object is now local
   objects_missing.erase(oid);  
   objects_missing_v.erase(oid);
-  
-  if (objects_missing.empty()) {
-       assert(!is_complete());
-       mark_complete();
-  }
 }
 
-void PG::mark_complete()
-{
-  dout(10) << "mark_complete" << endl;
-
-  // done pulling objects!
-  state_set(PG_STATE_COMPLETE);
-}
-
-void PG::mark_clean()
-{
-  dout(10) << "mark_clean" << endl;
-  state_set(PG_STATE_CLEAN);
-
-  // drop residual peers
-
-  // discard peer state
-  
-}
 
 void PG::pushed(object_t oid, version_t v, PGPeer *p)
 {
@@ -100,6 +71,7 @@ void PG::removed(object_t oid, version_t v, PGPeer *p)
   }
 }
 
+/*
 bool PG::existant_object_is_clean(object_t o, version_t v)
 {
   assert(is_peered() && !is_clean());
@@ -124,12 +96,14 @@ bool PG::nonexistant_object_is_clean(object_t o)
   
   return true;
 }
+*/
 
 
 
-void PG::plan_recovery(ObjectStore *store, version_t current_version) 
+void PG::plan_recovery(ObjectStore *store, version_t current_version, 
+                                          list<PGPeer*>& complete_peers) 
 {
-  dout(10) << "plan_recovery" << endl;
+  dout(10) << "plan_recovery " << current_version << endl;
   assert(is_peered());
 
   // choose newest last_complete epoch
@@ -137,15 +111,17 @@ void PG::plan_recovery(ObjectStore *store, version_t current_version)
   for (map<int, PGPeer*>::iterator pit = peers.begin();
           pit != peers.end();
           pit++) {
+       dout(10) << "  osd" << pit->first << " " 
+                        << pit->second->objects.size() << " objects, last_complete " << pit->second->last_complete << endl;
        if (pit->second->last_complete > last)
          last = pit->second->last_complete;
   }
   dout(10) << " combined last_complete epoch is " << last << endl;
 
-  if (last < current_version-1) {
+  if (last+1 < current_version) {
        dout(1) << "WARNING: last_complete skipped one or more epochs, we're possibly missing something" << endl;
   }
-  if (!last) {
+  if (!last) {  // bootstrap!
        dout(1) << "WARNING: no complete peers available (yet), pg is crashed" << endl;
        return;
   }
@@ -158,10 +134,8 @@ void PG::plan_recovery(ObjectStore *store, version_t current_version)
   scan_local_objects(local_objects, store);
   dout(10) << " " << local_objects.size() << " local objects" << endl;
 
-  if (last_complete == last) {
-       assert(is_complete());
+  if (last_complete == last) 
        master = local_objects;
-  }
   
   for (map<int, PGPeer*>::iterator pit = peers.begin();
           pit != peers.end();
@@ -184,7 +158,7 @@ void PG::plan_recovery(ObjectStore *store, version_t current_version)
   dout(7) << " master list has " << master.size() << " objects" << endl;
 
   // local cleanup?
-  if (!is_complete()) {
+  if (!is_complete(current_version)) {
        // just cleanup old local objects
        // FIXME: do this async?
 
@@ -237,6 +211,10 @@ void PG::plan_recovery(ObjectStore *store, version_t current_version)
        assert(local || !objects_missing[o].empty());  // pull
   }
 
+  if (objects_missing.empty()) {
+       mark_complete(current_version);
+  }
+
   // plan clean -> objects_stray
   for (map<int, PGPeer*>::iterator pit = peers.begin();
           pit != peers.end();
@@ -247,6 +225,11 @@ void PG::plan_recovery(ObjectStore *store, version_t current_version)
        PGPeer *p = pit->second;
        assert(p->is_active());
 
+       if (p->missing.empty() && p->stray.empty()) {
+         p->state_set(PG_PEER_STATE_COMPLETE);
+         complete_peers.push_back(p);
+       }
+       
        if (p->is_complete()) {
          dout(12) << " peer osd" << pit->first << " is complete" << endl;
        } else {
@@ -273,6 +256,9 @@ void PG::plan_recovery(ObjectStore *store, version_t current_version)
        }
   }
 
+  if (objects_unrep.empty() && objects_stray.empty())
+       mark_clean();
+
   // clear peer content lists
   for (map<int, PGPeer*>::iterator pit = peers.begin();
           pit != peers.end();
index 82dd113848a9eaec2673cd396f2424fb49df7ff8..88cad5313f9f8c6c88a204df55700a25b575c861 100644 (file)
@@ -11,16 +11,23 @@ using namespace __gnu_cxx;
 struct PGReplicaInfo {
   int state;
   version_t last_complete;
+  version_t last_any_complete;
   map<object_t,version_t>  objects;        // remote object list
 
   void _encode(bufferlist& blist) {
        blist.append((char*)&state, sizeof(state));
+       blist.append((char*)&last_complete, sizeof(last_complete));
+       blist.append((char*)&last_any_complete, sizeof(last_any_complete));
        ::_encode(objects, blist);
        //::_encode(deleted, blist);
   }
   void _decode(bufferlist& blist, int& off) {
        blist.copy(off, sizeof(state), (char*)&state);
        off += sizeof(state);
+       blist.copy(off, sizeof(last_complete), (char*)&last_complete);
+       off += sizeof(last_complete);
+       blist.copy(off, sizeof(last_any_complete), (char*)&last_any_complete);
+       off += sizeof(last_any_complete);
        ::_decode(objects, blist, off);
        //::_decode(deleted, blist, off);
   }
@@ -127,7 +134,7 @@ class PGPeer {
 
 
 
-
+/*
 // a task list for moving objects around
 class PGQueue {
   list<object_t>  objects;
@@ -160,6 +167,7 @@ class PGQueue {
   }
   bool empty() { return objects.empty(); }
 };
+*/
 
 
 
@@ -168,7 +176,7 @@ class PGQueue {
  */
 
 // any
-#define PG_STATE_COMPLETE    1  // i have full PG contents locally
+//#define PG_STATE_COMPLETE    1  // i have full PG contents locally
 #define PG_STATE_PEERED      2  // primary: peered with everybody
                                 // replica: peered with auth
 
@@ -189,13 +197,16 @@ class PG {
   int         state;   // see bit defns above
   version_t   primary_since;  // (only defined if role==0)
 
+  version_t   last_complete;      // me
+  version_t   last_any_complete;  // anybody in the set
+  
+ public:
   map<int, PGPeer*>         peers;  // primary: (soft state) active peers
 
  public:
   vector<int> acting;
   //pginfo_t    info;
 
-  version_t   last_complete;      // epoch
 
   /*
   lamport_t   last_complete_stamp;     // lamport timestamp of last complete op
@@ -229,6 +240,8 @@ class PG {
        if (objects_missing.empty()) return false;
        if (objects_missing.size() == objects_pulling.size()) return false;
 
+       if (objects_pulling.empty() || pull_pos == objects_missing.end()) 
+         pull_pos = objects_missing.begin();
        while (objects_pulling.count(pull_pos->first)) {
          pull_pos++;
          if (pull_pos == objects_missing.end()) 
@@ -243,6 +256,8 @@ class PG {
        if (objects_unrep.empty()) return false;
        if (objects_unrep.size() == objects_pushing.size()) return false;
 
+       if (objects_pushing.empty() || push_pos == objects_unrep.end()) 
+         push_pos = objects_unrep.begin();
        while (objects_pushing.count(push_pos->first)) {
          push_pos++;
          if (push_pos == objects_unrep.end()) 
@@ -257,6 +272,8 @@ class PG {
        if (objects_stray.empty()) return false;
        if (objects_stray.size() == objects_removing.size()) return false;
 
+       if (objects_removing.empty() || remove_pos == objects_stray.end()) 
+         remove_pos = objects_stray.begin();
        while (objects_removing.count(remove_pos->first)) {
          remove_pos++;
          if (remove_pos == objects_stray.end()) 
@@ -268,8 +285,22 @@ class PG {
        return true;
   }
 
+  void pulling(object_t oid, version_t v, PGPeer *p) {
+       p->pull(oid, v);
+       objects_pulling[oid] = p;
+  }
   void pulled(object_t oid, version_t v, PGPeer *p);
+
+  void pushing(object_t oid, version_t v, PGPeer *p) {
+       p->push(oid, v);
+       objects_pushing[oid].insert(p);
+  }
   void pushed(object_t oid, version_t v, PGPeer *p);
+
+  void removing(object_t oid, version_t v, PGPeer *p) {
+       p->remove(oid, v);
+       objects_removing[oid][p] = v;
+  }
   void removed(object_t oid, version_t v, PGPeer *p);
 
 
@@ -298,7 +329,8 @@ class PG {
 
 
  public:
-  void plan_recovery(ObjectStore *store, version_t current_version);
+  void plan_recovery(ObjectStore *store, version_t current_version,
+                                        list<PGPeer*>& complete_peers);
 
   void discard_recovery_plan() {
        assert(waiting_for_peered.empty());
@@ -315,7 +347,8 @@ class PG {
   PG(int osd, pg_t p) : whoami(osd), pgid(p),
        role(0),
        state(0),
-       last_complete(0)
+       primary_since(0),
+       last_complete(0), last_any_complete(0)
        //last_complete_stamp(0), last_modify_stamp(0), last_clean_stamp(0) 
        { }
   
@@ -324,7 +357,9 @@ class PG {
   int        get_nrep() { return acting.size(); }
 
   version_t  get_last_complete() { return last_complete; }
-  void       set_last_complete(version_t v) { last_complete = v; }
+  //void       set_last_complete(version_t v) { last_complete = v; }
+  version_t  get_last_any_complete() { return last_any_complete; }
+  //void       set_last_any_complete(version_t v) { last_any_complete = v; }
 
   version_t  get_primary_since() { return primary_since; }
   void       set_primary_since(version_t v) { primary_since = v; }
@@ -339,25 +374,35 @@ class PG {
   bool       is_primary() { return role == 0; }
   bool       is_residual() { return role < 0; }
 
-  bool existant_object_is_clean(object_t o, version_t v);
-  bool nonexistant_object_is_clean(object_t o);
-
   int  get_state() { return state; }
   bool state_test(int m) { return (state & m) != 0; }
   void set_state(int s) { state = s; }
   void state_set(int m) { state |= m; }
   void state_clear(int m) { state &= ~m; }
 
-  bool       is_complete()  { return state_test(PG_STATE_COMPLETE); }
+  bool       is_complete(version_t v)  { 
+       //return state_test(PG_STATE_COMPLETE); 
+       return v == last_complete;
+  }
   bool       is_peered()    { return state_test(PG_STATE_PEERED); }
   //bool       is_crowned()   { return state_test(PG_STATE_CROWNED); }
   bool       is_clean()     { return state_test(PG_STATE_CLEAN); }
   //bool       is_flushing() { return state_test(PG_STATE_FLUSHING); }
   bool       is_stray() { return state_test(PG_STATE_STRAY); }
 
-  void       mark_peered();
-  void       mark_complete();
-  void       mark_clean();
+  void       mark_peered() { 
+       state_set(PG_STATE_PEERED);
+  }
+  void       mark_complete(version_t v) {
+       last_complete = v;
+       if (v > last_any_complete) last_any_complete = v;
+  }
+  void       mark_any_complete(version_t v) {
+       if (v > last_any_complete) last_any_complete = v;
+  }
+  void       mark_clean() {
+       state_set(PG_STATE_CLEAN);
+  }
 
   int num_active_ops() {
        int o = 0;
@@ -403,10 +448,10 @@ class PG {
        store->collection_getattr(pgid, "state", &state, sizeof(state));        
   }
 
-  void add_object(ObjectStore *store, object_t oid) {
+  void add_object(ObjectStore *store, const object_t oid) {
        store->collection_add(pgid, oid);
   }
-  void remove_object(ObjectStore *store, object_t oid) {
+  void remove_object(ObjectStore *store, const object_t oid) {
        store->collection_remove(pgid, oid);
   }
   void list_objects(ObjectStore *store, list<object_t>& ls) {
@@ -437,9 +482,10 @@ class PG {
 inline ostream& operator<<(ostream& out, PG& pg)
 {
   out << "pg[" << hex << pg.get_pgid() << dec << " " << pg.get_role();
-  if (pg.is_complete()) out << " complete";
+  //if (pg.is_complete()) out << " complete";
   if (pg.is_peered()) out << " peered";
   if (pg.is_clean()) out << " clean";
+  out << " lc=" << pg.get_last_complete();
   out << "]";
   return out;
 }
index e1c5689a2ccfcb786674618a6fc1b192db564d12..cf00fcbe5467f7ac93d56b4093b4fd530040d838 100644 (file)
@@ -53,6 +53,21 @@ void Filer::dispatch(Message *m)
 }
 
 
+bool Filer::is_active() 
+{
+  if (!op_reads.empty() ||
+         !op_modify.empty() ||
+         !op_probes.empty()) {
+       for (hash_map<tid_t,PendingOSDRead_t*>::iterator it = op_reads.begin();
+                it != op_reads.end();
+                it++) dout(10) << " pending read op " << it->first << endl;
+       for (hash_map<tid_t,PendingOSDOp_t*>::iterator it = op_modify.begin();
+                it != op_modify.end();
+                it++) dout(10) << " pending modify op " << it->first << endl;
+       return true;
+  }
+  return false;
+}
 
 void Filer::handle_osd_map(MOSDMap *m)
 {
@@ -115,7 +130,7 @@ Filer::read(inode_t& inode,
   // map buffer into OSD extents
   file_to_extents(inode, len, offset, p->extents);
 
-  dout(7) << "osd read ino " << inode.ino << " len " << len << " off " << offset << " in " << p->extents.size() << " object extents" << endl;
+  dout(7) << "osd read ino " << hex << inode.ino << dec << " len " << len << " off " << offset << " in " << p->extents.size() << " object extents" << endl;
 
   // issue reads
   for (list<OSDExtent>::iterator it = p->extents.begin();
@@ -129,7 +144,7 @@ Filer::read(inode_t& inode,
                                                   OSD_OP_READ);
        m->set_length(it->len);
        m->set_offset(it->offset);
-       dout(15) << " read on " << last_tid << " from oid " << it->oid << " off " << it->offset << " len " << it->len << " (" << it->buffer_extents.size() << " buffer bits)" << endl;
+       dout(15) << " read on " << last_tid << " from oid " << hex << it->oid << dec  << " off " << it->offset << " len " << it->len << " (" << it->buffer_extents.size() << " buffer bits)" << endl;
        messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
 
        // add to gather set
@@ -193,7 +208,7 @@ Filer::handle_osd_read_reply(MOSDOpReply *m)
                for (map<size_t,size_t>::iterator bit = eit->buffer_extents.begin();
                         bit != eit->buffer_extents.end();
                         bit++) {
-                 dout(21) << "object " << eit->oid << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl;
+                 dout(21) << "object " << hex << eit->oid << dec << " extent " << eit->offset << " len " << eit->len << " : ox offset " << ox_off << " -> buffer extent " << bit->first << " len " << bit->second << endl;
                  by_off[bit->first] = new bufferlist;
 
                  if (ox_off + bit->second <= ox_len) {
@@ -310,7 +325,7 @@ Filer::write(inode_t& inode,
   list<OSDExtent> extents;
   file_to_extents(inode, len, offset, extents);
 
-  dout(7) << "osd write ino " << inode.ino << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
+  dout(7) << "osd write ino " << hex << inode.ino << dec << " len " << len << " off " << offset << " in " << extents.size() << " extents" << endl;
 
   size_t off = 0;  // ptr into buffer
 
@@ -347,7 +362,7 @@ Filer::write(inode_t& inode,
        op_modify[last_tid] = p;
 
        // send
-       dout(15) << " write on " << last_tid << endl;
+       dout(15) << " write on " << last_tid << "  oid " << hex << it->oid << dec << " off " << it->offset << " len " << it->len << endl;
        messenger->send_message(m, MSG_ADDR_OSD(it->osd), 0);
   }
 
@@ -451,7 +466,7 @@ int Filer::truncate(inode_t& inode,
   list<OSDExtent> extents;
   file_to_extents(inode, old_size, new_size, extents);
 
-  dout(7) << "osd truncate ino " << inode.ino << " to new size " << new_size << " from old_size " << old_size << " in " << extents.size() << " extents" << endl;
+  dout(7) << "osd truncate ino " << hex << inode.ino << dec << " to new size " << new_size << " from old_size " << old_size << " in " << extents.size() << " extents" << endl;
 
   int n = 0;
   for (list<OSDExtent>::iterator it = extents.begin();
index fc08c6ac9e15a4e3468d0183981da90efbe34d8b..6eb82a2e557a72118d31f8e9a99a39dccc8341b9 100644 (file)
@@ -106,12 +106,7 @@ class Filer : public Dispatcher {
 
   void dispatch(Message *m);
 
-  bool is_active() {
-       if (!op_reads.empty() ||
-               !op_modify.empty() ||
-               !op_probes.empty()) return true;
-       return false;
-  }
+  bool is_active();
 
   // osd fun
   int read(inode_t& inode,