]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
big cleanup of OSD, generic PG interface. do_op moved into PG. new ObjectLayout...
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 6 Mar 2007 22:51:00 +0000 (22:51 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 6 Mar 2007 22:51:00 +0000 (22:51 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@1175 29311d96-e01e-0410-9327-a35deaab8ce9

20 files changed:
branches/sage/pgs/config.cc
branches/sage/pgs/include/types.h
branches/sage/pgs/messages/MOSDOp.h
branches/sage/pgs/messages/MOSDOpReply.h
branches/sage/pgs/msg/FakeMessenger.cc
branches/sage/pgs/osd/OSD.cc
branches/sage/pgs/osd/OSD.h
branches/sage/pgs/osd/OSDMap.h
branches/sage/pgs/osd/PG.cc
branches/sage/pgs/osd/PG.h
branches/sage/pgs/osd/RAID4PG.cc
branches/sage/pgs/osd/RAID4PG.h
branches/sage/pgs/osd/ReplicatedPG.cc
branches/sage/pgs/osd/ReplicatedPG.h
branches/sage/pgs/osd/osd_types.h
branches/sage/pgs/osdc/Filer.cc
branches/sage/pgs/osdc/ObjectCacher.cc
branches/sage/pgs/osdc/ObjectCacher.h
branches/sage/pgs/osdc/Objecter.cc
branches/sage/pgs/osdc/Objecter.h

index f197a549792b35ca65698b111f049f88458230e7..57b8bd85ec453b77345e5e7ad98434abd35b059b 100644 (file)
@@ -161,7 +161,7 @@ md_config_t g_conf = {
   mds_decay_halflife: 30,
 
   mds_beacon_interval: 5.0,
-  mds_beacon_grace: 10.0,
+  mds_beacon_grace: 100000.0,
 
   mds_log: true,
   mds_log_max_len:  MDS_CACHE_SIZE / 3,
index e6996fe2d2fe9dc5031c2ac68fdd0eced1a62242..9bcd4c6417f7507c931d0cb8472b87fbf6b4212d 100644 (file)
@@ -141,11 +141,15 @@ struct FileLayout {
   char pg_size;        // pg size (num replicas, or raid4 stripe width)
   int  preferred;      // preferred primary osd?
 
+  // -- pg -> disk layout --
+  int  object_stripe_unit;  // for per-object raid
+
   FileLayout() { }
   FileLayout(int su, int sc, int os, int pgt, int pgs, int o=-1) :
     stripe_unit(su), stripe_count(sc), object_size(os), 
-    pg_type(pgt), pg_size(pgs),
-    preferred(o) {
+    pg_type(pgt), pg_size(pgs), preferred(o),
+    object_stripe_unit(su)   // note: bad default, we pbly want su/(pgs-1)
+  {
     assert(object_size % stripe_unit == 0);
   }
 
index 6139df56d833e39503e0c56a71af9975a6dd88f1..954d0a1353e678bc37ae2205479d0334bac48b60 100644 (file)
@@ -75,8 +75,6 @@ public:
 
 private:
   struct {
-    long pcid;
-    
     // who's asking?
     entity_inst_t client;
     reqid_t    reqid;  // minor weirdness: entity_name_t is in reqid_t too.
@@ -86,7 +84,7 @@ private:
     
     object_t oid;
     objectrev_t rev;
-    pg_t pg;
+    ObjectLayout layout;
     
     epoch_t map_epoch;
     
@@ -119,7 +117,8 @@ private:
   void set_rep_tid(tid_t t) { st.rep_tid = t; }
 
   const object_t get_oid() { return st.oid; }
-  const pg_t     get_pg() { return st.pg; }
+  const pg_t     get_pg() { return st.layout.pgid; }
+  const ObjectLayout& get_layout() { return st.layout; }
   const epoch_t  get_map_epoch() { return st.map_epoch; }
 
   //const int        get_pg_role() { return st.pg_role; }  // who am i asking for?
@@ -154,12 +153,9 @@ private:
   size_t get_data_len() { return data.length(); }
 
 
-  // keep a pcid (procedure call id) to match up request+reply
-  void set_pcid(long pcid) { this->st.pcid = pcid; }
-  long get_pcid() { return st.pcid; }
 
   MOSDOp(entity_inst_t asker, int inc, long tid,
-         object_t oid, pg_t pg, epoch_t mapepoch, int op) :
+         object_t oid, ObjectLayout ol, epoch_t mapepoch, int op) :
     Message(MSG_OSD_OP) {
     memset(&st, 0, sizeof(st));
     this->st.client = asker;
@@ -168,7 +164,7 @@ private:
     this->st.reqid.tid = tid;
 
     this->st.oid = oid;
-    this->st.pg = pg;
+    this->st.layout = ol;
     this->st.map_epoch = mapepoch;
     this->st.op = op;
 
@@ -182,6 +178,8 @@ private:
   //void set_pg_role(int r) { st.pg_role = r; }
   //void set_rg_nrep(int n) { st.rg_nrep = n; }
 
+  void set_layout(const ObjectLayout& l) { st.layout = l; }
+
   void set_length(size_t l) { st.length = l; }
   void set_offset(size_t o) { st.offset = o; }
   void set_version(eversion_t v) { st.version = v; }
@@ -204,8 +202,7 @@ private:
     ::_encode(data, payload);
   }
 
-  virtual char *get_type_name() { return "oop"; }
-
+  virtual char *get_type_name() { return "osd_op"; }
   void print(ostream& out) {
     out << "osd_op(" << st.reqid
        << " " << get_opname(st.op)
index 05106e096d1768bd720186d0d397a94110c1e41d..6ec15861dcbddbcf431e7bdf9be4fe64dec52b09 100644 (file)
@@ -36,7 +36,7 @@ class MOSDOpReply : public Message {
     tid_t rep_tid;
     
     object_t oid;
-    pg_t pg;
+    ObjectLayout layout;  // pgid, etc.
     
     int op;
     
@@ -60,7 +60,7 @@ class MOSDOpReply : public Message {
   long     get_tid() { return st.reqid.tid; }
   long     get_rep_tid() { return st.rep_tid; }
   object_t get_oid() { return st.oid; }
-  pg_t     get_pg() { return st.pg; }
+  pg_t     get_pg() { return st.layout.pgid; }
   int      get_op()  { return st.op; }
   bool     get_commit() { return st.commit; }
   
@@ -105,7 +105,7 @@ public:
     this->st.rep_tid = req->st.rep_tid;
 
     this->st.oid = req->st.oid;
-    this->st.pg = req->st.pg;
+    this->st.layout = req->st.layout;
     this->st.result = result;
     this->st.commit = commit;
 
@@ -132,7 +132,7 @@ public:
     ::_encode(data, payload);
   }
 
-  virtual char *get_type_name() { return "oopr"; }
+  virtual char *get_type_name() { return "osd_op_reply"; }
   
   void print(ostream& out) {
     out << "osd_op_reply(" << st.reqid
index d2db8c8f7e11c55c063570169ac767a7b0c7c909..ae267dfb7632346ea3f778a9afcd9c5e24b75052 100644 (file)
@@ -155,9 +155,9 @@ int fakemessenger_do_loop_2()
       
       if (m) {
         //dout(18) << "got " << m << endl;
-        dout(1) << "---- " << m->get_dest() 
+        dout(1) << "==== " << m->get_dest() 
                << " <- " << m->get_source()
-                << " ---- " << *m 
+                << " ==== " << *m 
                 << endl;
         
         if (g_conf.fakemessenger_serialize) {
@@ -329,7 +329,7 @@ int FakeMessenger::send_message(Message *m, entity_inst_t inst, int port, int fr
     }
     dm->queue_incoming(m);
 
-    dout(1) << "--> " << get_myname() << " -> " << inst.name << " " << *m << endl;
+    dout(1) << "--> " << get_myname() << " -> " << inst.name << " --- " << *m << endl;
     
   }
   catch (...) {
index 1d4392748f207297d1b5ebecc2bbf4d0cf12a751..75d822b63d985f947753969396ea3d8a42953d65 100644 (file)
@@ -361,20 +361,55 @@ int OSD::read_superblock()
 }
 
 
-// object locks
 
-PG *OSD::lock_pg(pg_t pgid) 
+
+
+// ======================================================
+// PG's
+
+PG *OSD::_create_lock_pg(pg_t pgid, ObjectStore::Transaction& t)
 {
-  osd_lock.Lock();
-  PG *pg = _lock_pg(pgid);
-  osd_lock.Unlock();
+  dout(10) << "_create_lock_pg " << pgid << endl;
+
+  if (pg_map.count(pgid)) 
+    dout(0) << "_create_lock_pg on " << pgid << ", already have " << *pg_map[pgid] << endl;
+  
+  // create
+  PG *pg;
+  if (pgid.is_rep())
+    pg = new ReplicatedPG(this, pgid);
+  else if (pgid.is_raid4())
+    pg = new RAID4PG(this, pgid);
+  else 
+    assert(0);
+
+  assert(pg_map.count(pgid) == 0);
+  pg_map[pgid] = pg;
+
+  // lock
+  pg->lock();
+  pg_lock.insert(pgid);
+
+  pg->get(); // because it's in pg_map
+  pg->get(); // because we're locking it
+
+  // create collection
+  assert(!store->collection_exists(pgid));
+  t.create_collection(pgid);
+
   return pg;
 }
 
+bool OSD::_have_pg(pg_t pgid)
+{
+  return pg_map.count(pgid);
+}
+
 PG *OSD::_lock_pg(pg_t pgid)
 {
   assert(pg_map.count(pgid));
 
+  // wait?
   if (pg_lock.count(pgid)) {
     Cond c;
     dout(15) << "lock_pg " << pgid << " waiting as " << &c << endl;
@@ -396,14 +431,10 @@ PG *OSD::_lock_pg(pg_t pgid)
   dout(15) << "lock_pg " << pgid << endl;
   pg_lock.insert(pgid);
 
-  return pg_map[pgid];  
-}
-
-void OSD::unlock_pg(pg_t pgid) 
-{
-  osd_lock.Lock();
-  _unlock_pg(pgid);
-  osd_lock.Unlock();
+  PG *pg = pg_map[pgid];
+  pg->lock();
+  pg->get();    // because we're "locking" it and returning a pointer copy.
+  return pg;
 }
 
 void OSD::_unlock_pg(pg_t pgid) 
@@ -412,6 +443,8 @@ void OSD::_unlock_pg(pg_t pgid)
   assert(pg_lock.count(pgid));
   pg_lock.erase(pgid);
 
+  pg_map[pgid]->put_unlock();
+
   if (pg_lock_waiters.count(pgid)) {
     // someone is in line
     Cond *c = pg_lock_waiters[pgid].front();
@@ -424,9 +457,14 @@ void OSD::_unlock_pg(pg_t pgid)
   }
 }
 
-void OSD::_remove_pg(pg_t pgid
+void OSD::_remove_unlock_pg(PG *pg
 {
-  dout(10) << "_remove_pg " << pgid << endl;
+  pg_t pgid = pg->info.pgid;
+
+  dout(10) << "_remove_unlock_pg " << pgid << endl;
+
+  // there shouldn't be any waiters, since we're a stray, and pg is presumably clean0.
+  assert(pg_lock_waiters.count(pgid) == 0);
 
   // remove from store
   list<object_t> olist;
@@ -435,19 +473,126 @@ void OSD::_remove_pg(pg_t pgid)
   ObjectStore::Transaction t;
   {
     for (list<object_t>::iterator p = olist.begin();
-         p != olist.end();
-         p++)
+        p != olist.end();
+        p++)
       t.remove(*p);
     t.remove_collection(pgid);
     t.remove(pgid.to_object());  // log too
   }
   store->apply_transaction(t);
-  
-  // hose from memory
-  delete pg_map[pgid];
+
+  // mark deleted
+  pg->mark_deleted();
+
+  // unlock
+  pg_lock.erase(pgid);
+  pg->put();   
+
+  // remove from map
   pg_map.erase(pgid);
+  pg->put_unlock();     // will delete, if last reference
+}
+
+
+
+void OSD::load_pgs()
+{
+  dout(10) << "load_pgs" << endl;
+  assert(pg_map.empty());
+
+  list<coll_t> ls;
+  store->list_collections(ls);
+
+  for (list<coll_t>::iterator it = ls.begin();
+       it != ls.end();
+       it++) {
+    pg_t pgid = *it;
+
+    PG *pg = 0;
+    if (pgid.is_rep())
+      new ReplicatedPG(this, pgid);
+    else if (pgid.is_raid4())
+      new RAID4PG(this, pgid);
+    else 
+      assert(0);
+    pg_map[pgid] = pg;
+    pg->get();
+
+    // read pg info
+    store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info));
+    
+    // read pg log
+    pg->read_log(store);
+
+    // generate state for current mapping
+    int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting);
+    int role = osdmap->calc_pg_role(whoami, pg->acting, nrep);
+    pg->set_role(role);
+
+    dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl;
+  }
 }
+
+
+/**
+ * check epochs starting from start to verify the pg acting set hasn't changed
+ * up until now
+ */
+void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
+{
+  dout(15) << "project_pg_history " << pgid
+           << " from " << from << " to " << osdmap->get_epoch()
+           << ", start " << h
+           << endl;
+
+  vector<int> last;
+  osdmap->pg_to_acting_osds(pgid, last);
+
+  for (epoch_t e = osdmap->get_epoch()-1;
+       e >= from;
+       e--) {
+    // verify during intermediate epoch
+    OSDMap oldmap;
+    get_map(e, oldmap);
+
+    vector<int> acting;
+    oldmap.pg_to_acting_osds(pgid, acting);
+
+    // acting set change?
+    if (acting != last && 
+        e <= h.same_since) {
+      dout(15) << "project_pg_history " << pgid << " changed in " << e+1 
+                << " from " << acting << " -> " << last << endl;
+      h.same_since = e+1;
+    }
+
+    // primary change?
+    if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) &&
+        e <= h.same_primary_since) {
+      dout(15) << "project_pg_history " << pgid << " primary changed in " << e+1 << endl;
+      h.same_primary_since = e+1;
+    
+      if (g_conf.osd_rep == OSD_REP_PRIMARY)
+        h.same_acker_since = h.same_primary_since;
+    }
 
+    // acker change?
+    if (g_conf.osd_rep != OSD_REP_PRIMARY) {
+      if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) &&
+          e <= h.same_acker_since) {
+        dout(15) << "project_pg_history " << pgid << " acker changed in " << e+1 << endl;
+        h.same_acker_since = e+1;
+      }
+    }
+
+    if (h.same_since > e &&
+        h.same_primary_since > e &&
+        h.same_acker_since > e) break;
+  }
+
+  dout(15) << "project_pg_history end " << h << endl;
+}
 
 void OSD::activate_pg(pg_t pgid, epoch_t epoch)
 {
@@ -1016,7 +1161,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
        int role = osdmap->calc_pg_role(whoami, acting, nrep);
        if (role < 0) continue;
        
-       PG *pg = create_pg(pgid, t);
+       PG *pg = _create_lock_pg(pgid, t);
        pg->set_role(role);
        pg->acting.swap(acting);
        pg->last_epoch_started_any = 
@@ -1025,8 +1170,9 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          pg->info.history.same_primary_since = 
            pg->info.history.same_acker_since = osdmap->get_epoch();
        pg->activate(t);
-       
+
        dout(7) << "created " << *pg << endl;
+       _unlock_pg(pgid);
       }
 
       for (ps_t ps = 0; ps < maxlps; ++ps) {
@@ -1036,7 +1182,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
        int nrep = osdmap->pg_to_acting_osds(pgid, acting);
        int role = osdmap->calc_pg_role(whoami, acting, nrep);
        
-       PG *pg = create_pg(pgid, t);
+       PG *pg = _create_lock_pg(pgid, t);
        pg->acting.swap(acting);
        pg->set_role(role);
        pg->last_epoch_started_any = 
@@ -1047,11 +1193,11 @@ void OSD::advance_map(ObjectStore::Transaction& t)
        pg->activate(t);
        
        dout(7) << "created " << *pg << endl;
+       _unlock_pg(pgid);
       }
     }
 
     // raided
-    /*
     for (int size = g_conf.osd_min_raid_width;
         size <= g_conf.osd_max_raid_width;
         size++) {
@@ -1062,7 +1208,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
        int role = osdmap->calc_pg_role(whoami, acting, nrep);
        if (role < 0) continue;
        
-       PG *pg = create_pg(pgid, t);
+       PG *pg = _create_lock_pg(pgid, t);
        pg->set_role(role);
        pg->acting.swap(acting);
        pg->last_epoch_started_any = 
@@ -1071,8 +1217,9 @@ void OSD::advance_map(ObjectStore::Transaction& t)
          pg->info.history.same_primary_since = 
            pg->info.history.same_acker_since = osdmap->get_epoch();
        pg->activate(t);
-       
+
        dout(7) << "created " << *pg << endl;
+       _unlock_pg(pgid);
       }
 
       for (ps_t ps = 0; ps < maxlps; ++ps) {
@@ -1082,7 +1229,7 @@ void OSD::advance_map(ObjectStore::Transaction& t)
        int nrep = osdmap->pg_to_acting_osds(pgid, acting);
        int role = osdmap->calc_pg_role(whoami, acting, nrep);
        
-       PG *pg = create_pg(pgid, t);
+       PG *pg = _create_lock_pg(pgid, t);
        pg->acting.swap(acting);
        pg->set_role(role);
        pg->last_epoch_started_any = 
@@ -1093,9 +1240,9 @@ void OSD::advance_map(ObjectStore::Transaction& t)
        pg->activate(t);
        
        dout(7) << "created " << *pg << endl;
+       _unlock_pg(pgid);
       }
     }
-    */
     dout(1) << "mkfs done, created " << pg_map.size() << " pgs" << endl;
 
   } else {
@@ -1392,142 +1539,6 @@ bool OSD::require_same_or_newer_map(Message *m, epoch_t epoch)
 
 
 
-// ======================================================
-// PG's
-
-bool OSD::pg_exists(pg_t pgid) 
-{
-  return store->collection_exists(pgid);
-}
-
-PG *OSD::create_pg(pg_t pgid, ObjectStore::Transaction& t)
-{
-  if (pg_map.count(pgid)) {
-    dout(0) << "create_pg on " << pgid << ", already have " << *pg_map[pgid] << endl;
-  }
-  assert(pg_map.count(pgid) == 0);
-  assert(!pg_exists(pgid));
-
-  PG *pg;
-  if (pgid.is_rep())
-    pg = new ReplicatedPG(this, pgid);
-  else if (pgid.is_raid4())
-    assert(0); //pg = new RAID4PG(this, pgid);
-  else 
-    assert(0);
-  pg_map[pgid] = pg;
-
-  t.create_collection(pgid);
-
-  return pg;
-}
-
-
-PG *OSD::get_pg(pg_t pgid)
-{
-  if (pg_map.count(pgid))
-    return pg_map[pgid];
-  return 0;
-}
-
-void OSD::load_pgs()
-{
-  dout(10) << "load_pgs" << endl;
-  assert(pg_map.empty());
-
-  list<coll_t> ls;
-  store->list_collections(ls);
-
-  for (list<coll_t>::iterator it = ls.begin();
-       it != ls.end();
-       it++) {
-    pg_t pgid = *it;
-
-    PG *pg = 0;
-    if (pgid.is_rep())
-      new ReplicatedPG(this, pgid);
-    else if (pgid.is_raid4())
-      assert(0); //new RAID4PG(this, pgid);
-    else 
-      assert(0);
-    pg_map[pgid] = pg;
-
-    // read pg info
-    store->collection_getattr(pgid, "info", &pg->info, sizeof(pg->info));
-    
-    // read pg log
-    pg->read_log(store);
-
-    // generate state for current mapping
-    int nrep = osdmap->pg_to_acting_osds(pgid, pg->acting);
-    int role = osdmap->calc_pg_role(whoami, pg->acting, nrep);
-    pg->set_role(role);
-
-    dout(10) << "load_pgs loaded " << *pg << " " << pg->log << endl;
-  }
-}
-
-
-/**
- * check epochs starting from start to verify the pg acting set hasn't changed
- * up until now
- */
-void OSD::project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from)
-{
-  dout(15) << "project_pg_history " << pgid
-           << " from " << from << " to " << osdmap->get_epoch()
-           << ", start " << h
-           << endl;
-
-  vector<int> last;
-  osdmap->pg_to_acting_osds(pgid, last);
-
-  for (epoch_t e = osdmap->get_epoch()-1;
-       e >= from;
-       e--) {
-    // verify during intermediate epoch
-    OSDMap oldmap;
-    get_map(e, oldmap);
-
-    vector<int> acting;
-    oldmap.pg_to_acting_osds(pgid, acting);
-
-    // acting set change?
-    if (acting != last && 
-        e <= h.same_since) {
-      dout(15) << "project_pg_history " << pgid << " changed in " << e+1 
-                << " from " << acting << " -> " << last << endl;
-      h.same_since = e+1;
-    }
-
-    // primary change?
-    if (!(!acting.empty() && !last.empty() && acting[0] == last[0]) &&
-        e <= h.same_primary_since) {
-      dout(15) << "project_pg_history " << pgid << " primary changed in " << e+1 << endl;
-      h.same_primary_since = e+1;
-    
-      if (g_conf.osd_rep == OSD_REP_PRIMARY)
-        h.same_acker_since = h.same_primary_since;
-    }
-
-    // acker change?
-    if (g_conf.osd_rep != OSD_REP_PRIMARY) {
-      if (!(!acting.empty() && !last.empty() && acting[acting.size()-1] == last[last.size()-1]) &&
-          e <= h.same_acker_since) {
-        dout(15) << "project_pg_history " << pgid << " acker changed in " << e+1 << endl;
-        h.same_acker_since = e+1;
-      }
-    }
-
-    if (h.same_since > e &&
-        h.same_primary_since > e &&
-        h.same_acker_since > e) break;
-  }
-
-  dout(15) << "project_pg_history end " << h << endl;
-}
-
 
 /** do_notifies
  * Send an MOSDPGNotify to a primary, with a list of PGs that I have
@@ -1608,7 +1619,7 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
       }
       
       // ok, create PG!
-      pg = create_pg(pgid, t);
+      pg = _create_lock_pg(pgid, t);
       osdmap->pg_to_acting_osds(pgid, pg->acting);
       pg->set_role(0);
       pg->info.history = history;
@@ -1625,8 +1636,6 @@ void OSD::handle_pg_notify(MOSDPGNotify *m)
         take_waiters(waiting_for_pg[pgid]);
         waiting_for_pg.erase(pgid);
       }
-
-      _lock_pg(pgid);
     } else {
       // already had it.  am i (still) the primary?
       pg = _lock_pg(pgid);
@@ -1809,7 +1818,7 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
       assert(role > 0);
 
       ObjectStore::Transaction t;
-      pg = create_pg(pgid, t);
+      pg = _create_lock_pg(pgid, t);
       pg->acting.swap( acting );
       pg->set_role(role);
       pg->info.history = history;
@@ -1818,7 +1827,6 @@ void OSD::handle_pg_query(MOSDPGQuery *m)
       store->apply_transaction(t);
 
       dout(10) << *pg << " dne (before), but i am role " << role << endl;
-      _lock_pg(pgid);
     } else {
       pg = _lock_pg(pgid);
       
@@ -1908,11 +1916,7 @@ void OSD::handle_pg_remove(MOSDPGRemove *m)
     dout(10) << *pg << " removing." << endl;
     assert(pg->get_role() == -1);
     
-    _remove_pg(pgid);
-
-    // unlock.  there shouldn't be any waiters, since we're a stray, and pg is presumably clean0.
-    assert(pg_lock_waiters.count(pgid) == 0);
-    _unlock_pg(pgid);
+    _remove_unlock_pg(pg);
   }
 
   delete m;
@@ -1929,7 +1933,7 @@ void OSD::handle_pg_remove(MOSDPGRemove *m)
 void OSD::handle_op(MOSDOp *op)
 {
   const pg_t pgid = op->get_pg();
-  PG *pg = get_pg(pgid);
+  PG *pg = _have_pg(pgid) ? _lock_pg(pgid):0;
 
 
   logger->set("buf", buffer_total_alloc);
@@ -1940,7 +1944,10 @@ void OSD::handle_op(MOSDOp *op)
 
 
   // require same or newer map
-  if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
+  if (!require_same_or_newer_map(op, op->get_map_epoch())) {
+    _unlock_pg(pgid);
+    return;
+  }
 
   // share our map with sender, if they're old
   _share_map_incoming(op->get_source_inst(), op->get_map_epoch());
@@ -1961,6 +1968,7 @@ void OSD::handle_op(MOSDOp *op)
               << pgid 
               << ", waiting" << endl;
       waiting_for_pg[pgid].push_back(op);
+      _unlock_pg(pgid);
       return;
     }
 
@@ -1970,6 +1978,7 @@ void OSD::handle_op(MOSDOp *op)
              << " after " << op->get_map_epoch() 
              << ", dropping" << endl;
       assert(op->get_map_epoch() < osdmap->get_epoch());
+      _unlock_pg(pgid);
       delete op;
       return;
     }
@@ -1979,6 +1988,7 @@ void OSD::handle_op(MOSDOp *op)
              << " after " << op->get_map_epoch() 
              << ", dropping" << endl;
       assert(op->get_map_epoch() < osdmap->get_epoch());
+      _unlock_pg(pgid);
       delete op;
       return;
     }
@@ -1991,6 +2001,7 @@ void OSD::handle_op(MOSDOp *op)
           dout(7) << *pg << " queueing replay at " << op->get_version()
                   << " for " << *op << endl;
           pg->replay_queue[op->get_version()] = op;
+         _unlock_pg(pgid);
           return;
         } else {
           dout(7) << *pg << " replay at " << op->get_version() << " <= " << pg->info.last_update 
@@ -2001,12 +2012,14 @@ void OSD::handle_op(MOSDOp *op)
       
       dout(7) << *pg << " not active (yet)" << endl;
       pg->waiting_for_active.push_back(op);
+      _unlock_pg(pgid);
       return;
     }
     
     // missing object?
     if (pg->is_missing_object(op->get_oid())) {
       pg->wait_for_missing_object(op->get_oid(), op);
+      _unlock_pg(pgid);
       return;
     }
     /*
@@ -2054,6 +2067,7 @@ void OSD::handle_op(MOSDOp *op)
          int peer = pg->acting[1];
          dout(-10) << "fwd client read op to osd" << peer << " for " << op->get_client() << " " << op->get_client_inst() << endl;
          messenger->send_message(op, osdmap->get_inst(peer));
+         _unlock_pg(pgid);
          return;
        }
       }
@@ -2077,6 +2091,7 @@ void OSD::handle_op(MOSDOp *op)
                        << " osd" << peer
                        << endl;
              messenger->send_message(op, osdmap->get_inst(peer));
+             _unlock_pg(pgid);
              return;
            }
          }
@@ -2101,6 +2116,7 @@ void OSD::handle_op(MOSDOp *op)
       dout(10) << "handle_rep_op pg changed " << pg->info.history
                << " after " << op->get_map_epoch() 
                << ", dropping" << endl;
+      _unlock_pg(pgid);
       delete op;
       return;
     }
@@ -2110,10 +2126,17 @@ void OSD::handle_op(MOSDOp *op)
   }
   
   if (g_conf.osd_maxthreads < 1) {
-    _lock_pg(pgid);
-    do_op(op, pg); // do it now
+
+    if (op->get_type() == MSG_OSD_OP)
+      pg->do_op((MOSDOp*)op); // do it now
+    else if (op->get_type() == MSG_OSD_OPREPLY)
+      pg->do_op_reply((MOSDOpReply*)op);
+    else 
+      assert(0);
+
     _unlock_pg(pgid);
   } else {
+    _unlock_pg(pgid);
     // queue for worker threads
     /*if (read) 
       enqueue_op(0, op);     // no locking needed for reads
@@ -2136,7 +2159,6 @@ void OSD::handle_op_reply(MOSDOpReply *op)
   
   // make sure we have the pg
   const pg_t pgid = op->get_pg();
-  PG *pg = get_pg(pgid);
 
   // require same or newer map
   if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
@@ -2144,14 +2166,15 @@ void OSD::handle_op_reply(MOSDOpReply *op)
   // share our map with sender, if they're old
   _share_map_incoming(op->get_source_inst(), op->get_map_epoch());
 
-  if (!pg) {
+  if (!_have_pg(pgid)) {
     // hmm.
     delete op;
-  }
+    return;
+  } 
 
   if (g_conf.osd_maxthreads < 1) {
-    _lock_pg(pgid);
-    do_op(op, pg); // do it now
+    PG *pg = _lock_pg(pgid);
+    pg->do_op_reply(op); // do it now
     _unlock_pg(pgid);
   } else {
     enqueue_op(pgid, op);     // queue for worker threads
@@ -2211,7 +2234,12 @@ void OSD::dequeue_op(pg_t pgid)
   osd_lock.Unlock();
 
   // do it
-  do_op(op, pg);
+  if (op->get_type() == MSG_OSD_OP)
+    pg->do_op((MOSDOp*)op); // do it now
+  else if (op->get_type() == MSG_OSD_OPREPLY)
+    pg->do_op_reply((MOSDOpReply*)op);
+  else 
+    assert(0);
 
   // finish
   osd_lock.Lock();
@@ -2237,93 +2265,6 @@ void OSD::dequeue_op(pg_t pgid)
 
 
 
-/** do_op - do an op
- * object lock will be held (if multithreaded)
- * osd_lock NOT held.
- */
-void OSD::do_op(Message *m, PG *pg) 
-{
-  //dout(15) << "do_op " << *m << endl;
-
-  if (m->get_type() == MSG_OSD_OP) {
-    MOSDOp *op = (MOSDOp*)m;
-
-    logger->inc("op");
-
-    switch (op->get_op()) {
-      
-      // reads
-    case OSD_OP_READ:
-      if (block_if_wrlocked(op)) 
-       return;
-      pg->op_read(op);
-      break;
-    case OSD_OP_STAT:
-      if (block_if_wrlocked(op)) 
-       return;
-      pg->op_stat(op);
-      break;
-      
-      // rep stuff
-    case OSD_OP_PULL:
-      pg->op_pull(op);
-      break;
-    case OSD_OP_PUSH:
-      pg->op_push(op);
-      break;
-      
-      // writes
-    case OSD_OP_WRNOOP:
-    case OSD_OP_WRITE:
-    case OSD_OP_ZERO:
-    case OSD_OP_DELETE:
-    case OSD_OP_TRUNCATE:
-    case OSD_OP_WRLOCK:
-    case OSD_OP_WRUNLOCK:
-    case OSD_OP_RDLOCK:
-    case OSD_OP_RDUNLOCK:
-    case OSD_OP_UPLOCK:
-    case OSD_OP_DNLOCK:
-      if (op->get_source().is_osd()) {
-        pg->op_rep_modify(op);
-      } else {
-       // locked by someone else?
-       // for _any_ op type -- eg only the locker can unlock!
-       if (op->get_op() != OSD_OP_WRNOOP &&  // except WRNOOP; we just want to flush
-           block_if_wrlocked(op)) 
-         return; // op will be handled later, after the object unlocks
-       
-       // share latest osd map with rest of pg?
-       osd_lock.Lock();
-       {
-         for (unsigned i=1; i<pg->acting.size(); i++) {
-           int osd = pg->acting[i];
-           _share_map_outgoing( osdmap->get_inst(osd) ); 
-         }
-       }
-       osd_lock.Unlock();
-       
-       // go go gadget pg
-        pg->op_modify(op);
-
-       if (op->get_op() == OSD_OP_WRITE) {
-         logger->inc("c_wr");
-         logger->inc("c_wrb", op->get_length());
-       }
-      }
-      break;
-      
-    default:
-      assert(0);
-    }
-  } 
-  else if (m->get_type() == MSG_OSD_OPREPLY) {
-    pg->op_reply((MOSDOpReply*)m);
-  } else
-    assert(0);
-}
-
-
 
 void OSD::wait_for_no_ops()
 {
index 1fa0752712a326bc207eb164ac058c355ab62eb5..df9c710f3dd88e2a4daa699309967bec0fd6edde 100644 (file)
 #include "ObjectStore.h"
 #include "PG.h"
 
+#include "messages/MOSDOp.h"
+
+
 #include <map>
 using namespace std;
+
 #include <ext/hash_map>
 #include <ext/hash_set>
 using namespace __gnu_cxx;
 
-#include "messages/MOSDOp.h"
 
 class Messenger;
 class Message;
+class Logger;
+class ObjectStore;
+class OSDMap;
 
 
-  
-
 class OSD : public Dispatcher {
 public:
+  // -- states --
+  static const int STATE_BOOTING = 1;
+  static const int STATE_ACTIVE = 2;
+  static const int STATE_STOPPING = 3;
 
-  /** superblock
-   */
+
+  /** OSD **/
+protected:
+  Mutex osd_lock;     // global lock
+  SafeTimer timer;    // safe timer
+
+  Messenger   *messenger; 
+  Logger      *logger;
+  ObjectStore *store;
+  MonMap      *monmap;
+
+  int whoami;
+  char dev_path[100];
+
+public:
+  int get_nodeid() { return whoami; }
+  
+private:
+  /** superblock **/
   OSDSuperblock superblock;
   epoch_t  boot_epoch;      
 
@@ -55,29 +80,15 @@ public:
   int read_superblock();
 
 
-  /** OSD **/
- protected:
-  Messenger *messenger;
-  int whoami;
-
-  static const int STATE_BOOTING = 1;
-  static const int STATE_ACTIVE = 2;
-  static const int STATE_STOPPING = 3;
-
+  // -- state --
   int state;
 
+public:
   bool is_booting() { return state == STATE_BOOTING; }
   bool is_active() { return state == STATE_ACTIVE; }
   bool is_stopping() { return state == STATE_STOPPING; }
 
-
-  MonMap *monmap;
-
-  class Logger      *logger;
-
-  // local store
-  char dev_path[100];
-  class ObjectStore *store;
+private:
 
   // heartbeat
   void heartbeat();
@@ -91,9 +102,6 @@ public:
     }
   };
 
-  // global lock
-  Mutex osd_lock;
-  SafeTimer timer;
 
   // -- stats --
   int hb_stat_ops;  // ops since last heartbeat
@@ -101,26 +109,21 @@ public:
 
   hash_map<int, float> peer_qlen;
   
-  // per-pg locking (serializing)
-  hash_set<pg_t>               pg_lock;
-  hash_map<pg_t, list<Cond*> > pg_lock_waiters;  
-  PG *lock_pg(pg_t pgid);
-  PG *_lock_pg(pg_t pgid);
-  void unlock_pg(pg_t pgid);
-  void _unlock_pg(pg_t pgid);
 
-  // finished waiting messages, that will go at tail of dispatch()
+  // -- waiters --
   list<class Message*> finished;
+
   void take_waiters(list<class Message*>& ls) {
     finished.splice(finished.end(), ls);
   }
   
-  // object locking
-  hash_map<object_t, list<Message*> > waiting_for_wr_unlock; /** list of operations for each object waiting for 'wrunlock' */
-
+  // -- object locking --
+  hash_map<object_t, list<Message*> > waiting_for_wr_unlock; 
+  
   bool block_if_wrlocked(MOSDOp* op);
 
-  // -- ops --
+
+  // -- op queue --
   class ThreadPool<class OSD*, pg_t>   *threadpool;
   hash_map<pg_t, list<Message*> >       op_queue;
   int   pending_ops;
@@ -136,26 +139,17 @@ public:
     o->dequeue_op(pgid);
   };
 
-  void do_op(Message *m, PG *pg);  // actually do it
 
-  void prepare_log_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, 
-                              objectrev_t crev, objectrev_t rev, PG *pg, eversion_t trim_to);
-  void prepare_op_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, 
-                             objectrev_t crev, objectrev_t rev, PG *pg);
-  
-  bool waitfor_missing_object(MOSDOp *op, PG *pg);
-
-
-  
   friend class PG;
   friend class ReplicatedPG;
-  friend class C_OSD_WriteCommit;
+  friend class RAID4PG;
+
 
  protected:
 
   // -- osd map --
-  class OSDMap  *osdmap;
-  list<class Message*> waiting_for_osdmap;
+  OSDMap         *osdmap;
+  list<Message*>  waiting_for_osdmap;
 
   hash_map<entity_name_t, epoch_t>  peer_map_epoch;  // FIXME types
   bool _share_map_incoming(const entity_inst_t& inst, epoch_t epoch);
@@ -176,18 +170,23 @@ public:
 
 
 
-  // -- replication --
+  // -- placement groups --
+  hash_map<pg_t, PG*> pg_map;
+  hash_map<pg_t, list<Message*> > waiting_for_pg;
 
-  // PG
-  hash_map<pg_t, PG*>      pg_map;
-  void  load_pgs();
-  bool  pg_exists(pg_t pg);
-  PG   *create_pg(pg_t pg, ObjectStore::Transaction& t);          // create new PG
-  PG   *get_pg(pg_t pg);             // return existing PG, or null
-  void  _remove_pg(pg_t pg);         // remove from store and memory
+  // per-pg locking (serializes AND acquired pg lock)
+  hash_set<pg_t>               pg_lock;
+  hash_map<pg_t, list<Cond*> > pg_lock_waiters;  
+  
+  PG   *_lock_pg(pg_t pgid);
+  void  _unlock_pg(pg_t pgid);
 
-  void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from);
+  PG   *_create_lock_pg(pg_t pg, ObjectStore::Transaction& t);          // create new PG
+  bool  _have_pg(pg_t pgid);
+  void  _remove_unlock_pg(PG *pg);         // remove from store and memory
 
+  void load_pgs();
+  void project_pg_history(pg_t pgid, PG::Info::History& h, epoch_t from);
   void activate_pg(pg_t pgid, epoch_t epoch);
 
   class C_Activate : public Context {
@@ -202,10 +201,9 @@ public:
   };
 
 
+  // -- tids --
+  // for ops i issue
   tid_t               last_tid;
-  int                 num_pulling;
-
-  hash_map<pg_t, list<Message*> >        waiting_for_pg;
 
   Mutex tid_lock;
   tid_t get_tid() {
@@ -216,18 +214,14 @@ public:
     return t;
   }
 
-  //void handle_rep_op_ack(MOSDOpReply *m);
 
-  // recovery
+  // -- generic pg recovery --
+  int num_pulling;
+
   void do_notifies(map< int, list<PG::Info> >& notify_list);
   void do_queries(map< int, map<pg_t,PG::Query> >& query_map);
   void repeer(PG *pg, map< int, map<pg_t,PG::Query> >& query_map);
 
-  /*
-  void pull(PG *pg, object_t oid);
-  void push(PG *pg, object_t oid, int dest);
-  */
   bool require_current_map(Message *m, epoch_t v);
   bool require_same_or_newer_map(Message *m, epoch_t e);
 
@@ -236,23 +230,11 @@ public:
   void handle_pg_log(class MOSDPGLog *m);
   void handle_pg_remove(class MOSDPGRemove *m);
 
-  /*
-  void op_pull(class MOSDOp *op, PG *pg);
-  void op_push(class MOSDOp *op, PG *pg);
-  void op_rep_modify(class MOSDOp *op, PG *pg);   // write, trucnate, delete
-  void op_rep_modify_commit(class MOSDOp *op, int ackerosd, 
-                            eversion_t last_complete);
-  */
-
-  friend class C_OSD_RepModifyCommit;
-
 
  public:
   OSD(int id, Messenger *m, MonMap *mm, char *dev = 0);
   ~OSD();
 
-  int get_nodeid() { return whoami; }
-  
   // startup/shutdown
   int init();
   int shutdown();
@@ -263,13 +245,6 @@ public:
 
   void handle_osd_ping(class MOSDPing *m);
   void handle_op(class MOSDOp *m);
-
-  void op_read(class MOSDOp *m);//, PG *pg);
-  void op_stat(class MOSDOp *m);//, PG *pg);
-  void op_modify(class MOSDOp *m, PG *pg);
-  void op_modify_commit(pg_t pgid, tid_t rep_tid, eversion_t pg_complete_thru);
-
-  // for replication
   void handle_op_reply(class MOSDOpReply *m);
 
   void force_remount();
index bc8027007047a0aa39351e5b7d5268bf7822ae66..58085e7da4843977cb3804805dd90e924c645331 100644 (file)
@@ -264,7 +264,7 @@ private:
   /****   mapping facilities   ****/
 
   // oid -> pg
-  pg_t object_to_pg(object_t oid, FileLayout& layout) {
+  ObjectLayout file_to_object_layout(object_t oid, FileLayout& layout) {
     static crush::Hash H(777);
     
     // calculate ps (placement seed)
@@ -299,8 +299,9 @@ private:
       assert(0);
     }
 
-    // construct pg
-    return pg_t(layout.pg_type, layout.pg_size, ps, layout.preferred);
+    // construct object layout
+    return ObjectLayout(pg_t(layout.pg_type, layout.pg_size, ps, layout.preferred), 
+                       layout.object_stripe_unit);
   }
 
 
index 1d27afee541caad6a113d9e441970a76a5b02a5b..7421c6cfb2d87504e46ef0ac775fa3f4e934cf6b 100644 (file)
@@ -1196,3 +1196,6 @@ bool PG::pick_object_rev(object_t& oid)
   return false;  
 }
 
+
+
+
index 32e511fdb73b7009a479dbc31a952bd50af8f221..d1c5c34a6a7f99c83706f29fb734c6d0ab530568 100644 (file)
@@ -379,9 +379,47 @@ public:
   static const int STATE_STRAY =  16; // i must notify the primary i exist.
 
 
- protected:
+protected:
   OSD *osd;
 
+  /** locking and reference counting.
+   * I destroy myself when the reference count hits zero.
+   * lock() should be called before doing anything.
+   * get() should be called on pointer copy (to another thread, etc.).
+   * put() should be called on destruction of some previously copied pointer.
+   * put_unlock() when done with the current pointer (_most common_).
+   */  
+  Mutex _lock;
+  int  ref;
+  bool deleted;
+
+public:
+  void lock() {
+    //cout << info.pgid << " lock" << endl;
+    _lock.Lock();
+  }
+  void get() {
+    //cout << info.pgid << " get " << ref << endl;
+    assert(_lock.is_locked());
+    ++ref; 
+  }
+  void put() { 
+    //cout << info.pgid << " put " << ref << endl;
+    assert(_lock.is_locked());
+    --ref; 
+    assert(ref > 0);  // last put must be a put_unlock.
+  }
+  void put_unlock() { 
+    //cout << info.pgid << " put_unlock " << ref << endl;
+    assert(_lock.is_locked());
+    --ref; 
+    _lock.Unlock();
+    if (ref == 0) delete this;
+  }
+
+  void mark_deleted() { deleted = true; }
+  bool is_deleted() { return deleted; }
+
 public:
   // pg state
   Info        info;
@@ -481,12 +519,12 @@ public:
     return 0;
   }
 
-  virtual void op_rep_modify_commit(MOSDOp *op, int ackerosd, eversion_t last_complete) = 0;
   friend class C_OSD_RepModify_Commit;
 
  public:  
   PG(OSD *o, pg_t p) : 
     osd(o), 
+    ref(0), deleted(false),
     info(p),
     role(0),
     state(0),
@@ -558,16 +596,9 @@ public:
 
 
 
-
-
   // abstract bits
-  virtual void op_stat(MOSDOp *op) = 0;
-  virtual int op_read(MOSDOp *op) = 0;
-  virtual void op_modify(MOSDOp *op) = 0;
-  virtual void op_rep_modify(MOSDOp *op) = 0;
-  virtual void op_push(MOSDOp *op) = 0;
-  virtual void op_pull(MOSDOp *op) = 0;
-  virtual void op_reply(MOSDOpReply *op) = 0;
+  virtual void do_op(MOSDOp *op) = 0;
+  virtual void do_op_reply(MOSDOpReply *op) = 0;
 
   virtual bool same_for_read_since(epoch_t e) = 0;
   virtual bool same_for_modify_since(epoch_t e) = 0;
index d1ad6c93fa8f9d9f4fc86c8f6a3be3606bc15ef0..e63fd1b1433ae5f87080ca4c97fbdf7cd93a3604 100644 (file)
 
 
 
+void RAID4PG::do_op(MOSDOp *op)
+{
+
+
+}
+
+
+
+void RAID4PG::do_op_reply(MOSDOpReply *reply)
+{
+
+}
+
+
+
+// -----------------
+// pg changes
+
+bool RAID4PG::same_for_read_since(epoch_t e)
+{
+  return e >= info.history.same_since;   // whole pg set same
+}
+
+bool RAID4PG::same_for_modify_since(epoch_t e)
+{
+  return e >= info.history.same_since;   // whole pg set same
+}
+
+bool RAID4PG::same_for_rep_modify_since(epoch_t e)
+{
+  return e >= info.history.same_since;   // whole pg set same
+}
+
+
+// -----------------
+// RECOVERY
+
+bool RAID4PG::is_missing_object(object_t oid)
+{
+  return false;
+}
+
+void RAID4PG::wait_for_missing_object(object_t oid, MOSDOp *op)
+{
+  assert(0);
+}
+
+void RAID4PG::note_failed_osd(int o)
+{
+  dout(10) << "note_failed_osd osd" << o << endl;
+  assert(0);
+}
+
+void RAID4PG::on_acker_change()
+{
+  dout(10) << "on_acker_change" << endl;
+  assert(0);
+}
+
+
+void RAID4PG::on_role_change()
+{
+  dout(10) << "on_role_change" << endl;
+  assert(0);
+}
+
+
+void RAID4PG::clean_up_local(ObjectStore::Transaction&)
+{
+}
+
+void RAID4PG::cancel_recovery() 
+{
+  assert(0);
+}
+
+bool RAID4PG::do_recovery() 
+{
+  assert(0);
+  return false;
+}
+
+void RAID4PG::clean_replicas() 
+{
+  assert(0);
+}
+
 
 
index 2a6f3a8a148896bb07b539fa813760ddfe95ff77..9c75118c060692a4c6dab913bae8a2565aa9094d 100644 (file)
@@ -32,9 +32,6 @@ protected:
   void prepare_op_transaction(ObjectStore::Transaction& t, 
                              MOSDOp *op, eversion_t& version, 
                              objectrev_t crev, objectrev_t rev);
-  
-public:
-  RAID4PG(OSD *o, pg_t p) : PG(o,p) { }
 
   void op_stat(MOSDOp *op);
   int op_read(MOSDOp *op);
@@ -43,7 +40,13 @@ public:
   void op_push(MOSDOp *op);
   void op_pull(MOSDOp *op);
 
-  void op_reply(MOSDOpReply *r);
+
+  
+public:
+  RAID4PG(OSD *o, pg_t p) : PG(o,p) { }
+
+  void do_op(MOSDOp *op);
+  void do_op_reply(MOSDOpReply *r);
 
   bool same_for_read_since(epoch_t e);
   bool same_for_modify_since(epoch_t e);
@@ -57,6 +60,14 @@ public:
   void on_acker_change();
   void on_role_change();
 
+  void clean_up_local(ObjectStore::Transaction& t);
+
+  void cancel_recovery();
+  bool do_recovery();
+
+  void clean_replicas();
+
+
 };
 
 
index 967b80f7353b3dff6e909a23cfb9dfa188a3a34a..3d2a88a90b08da446c5e9524b9e57acec8d2ee0c 100644 (file)
@@ -94,6 +94,91 @@ void ReplicatedPG::wait_for_missing_object(object_t oid, MOSDOp *op)
 
 
 
+
+/** do_op - do an op
+ * pg lock will be held (if multithreaded)
+ * osd_lock NOT held.
+ */
+void ReplicatedPG::do_op(MOSDOp *op) 
+{
+  //dout(15) << "do_op " << *op << endl;
+
+  osd->logger->inc("op");
+
+  switch (op->get_op()) {
+    
+    // reads
+  case OSD_OP_READ:
+    if (osd->block_if_wrlocked(op)) 
+      return;
+    op_read(op);
+    break;
+  case OSD_OP_STAT:
+    if (osd->block_if_wrlocked(op)) 
+      return;
+    op_stat(op);
+    break;
+    
+    // rep stuff
+  case OSD_OP_PULL:
+    op_pull(op);
+    break;
+  case OSD_OP_PUSH:
+    op_push(op);
+    break;
+    
+    // writes
+  case OSD_OP_WRNOOP:
+  case OSD_OP_WRITE:
+  case OSD_OP_ZERO:
+  case OSD_OP_DELETE:
+  case OSD_OP_TRUNCATE:
+  case OSD_OP_WRLOCK:
+  case OSD_OP_WRUNLOCK:
+  case OSD_OP_RDLOCK:
+  case OSD_OP_RDUNLOCK:
+  case OSD_OP_UPLOCK:
+  case OSD_OP_DNLOCK:
+    if (op->get_source().is_osd()) {
+      op_rep_modify(op);
+    } else {
+      // go go gadget pg
+      op_modify(op);
+      
+      if (op->get_op() == OSD_OP_WRITE) {
+       osd->logger->inc("c_wr");
+       osd->logger->inc("c_wrb", op->get_length());
+      }
+    }
+    break;
+    
+  default:
+    assert(0);
+  }
+}
+
+void ReplicatedPG::do_op_reply(MOSDOpReply *r)
+{
+  // must be replication.
+  tid_t rep_tid = r->get_rep_tid();
+  
+  if (rep_gather.count(rep_tid)) {
+    // oh, good.
+    int fromosd = r->get_source().num();
+    repop_ack(rep_gather[rep_tid], 
+             r->get_result(), r->get_commit(), 
+             fromosd, 
+             r->get_pg_complete_thru());
+    delete r;
+  } else {
+    // early ack.
+    waiting_for_repop[rep_tid].push_back(r);
+  }
+}
+
+
+
+
 // ========================================================================
 // READS
 
@@ -358,19 +443,19 @@ void ReplicatedPG::prepare_op_transaction(ObjectStore::Transaction& t,
 // ========================================================================
 // rep op gather
 
-class C_OSD_WriteCommit : public Context {
+class C_OSD_ModifyCommit : public Context {
 public:
-  OSD *osd;
-  pg_t pgid;
+  ReplicatedPG *pg;
   tid_t rep_tid;
   eversion_t pg_last_complete;
-  C_OSD_WriteCommit(OSD *o, pg_t p, tid_t rt, eversion_t lc) : osd(o), pgid(p), rep_tid(rt), pg_last_complete(lc) {}
+  C_OSD_ModifyCommit(ReplicatedPG *p, tid_t rt, eversion_t lc) : pg(p), rep_tid(rt), pg_last_complete(lc) {
+    pg->get();  // we're copying the pointer
+  }
   void finish(int r) {
-    ReplicatedPG *pg = (ReplicatedPG*)(osd->_lock_pg(pgid));
-    if (pg) {
+    pg->lock();
+    if (!pg->is_deleted()) 
       pg->op_modify_commit(rep_tid, pg_last_complete);
-      osd->_unlock_pg(pg->info.pgid);
-    }
+    pg->put_unlock();
   }
 };
 
@@ -386,7 +471,7 @@ void ReplicatedPG::apply_repop(RepGather *repop)
   dout(10) << "apply_repop  applying update on " << *repop << endl;
   assert(!repop->applied);
 
-  Context *oncommit = new C_OSD_WriteCommit(osd, info.pgid, repop->rep_tid, repop->pg_local_last_complete);
+  Context *oncommit = new C_OSD_ModifyCommit(this, repop->rep_tid, repop->pg_local_last_complete);
   unsigned r = osd->store->apply_transaction(repop->t, oncommit);
   if (r)
     dout(-10) << "apply_repop  apply transaction return " << r << " on " << *repop << endl;
@@ -474,7 +559,7 @@ void ReplicatedPG::issue_repop(MOSDOp *op, int dest)
   // forward the write/update/whatever
   MOSDOp *wr = new MOSDOp(op->get_client_inst(), op->get_client_inc(), op->get_reqid().tid,
                           oid,
-                          info.pgid,
+                          ObjectLayout(info.pgid),
                           osd->osdmap->get_epoch(),
                           op->get_op());
   wr->get_data() = op->get_data();   // _copy_ bufferlist
@@ -662,7 +747,7 @@ objectrev_t ReplicatedPG::assign_version(MOSDOp *op)
 // commit (to disk) callback
 class C_OSD_RepModifyCommit : public Context {
 public:
-  OSD *osd;
+  ReplicatedPG *pg;
   MOSDOp *op;
   int destosd;
 
@@ -673,9 +758,11 @@ public:
   bool acked;
   bool waiting;
 
-  C_OSD_RepModifyCommit(OSD *o, MOSDOp *oo, int dosd, eversion_t lc) : 
-    osd(o), op(oo), destosd(dosd), pg_last_complete(lc),
-    acked(false), waiting(false) { }
+  C_OSD_RepModifyCommit(ReplicatedPG *p, MOSDOp *oo, int dosd, eversion_t lc) : 
+    pg(p), op(oo), destosd(dosd), pg_last_complete(lc),
+    acked(false), waiting(false) { 
+    pg->get();  // we're copying the pointer.
+  }
   void finish(int r) {
     lock.Lock();
     assert(!waiting);
@@ -686,9 +773,9 @@ public:
     assert(acked);
     lock.Unlock();
 
-    PG *pg = osd->lock_pg(op->get_pg());
+    pg->lock();
     pg->op_rep_modify_commit(op, destosd, pg_last_complete);
-    osd->unlock_pg(op->get_pg());
+    pg->put_unlock();
   }
   void ack() {
     lock.Lock();
@@ -710,6 +797,22 @@ void ReplicatedPG::op_modify(MOSDOp *op)
   object_t oid = op->get_oid();
   const char *opname = MOSDOp::get_opname(op->get_op());
 
+  // locked by someone else?
+  // for _any_ op type -- eg only the locker can unlock!
+  if (op->get_op() != OSD_OP_WRNOOP &&  // except WRNOOP; we just want to flush
+      osd->block_if_wrlocked(op)) 
+    return; // op will be handled later, after the object unlocks
+  
+  // share latest osd map with rest of pg?
+  osd->osd_lock.Lock();
+  {
+    for (unsigned i=1; i<acting.size(); i++) {
+      osd->_share_map_outgoing( osd->osdmap->get_inst(acting[i]) ); 
+    }
+  }
+  osd->osd_lock.Unlock();
+  
+
   // dup op?
   if (is_dup(op->get_reqid())) {
     dout(-3) << "op_modify " << opname << " dup op " << op->get_reqid()
@@ -793,7 +896,7 @@ void ReplicatedPG::op_modify(MOSDOp *op)
     prepare_log_transaction(t, op, nv, crev, op->get_rev(), peers_complete_thru);
     prepare_op_transaction(t, op, nv, crev, op->get_rev());
 
-    C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(osd, op, get_acker(), 
+    C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, get_acker(), 
                                                                 info.last_complete);
     unsigned r = osd->store->apply_transaction(t, oncommit);
     if (r != 0 &&   // no errors
@@ -897,7 +1000,7 @@ void ReplicatedPG::op_rep_modify(MOSDOp *op)
       prepare_op_transaction(t, op, nv, crev, op->get_rev());
     }
 
-    oncommit = new C_OSD_RepModifyCommit(osd, op, ackerosd, info.last_complete);
+    oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, info.last_complete);
 
     // apply log update. and possibly update itself.
     unsigned tr = osd->store->apply_transaction(t, oncommit);
@@ -1152,25 +1255,6 @@ void ReplicatedPG::op_push(MOSDOp *op)
 
 
 
-void ReplicatedPG::op_reply(MOSDOpReply *r)
-{
-  // must be replication.
-  tid_t rep_tid = r->get_rep_tid();
-  
-  if (rep_gather.count(rep_tid)) {
-    // oh, good.
-    int fromosd = r->get_source().num();
-    repop_ack(rep_gather[rep_tid], 
-             r->get_result(), r->get_commit(), 
-             fromosd, 
-             r->get_pg_complete_thru());
-    delete r;
-  } else {
-    // early ack.
-    waiting_for_repop[rep_tid].push_back(r);
-  }
-}
-
 
 
 void ReplicatedPG::note_failed_osd(int o)
index 555bf0580fcce94a3ea34adae8bfe80af49175a9..38b287e065ea03eed6e32d79a6499648f4828b8b 100644 (file)
@@ -100,7 +100,8 @@ protected:
                              MOSDOp *op, eversion_t& version, 
                              objectrev_t crev, objectrev_t rev);
 
-  friend class C_OSD_WriteCommit;
+  friend class C_OSD_ModifyCommit;
+  friend class C_OSD_RepModifyCommit;
 
 
   // pg on-disk content
@@ -113,6 +114,13 @@ protected:
   void clean_replicas();
 
 
+  void op_stat(MOSDOp *op);
+  int op_read(MOSDOp *op);
+  void op_modify(MOSDOp *op);
+  void op_rep_modify(MOSDOp *op);
+  void op_push(MOSDOp *op);
+  void op_pull(MOSDOp *op);
+
 
 
 
@@ -123,14 +131,9 @@ public:
   { }
   ~ReplicatedPG() {}
 
-  void op_stat(MOSDOp *op);
-  int op_read(MOSDOp *op);
-  void op_modify(MOSDOp *op);
-  void op_rep_modify(MOSDOp *op);
-  void op_push(MOSDOp *op);
-  void op_pull(MOSDOp *op);
 
-  void op_reply(MOSDOpReply *r);
+  void do_op(MOSDOp *op);
+  void do_op_reply(MOSDOpReply *r);
 
   bool same_for_read_since(epoch_t e);
   bool same_for_modify_since(epoch_t e);
index 6ca4fe5839822bf094be9607db1d2e0d849d0b12..d3d34896eddfef80c904ba983be7a26993a2dbda 100644 (file)
@@ -110,7 +110,7 @@ inline ostream& operator<<(ostream& out, pg_t pg)
     out << pg.preferred() << 'p';
   out << hex << pg.ps() << dec;
 
-  out << "=" << hex << (__uint64_t)pg << dec;
+  //out << "=" << hex << (__uint64_t)pg << dec;
   return out;
 }
 
@@ -126,7 +126,27 @@ namespace __gnu_cxx {
 }
 
 
+/** ObjectLayout
+ *
+ * describes an object's placement and layout in the storage cluster.  
+ * most importatly, which pg it belongs to.
+ * if that pg is raided, it also specifies the object's stripe_unit.
+ */
+struct ObjectLayout {
+  pg_t pgid;            // what pg do i belong to
+  int  stripe_unit;     // for object raid in raid pgs
+
+  ObjectLayout() : pgid(0), stripe_unit(0) { }
+  ObjectLayout(pg_t p, int su=0) : pgid(p), stripe_unit(su) { }
+};
 
+inline ostream& operator<<(ostream& out, const ObjectLayout &ol)
+{
+  out << "pg" << ol.pgid;
+  if (ol.stripe_unit)
+    out << ".su=" << ol.stripe_unit;
+  return out;
+}
 
 
 
@@ -173,18 +193,19 @@ class ObjectExtent {
   size_t      length;    // in object
 
   objectrev_t rev;       // which revision?
-  pg_t        pgid;      // where to find the object
+
+  ObjectLayout layout;   // object layout (pgid, etc.)
 
   map<size_t, size_t>  buffer_extents;  // off -> len.  extents in buffer being mapped (may be fragmented bc of striping!)
   
-  ObjectExtent() : start(0), length(0), rev(0), pgid(0) {}
-  ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l), rev(0), pgid(0) { }
+  ObjectExtent() : start(0), length(0), rev(0) {}
+  ObjectExtent(object_t o, off_t s=0, size_t l=0) : oid(o), start(s), length(l), rev(0) { }
 };
 
 inline ostream& operator<<(ostream& out, ObjectExtent &ex)
 {
   return out << "extent(" 
-             << ex.oid << " in " << hex << ex.pgid << dec
+             << ex.oid << " in " << ex.layout
              << " " << ex.start << "~" << ex.length
              << ")";
 }
index 368c9d3a9555d6d0f0ae33005a57256963895781..240c5c0aef8d85b07771a5c185f2259ead0b9ab4 100644 (file)
@@ -79,7 +79,7 @@ void Filer::_probe(Probe *probe)
        p++) {
     dout(10) << "_probe  probing " << p->oid << endl;
     C_Probe *c = new C_Probe(this, probe, p->oid);
-    probe->ops[p->oid] = objecter->stat(p->oid, &c->size, c);
+    probe->ops[p->oid] = objecter->stat(p->oid, &c->size, p->layout, c);
   }
 }
 
@@ -192,7 +192,7 @@ void Filer::file_to_extents(inode_t inode,
       ex = &object_extents[oid];
       ex->oid = oid;
       ex->rev = rev;
-      ex->pgid = objecter->osdmap->object_to_pg( oid, inode.layout );
+      ex->layout = objecter->osdmap->file_to_object_layout( oid, inode.layout );
     }
     
     // map range into object
@@ -219,7 +219,7 @@ void Filer::file_to_extents(inode_t inode,
     }
     ex->buffer_extents[cur-offset] = x_len;
         
-    dout(15) << "file_to_extents  " << *ex << " in " << ex->pgid << endl;
+    dout(15) << "file_to_extents  " << *ex << " in " << ex->layout << endl;
     //cout << "map: ino " << ino << " oid " << ex.oid << " osd " << ex.osd << " offset " << ex.offset << " len " << ex.len << " ... left " << left << endl;
     
     left -= x_len;
index e2520f595096dec63d13ee8d89a87da89cbc5a3f..91a67165ae3c53a651a2863ce947cccbf11cc2e5 100644 (file)
@@ -386,7 +386,7 @@ void ObjectCacher::bh_read(BufferHead *bh)
   C_ReadFinish *onfinish = new C_ReadFinish(this, bh->ob->get_oid(), bh->start(), bh->length());
 
   // go
-  objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), &onfinish->bl,
+  objecter->read(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), &onfinish->bl,
                  onfinish);
 }
 
@@ -463,7 +463,7 @@ void ObjectCacher::bh_write(BufferHead *bh)
   C_WriteCommit *oncommit = new C_WriteCommit(this, bh->ob->get_oid(), bh->start(), bh->length());
 
   // go
-  tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->bl,
+  tid_t tid = objecter->write(bh->ob->get_oid(), bh->start(), bh->length(), bh->ob->get_layout(), bh->bl,
                               onack, oncommit);
 
   // set bh last_write_tid
@@ -701,7 +701,7 @@ int ObjectCacher::readx(Objecter::OSDRead *rd, inodeno_t ino, Context *onfinish)
     dout(10) << "readx " << *ex_it << endl;
 
     // get Object cache
-    Object *o = get_object(ex_it->oid, ino);
+    Object *o = get_object(ex_it->oid, ino, ex_it->layout);
     
     // map extent into bufferheads
     map<off_t, BufferHead*> hits, missing, rx;
@@ -826,7 +826,7 @@ int ObjectCacher::writex(Objecter::OSDWrite *wr, inodeno_t ino)
        ex_it != wr->extents.end();
        ex_it++) {
     // get object cache
-    Object *o = get_object(ex_it->oid, ino);
+    Object *o = get_object(ex_it->oid, ino, ex_it->layout);
 
     // map it all into a single bufferhead.
     BufferHead *bh = o->map_write(wr);
@@ -967,7 +967,7 @@ int ObjectCacher::atomic_sync_readx(Objecter::OSDRead *rd, inodeno_t ino, Mutex&
     for (map<object_t,ObjectExtent>::iterator i = by_oid.begin();
          i != by_oid.end();
          i++) {
-      Object *o = get_object(i->first, ino);
+      Object *o = get_object(i->first, ino, i->second.layout);
       rdlock(o);
     }
 
@@ -1008,7 +1008,7 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute
     // make sure we aren't already locking/locked...
     object_t oid = wr->extents.front().oid;
     Object *o = 0;
-    if (objects.count(oid)) o = get_object(oid, ino);
+    if (objects.count(oid)) o = get_object(oid, ino, wr->extents.front().layout);
     if (!o || 
         (o->lock_state != Object::LOCK_WRLOCK &&
          o->lock_state != Object::LOCK_WRLOCKING &&
@@ -1041,7 +1041,7 @@ int ObjectCacher::atomic_sync_writex(Objecter::OSDWrite *wr, inodeno_t ino, Mute
   for (map<object_t,ObjectExtent>::iterator i = by_oid.begin();
        i != by_oid.end();
        i++) {
-    Object *o = get_object(i->first, ino);
+    Object *o = get_object(i->first, ino, i->second.layout);
     wrlock(o);
   }
   
@@ -1085,7 +1085,7 @@ void ObjectCacher::rdlock(Object *o)
     commit->tid = 
       ack->tid = 
       o->last_write_tid = 
-      objecter->lock(OSD_OP_RDLOCK, o->get_oid(), ack, commit);
+      objecter->lock(OSD_OP_RDLOCK, o->get_oid(), o->get_layout(), ack, commit);
   }
   
   // stake our claim.
@@ -1129,7 +1129,7 @@ void ObjectCacher::wrlock(Object *o)
     commit->tid = 
       ack->tid = 
       o->last_write_tid = 
-      objecter->lock(op, o->get_oid(), ack, commit);
+      objecter->lock(op, o->get_oid(), o->get_layout(), ack, commit);
   }
   
   // stake our claim.
@@ -1173,7 +1173,7 @@ void ObjectCacher::rdunlock(Object *o)
   commit->tid = 
     lockack->tid = 
     o->last_write_tid = 
-    objecter->lock(OSD_OP_RDUNLOCK, o->get_oid(), lockack, commit);
+    objecter->lock(OSD_OP_RDUNLOCK, o->get_oid(), o->get_layout(), lockack, commit);
 }
 
 void ObjectCacher::wrunlock(Object *o)
@@ -1206,7 +1206,7 @@ void ObjectCacher::wrunlock(Object *o)
   commit->tid = 
     lockack->tid = 
     o->last_write_tid = 
-    objecter->lock(op, o->get_oid(), lockack, commit);
+    objecter->lock(op, o->get_oid(), o->get_layout(), lockack, commit);
 }
 
 
index 27b154023209d62cd6d14cb23fd1e298a81d6de8..e435913e9600939fb0faccdd516b513356cbffa2 100644 (file)
@@ -99,6 +99,7 @@ class ObjectCacher {
     object_t  oid;   // this _always_ is oid.rev=0
     inodeno_t ino;
        objectrev_t rev; // last rev we're written
+       ObjectLayout layout;
     
   public:
     map<off_t, BufferHead*>     data;
@@ -127,9 +128,9 @@ class ObjectCacher {
     int rdlock_ref;  // how many ppl want or are using a READ lock
 
   public:
-    Object(ObjectCacher *_oc, object_t o, inodeno_t i) : 
+    Object(ObjectCacher *_oc, object_t o, inodeno_t i, ObjectLayout& l) : 
       oc(_oc),
-      oid(o), ino(i), 
+      oid(o), ino(i), layout(l),
       last_write_tid(0), last_ack_tid(0), last_commit_tid(0),
       lock_state(LOCK_NONE), wrlock_ref(0), rdlock_ref(0)
       {}
@@ -137,6 +138,9 @@ class ObjectCacher {
     object_t get_oid() { return oid; }
     inodeno_t get_ino() { return ino; }
 
+       ObjectLayout& get_layout() { return layout; }
+       void set_layout(ObjectLayout& l) { layout = l; }
+
     bool can_close() {
       return data.empty() && lock_state == LOCK_NONE &&
         waitfor_ack.empty() && waitfor_commit.empty() &&
@@ -216,13 +220,13 @@ class ObjectCacher {
   
 
   // objects
-  Object *get_object(object_t oid, inodeno_t ino) {
+  Object *get_object(object_t oid, inodeno_t ino, ObjectLayout &l) {
     // have it?
     if (objects.count(oid))
       return objects[oid];
 
     // create it.
-    Object *o = new Object(this, oid, ino);
+    Object *o = new Object(this, oid, ino, l);
     objects[oid] = o;
     objects_by_ino[ino].insert(o);
     return o;
index c531a840803b134710511b134f328b4c47473fb3..b24b10393e48a2e745d04a6a3380eaafcabeaf8f 100644 (file)
@@ -251,12 +251,12 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
 // stat -----------------------------------
 
-tid_t Objecter::stat(object_t oid, off_t *size, Context *onfinish,
+tid_t Objecter::stat(object_t oid, off_t *size, ObjectLayout& ol, Context *onfinish,
                                         objectrev_t rev)
 {
   OSDStat *st = new OSDStat(size);
   st->extents.push_back(ObjectExtent(oid, 0, 0));
-  st->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+  st->extents.front().layout = ol;
   st->extents.front().rev = rev;
   st->onfinish = onfinish;
 
@@ -267,17 +267,18 @@ tid_t Objecter::stat_submit(OSDStat *st)
 {
   // find OSD
   ObjectExtent &ex = st->extents.front();
-  PG &pg = get_pg( ex.pgid );
+  PG &pg = get_pg( ex.layout.pgid );
 
   // send
   last_tid++;
   assert(client_inc >= 0);
   MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
-                         ex.oid, ex.pgid, osdmap->get_epoch(), 
+                         ex.oid, ex.layout, osdmap->get_epoch(), 
                          OSD_OP_STAT);
+
   dout(10) << "stat_submit " << st << " tid " << last_tid
            << " oid " << ex.oid
-           << " pg " << ex.pgid
+           << " " << ex.layout
            << " osd" << pg.acker() 
            << endl;
 
@@ -350,13 +351,13 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
 // read -----------------------------------
 
 
-tid_t Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl, 
+tid_t Objecter::read(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist *bl, 
                      Context *onfinish, 
                                         objectrev_t rev)
 {
   OSDRead *rd = new OSDRead(bl);
   rd->extents.push_back(ObjectExtent(oid, off, len));
-  rd->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+  rd->extents.front().layout = ol;
   rd->extents.front().rev = rev;
   readx(rd, onfinish);
   return last_tid;
@@ -379,20 +380,20 @@ tid_t Objecter::readx(OSDRead *rd, Context *onfinish)
 tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex) 
 {
   // find OSD
-  PG &pg = get_pg( ex.pgid );
+  PG &pg = get_pg( ex.layout.pgid );
 
   // send
   last_tid++;
   assert(client_inc >= 0);
   MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
-                         ex.oid, ex.pgid, osdmap->get_epoch(), 
+                         ex.oid, ex.layout, osdmap->get_epoch(), 
                          OSD_OP_READ);
   m->set_length(ex.length);
   m->set_offset(ex.start);
   dout(10) << "readx_submit " << rd << " tid " << last_tid
            << " oid " << ex.oid << " " << ex.start << "~" << ex.length
            << " (" << ex.buffer_extents.size() << " buffer fragments)" 
-           << " pg " << ex.pgid
+           << " " << ex.layout
            << " osd" << pg.acker() 
            << endl;
 
@@ -576,13 +577,13 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
 
 // write ------------------------------------
 
-tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl, 
+tid_t Objecter::write(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist &bl, 
                       Context *onack, Context *oncommit,
                                          objectrev_t rev)
 {
   OSDWrite *wr = new OSDWrite(bl);
   wr->extents.push_back(ObjectExtent(oid, off, len));
-  wr->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+  wr->extents.front().layout = ol;
   wr->extents.front().buffer_extents[0] = len;
   wr->extents.front().rev = rev;
   modifyx(wr, onack, oncommit);
@@ -592,13 +593,13 @@ tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl,
 
 // zero
 
-tid_t Objecter::zero(object_t oid, off_t off, size_t len,  
+tid_t Objecter::zero(object_t oid, off_t off, size_t len, ObjectLayout& ol,
                      Context *onack, Context *oncommit,
                                         objectrev_t rev)
 {
   OSDModify *z = new OSDModify(OSD_OP_ZERO);
   z->extents.push_back(ObjectExtent(oid, off, len));
-  z->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+  z->extents.front().layout = ol;
   z->extents.front().rev = rev;
   modifyx(z, onack, oncommit);
   return last_tid;
@@ -607,12 +608,12 @@ tid_t Objecter::zero(object_t oid, off_t off, size_t len,
 
 // lock ops
 
-tid_t Objecter::lock(int op, object_t oid, 
+tid_t Objecter::lock(int op, object_t oid, ObjectLayout& ol, 
                      Context *onack, Context *oncommit)
 {
   OSDModify *l = new OSDModify(op);
   l->extents.push_back(ObjectExtent(oid, 0, 0));
-  l->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+  l->extents.front().layout = ol;
   modifyx(l, onack, oncommit);
   return last_tid;
 }
@@ -639,7 +640,7 @@ tid_t Objecter::modifyx(OSDModify *wr, Context *onack, Context *oncommit)
 tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
 {
   // find
-  PG &pg = get_pg( ex.pgid );
+  PG &pg = get_pg( ex.layout.pgid );
     
   // send
   tid_t tid;
@@ -649,7 +650,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
     tid = ++last_tid;
 
   MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
-                         ex.oid, ex.pgid, osdmap->get_epoch(),
+                         ex.oid, ex.layout, osdmap->get_epoch(),
                          wr->op);
   m->set_length(ex.length);
   m->set_offset(ex.start);
@@ -691,7 +692,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
   dout(10) << "modifyx_submit " << MOSDOp::get_opname(wr->op) << " tid " << tid
            << "  oid " << ex.oid
            << " " << ex.start << "~" << ex.length 
-           << " pg " << ex.pgid 
+           << " " << ex.layout 
            << " osd" << pg.primary()
            << endl;
   if (pg.primary() >= 0)
index f74081eafc3129bb1d3507a1c101519242a3b1bc..4887ed08648e7a995636e61d34444938bd524f89 100644 (file)
@@ -174,19 +174,19 @@ class Objecter {
   //tid_t lockx(OSDLock *l, Context *onack, Context *oncommit);
 
   // even lazier
-  tid_t read(object_t oid, off_t off, size_t len, bufferlist *bl, 
+  tid_t read(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist *bl, 
              Context *onfinish, 
                         objectrev_t rev=0);
-  tid_t write(object_t oid, off_t off, size_t len, bufferlist &bl, 
+  tid_t write(object_t oid, off_t off, size_t len, ObjectLayout& ol, bufferlist &bl, 
               Context *onack, Context *oncommit, 
                          objectrev_t rev=0);
-  tid_t zero(object_t oid, off_t off, size_t len,  
+  tid_t zero(object_t oid, off_t off, size_t len, ObjectLayout& ol,  
              Context *onack, Context *oncommit, 
                         objectrev_t rev=0);
-  tid_t stat(object_t oid, off_t *size, Context *onfinish, 
+  tid_t stat(object_t oid, off_t *size, ObjectLayout& ol, Context *onfinish, 
                         objectrev_t rev=0);  
 
-  tid_t lock(int op, object_t oid, Context *onack, Context *oncommit);
+  tid_t lock(int op, object_t oid, ObjectLayout& ol, Context *onack, Context *oncommit);
 
 
   void ms_handle_failure(Message *m, entity_name_t dest, const entity_inst_t& inst);