]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: some ObjectContext changes
authorSage Weil <sage@newdream.net>
Thu, 21 May 2009 19:35:26 +0000 (12:35 -0700)
committerSage Weil <sage@newdream.net>
Thu, 21 May 2009 19:35:26 +0000 (12:35 -0700)
src/TODO
src/osd/ReplicatedPG.cc
src/osd/ReplicatedPG.h

index 82061d57ddbc6a6619f15c6534b8f5b51556b7e3..b6ea6b965de795c04caea6a65c93eb47a3b0b4ec 100644 (file)
--- a/src/TODO
+++ b/src/TODO
@@ -52,6 +52,10 @@ v0.9
 - make mds exhert memory pressure on client caps, leases
 - optionally separate osd interfaces (ips) for clients and osds (replication, peering, etc.)
 
+- object access modes
+ - unify read/write paths
+ - add objectcontext for clones as they're created (esp if write is delayed)
+ - make snap selection behave
 
 
 
index c025b54885595df7d9d99fd3747f23e85043fa4b..07fb75548939f66a20976b5ff944caebdb67f683 100644 (file)
@@ -1442,6 +1442,8 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   
   repop->applied = true;
   
+  repop->obc->finish_write();
+
   put_object_context(repop->obc);
   repop->obc = 0;
 
@@ -1651,49 +1653,55 @@ void ReplicatedPG::repop_ack(RepGather *repop, int result, int ack_type,
 // -------------------------------------------------------
 
 
-ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(pobject_t poid)
+ReplicatedPG::ObjectContext *ReplicatedPG::get_object_context(sobject_t soid)
 {
-  ObjectContext *obc = &object_contexts[poid];
+  ObjectContext *obc = &object_contexts[soid];
   obc->ref++;
 
   if (obc->ref > 1) {
-    dout(10) << "get_object_context " << poid << " "
+    dout(10) << "get_object_context " << soid << " "
             << (obc->ref-1) << " -> " << obc->ref << dendl;
     return obc;    // already had it
   }
 
   // pull info off disk
-  obc->poid = poid;
+  obc->soid = soid;
     
   struct stat st;
-  int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+  int r = osd->store->stat(info.pgid.to_coll(), soid, &st);
   if (r == 0) {
     obc->exists = true;
     obc->size = st.st_size;
     
     bufferlist bv;
-    r = osd->store->getattr(info.pgid.to_coll(), poid, OI_ATTR, bv);
+    r = osd->store->getattr(info.pgid.to_coll(), soid, OI_ATTR, bv);
     assert(r >= 0);
     obc->oi.decode(bv);
   } else {
-    obc->oi.soid = poid;
+    obc->oi.soid = soid;
     obc->exists = false;
     obc->size = 0;
   }
 
-  dout(10) << "get_object_context " << poid << " read " << obc->oi << dendl;
+  dout(10) << "get_object_context " << soid << " read " << obc->oi << dendl;
   return obc;
 }
 
 void ReplicatedPG::put_object_context(ObjectContext *obc)
 {
-  dout(10) << "put_object_context " << obc->poid << " "
+  dout(10) << "put_object_context " << obc->soid << " "
           << obc->ref << " -> " << (obc->ref-1) << dendl;
 
+  if (obc->wake) {
+    osd->take_waiters(obc->waiting);
+    obc->wake = false;
+  }
+
   --obc->ref;
   if (obc->ref == 0) {
-    object_contexts.erase(obc->poid);
+    assert(obc->waiting.empty());
 
+    object_contexts.erase(obc->soid);
     if (object_contexts.empty())
       kick();
   }
@@ -1826,6 +1834,8 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   // note my stats
   utime_t now = g_clock.now();
 
+  obc->start_write();
+
   // issue replica writes
   tid_t rep_tid = osd->get_tid();
   RepGather *repop = new_repop(ctx, obc, noop, rep_tid);
index 4a5c03db66c8f36c8c1dc1e364b688ef9aba3aed..e29ac3398eae02967020416f79982b2c1defb2bd 100644 (file)
@@ -45,27 +45,27 @@ public:
     - idle
       - no in-progress or waiting writes.
       - read: ok
-      - write: move to 'delayed' or 'rmw'
-      - rmw: move to 'rmw'
+      - write: ok.  move to 'delayed' or 'rmw'
+      - rmw: ok.  move to 'rmw'
          
     - delayed
       - delayed write in progress.  delay write application on primary.
       - when done, move to 'idle'
       - read: ok
       - write: ok
-      - rmw: move to 'delayed-flushing'
-
-    - delayed-flushing
-      - waiting for delayed writes to flush, then move to 'rmw'
-      - read, write, rmw: wait
+      - rmw: no.  move to 'delayed-flushing'
 
     - rmw
       - rmw cycles in flight.  applied immediately at primary.
       - when done, move to 'idle'
       - read: same client ok.  otherwise, move to 'rmw-flushing'
-      - write: ok
-      - rmw: same client ok.  otherwise, wait for rmw to flush
+      - write: same client ok.  otherwise, start write, but also move to 'rmw-flushing'
+      - rmw: same client ok.  otherwise, move to 'rmw-flushing'
       
+    - delayed-flushing
+      - waiting for delayed writes to flush, then move to 'rmw'
+      - read, write, rmw: wait
+
     - rmw-flushing
       - waiting for rmw to flush, then move to 'idle'
       - read, write, rmw: wait
@@ -80,19 +80,144 @@ public:
    * replicas ack.
    */
   struct ObjectContext {
+    sobject_t soid;
+    int ref;
+
     enum {
-      IDLE, DELAYED, DELAYED_FLUSHING, RMW, RMW_FLUSHING
+      IDLE,
+      DELAYED,
+      RMW,
+      DELAYED_FLUSHING,
+      RMW_FLUSHING
     } state;
 
-    int ref;
-    sobject_t poid;
+    static const char *get_state_name(int s) {
+      switch (s) {
+      case IDLE: return "idle";
+      case DELAYED: return "delayed";
+      case RMW: return "rmw";
+      case DELAYED_FLUSHING: return "delayed-flushing";
+      case RMW_FLUSHING: return "rmw-flushing";
+      default: return "???";
+      }
+    }
+
+    int num_wr, num_rmw;
+    entity_inst_t client;
+    list<Message*> waiting;
+    bool wake;
 
     bool exists;
     __u64 size;
 
     object_info_t oi;
     
-    ObjectContext() : state(IDLE), ref(0), exists(false), size(0), oi(poid) {}
+    bool try_read(entity_inst_t& c) {
+      switch (state) {
+      case IDLE:
+      case DELAYED:
+       return true;
+      case RMW:
+       if (c == client)
+         return true;
+       state = RMW_FLUSHING;
+       return false;
+      case DELAYED_FLUSHING:
+      case RMW_FLUSHING:
+       return false;
+      default:
+       assert(0);
+      }
+    }
+    bool try_write(entity_inst_t& c) {
+      switch (state) {
+      case IDLE:
+       state = DELAYED;
+      case DELAYED:
+       return true;
+      case RMW:
+       if (c == client)
+         return true;
+       state = RMW_FLUSHING;
+       return true;
+      case DELAYED_FLUSHING:
+      case RMW_FLUSHING:
+       return false;
+      default:
+       assert(0);
+      }
+    }
+    bool try_rmw(entity_inst_t& c) {
+      switch (state) {
+      case IDLE:
+       state = RMW;
+       client = c;
+       return true;
+      case DELAYED:
+       state = DELAYED_FLUSHING;
+       return false;
+      case RMW:
+       if (c == client)
+         return true;
+       state = RMW_FLUSHING;
+       return false;
+      case DELAYED_FLUSHING:
+      case RMW_FLUSHING:
+       return false;
+      default:
+       assert(0);
+      }
+    }
+
+    void start_write() {
+      num_wr++;
+    }
+    void finish_write() {
+      assert(num_wr > 0);
+      --num_wr;
+      if (num_wr == 0)
+       switch (state) {
+       case DELAYED:
+         assert(!num_rmw);
+         state = IDLE;
+         wake = true;
+         break;
+       case RMW:
+       case DELAYED_FLUSHING:
+       case RMW_FLUSHING:
+         if (!num_rmw && !num_wr) {
+           state = IDLE;
+           wake = true;
+         }
+         break;
+       default:
+         assert(0);
+       }
+    }
+
+    void start_rmw() {
+      ++num_rmw;
+    }
+    void finish_rmw() {
+      assert(num_rmw > 0);
+      --num_rmw;
+      if (num_rmw == 0) {
+       switch (state) {
+       case RMW:
+       case RMW_FLUSHING:
+         if (!num_rmw && !num_wr) {
+           state = IDLE;
+           wake = true;
+         }
+         break;
+       default:
+         assert(0);
+       }
+      }
+    }
+
+    ObjectContext() : ref(0), state(IDLE), num_wr(0), num_rmw(0), wake(false),
+                     exists(false), size(0), oi(soid) {}
   };
 
   /*
@@ -317,6 +442,15 @@ public:
 };
 
 
+inline ostream& operator<<(ostream& out, ReplicatedPG::ObjectContext& obc)
+{
+  out << "obc(" << obc.soid << " " << obc.get_state_name(obc.state);
+  if (!obc.waiting.empty())
+    out << " WAITING";
+  out << ")";
+  return out;
+}
+
 inline ostream& operator<<(ostream& out, ReplicatedPG::RepGather& repop)
 {
   out << "repgather(" << &repop << " rep_tid=" << repop.rep_tid