]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
object revisions;
authorsageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 7 Dec 2006 19:11:58 +0000 (19:11 +0000)
committersageweil <sageweil@29311d96-e01e-0410-9327-a35deaab8ce9>
Thu, 7 Dec 2006 19:11:58 +0000 (19:11 +0000)
acker is now second osd;
primitive read balancing

git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@984 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/osd/FakeStore.h
ceph/osd/OSD.cc
ceph/osd/OSD.h
ceph/osd/ObjectStore.h
ceph/osd/PG.cc
ceph/osd/PG.h
ceph/osdc/Filer.cc
ceph/osdc/Filer.h
ceph/osdc/ObjectCacher.h
ceph/osdc/Objecter.cc
ceph/osdc/Objecter.h

index 3f70f3b81de434725a157f8448c3439f217dc62b..eaa4126e84e4647ec31eb891871b34a0d5af3cb6 100644 (file)
@@ -66,7 +66,7 @@ class FakeStore : public ObjectStore,
 
   // ------------------
   // objects
-  int pick_object_revision(object_t& oid) {
+  int pick_object_revision_lt(object_t& oid) {
     return 0;
   }
   bool exists(object_t oid);
index d37ef233901c0e2f67e0b43bfef521ca36fb90d9..83dee0dd673c2b5e04a58a9df826be97e4686bb6 100644 (file)
@@ -115,6 +115,8 @@ OSD::OSD(int id, Messenger *m, MonMap *mm, char *dev)
 
   state = STATE_BOOTING;
 
+  hb_stat_ops = 0;
+  hb_stat_qlen = 0;
 
   pending_ops = 0;
   waiting_for_no_ops = false;
@@ -466,9 +468,6 @@ void OSD::activate_pg(pg_t pgid, epoch_t epoch)
       dispatch(*it);
     }
   }
-
-  // kick myself w/ a ping .. HACK
-  //messenger->send_message(new MPing, MSG_ADDR_OSD(whoami));
 }
 
 
@@ -482,7 +481,18 @@ void OSD::heartbeat()
   utime_t since = now;
   since.sec_ref() -= g_conf.osd_heartbeat_interval;
 
-  dout(-15) << "heartbeat " << now << endl;
+  // calc my stats
+  float avg_qlen = 0;
+  if (hb_stat_ops) avg_qlen = (float)hb_stat_qlen / (float)hb_stat_ops;
+
+  dout(5) << "heartbeat " << now 
+         << ": ops " << hb_stat_ops
+         << ", avg qlen " << avg_qlen
+         << endl;
+  
+  // reset until next time around
+  hb_stat_ops = 0;
+  hb_stat_qlen = 0;
 
   // send pings
   set<int> pingset;
@@ -504,7 +514,7 @@ void OSD::heartbeat()
        i != pingset.end();
        i++) {
     _share_map_outgoing( MSG_ADDR_OSD(*i), osdmap->get_inst(*i) );
-    messenger->send_message(new MOSDPing(osdmap->get_epoch()), 
+    messenger->send_message(new MOSDPing(osdmap->get_epoch(), avg_qlen), 
                             MSG_ADDR_OSD(*i), osdmap->get_inst(*i));
   }
 
@@ -600,17 +610,6 @@ void OSD::_share_map_outgoing(msg_addr_t dest, const entity_inst_t& inst)
 
 void OSD::dispatch(Message *m) 
 {
-  // check clock regularly
-  //utime_t now = g_clock.now();
-  //dout(-20) << now << endl;
-
-  /*// -- don't need lock --
-  switch (m->get_type()) {
-    return;
-  }
-  */
-
-
   // lock!
   osd_lock.Lock();
 
@@ -666,7 +665,6 @@ void OSD::dispatch(Message *m)
 
       
 
-
       // need OSDMap
       switch (m->get_type()) {
 
@@ -770,9 +768,14 @@ void OSD::handle_osd_ping(MOSDPing *m)
   dout(20) << "osdping from " << m->get_source() << endl;
   _share_map_incoming(m->get_source(), m->get_source_inst(), ((MOSDPing*)m)->map_epoch);
   
+  int from = m->get_source().num();
+  peer_qlen[from] = m->avg_qlen;
+
   //if (!m->ack)
   //messenger->send_message(new MOSDPing(osdmap->get_epoch(), true),
   //m->get_source());
+  delete m;
 }
 
 
@@ -2176,6 +2179,11 @@ void OSD::op_rep_modify(MOSDOp *op, PG *pg)
   eversion_t nv = op->get_version();
 
   const char *opname = MOSDOp::get_opname(op->get_op());
+
+  // check crev
+  objectrev_t crev = 0;
+  store->getattr(oid, "crev", (char*)&crev, sizeof(crev));
+
   dout(10) << "op_rep_modify " << opname 
            << " " << oid 
            << " v " << nv 
@@ -2194,9 +2202,9 @@ void OSD::op_rep_modify(MOSDOp *op, PG *pg)
   int ackerosd = pg->acting[0];
 
   if ((g_conf.osd_rep == OSD_REP_CHAIN || g_conf.osd_rep == OSD_REP_SPLAY)) {
-    ackerosd = pg->get_tail();
+    ackerosd = pg->get_acker();
   
-    if (pg->is_tail()) {
+    if (pg->is_acker()) {
       // i am tail acker.
       if (pg->repop_gather.count(op->get_rep_tid())) {
         repop = pg->repop_gather[ op->get_rep_tid() ];
@@ -2218,10 +2226,13 @@ void OSD::op_rep_modify(MOSDOp *op, PG *pg)
     }
 
     // chain?  forward?
-    if (g_conf.osd_rep == OSD_REP_CHAIN && !pg->is_tail()) {
+    if (g_conf.osd_rep == OSD_REP_CHAIN && !pg->is_acker()) {
       // chain rep, not at the tail yet.
       int myrank = osdmap->calc_pg_rank(whoami, pg->acting);
-      issue_repop(pg, op, pg->acting[myrank+1]);
+      int next = myrank+1;
+      if (next == (int)pg->acting.size())
+       next = 1;
+      issue_repop(pg, op, pg->acting[next]);   
     }
   }
 
@@ -2234,14 +2245,14 @@ void OSD::op_rep_modify(MOSDOp *op, PG *pg)
   if (repop) {
     // acker.  we'll apply later.
     if (op->get_op() != OSD_OP_WRNOOP) {
-      prepare_log_transaction(repop->t, op, nv, pg, op->get_pg_trim_to());
-      prepare_op_transaction(repop->t, op, nv, pg);
+      prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), pg, op->get_pg_trim_to());
+      prepare_op_transaction(repop->t, op, nv, crev, op->get_rev(), pg);
     }
   } else {
     // middle|replica.
     if (op->get_op() != OSD_OP_WRNOOP) {
-      prepare_log_transaction(t, op, nv, pg, op->get_pg_trim_to());
-      prepare_op_transaction(t, op, nv, pg);
+      prepare_log_transaction(t, op, nv, crev, op->get_rev(), pg, op->get_pg_trim_to());
+      prepare_op_transaction(t, op, nv, crev, op->get_rev(), pg);
     }
 
     oncommit = new C_OSD_RepModifyCommit(this, op, ackerosd, pg->info.last_complete);
@@ -2290,6 +2301,10 @@ void OSD::handle_op(MOSDOp *op)
 
   logger->set("buf", buffer_total_alloc);
 
+  // update qlen stats
+  hb_stat_ops++;
+  hb_stat_qlen += pending_ops;
+
 
   // require same or newer map
   if (!require_same_or_newer_map(op, op->get_map_epoch())) return;
@@ -2317,7 +2332,7 @@ void OSD::handle_op(MOSDOp *op)
     
     if (read) {
       // read. am i the (same) acker?
-      if (pg->get_acker() != whoami ||
+      if (//pg->get_acker() != whoami ||
           op->get_map_epoch() < pg->info.history.same_acker_since) {
         dout(7) << "acting acker is osd" << pg->get_acker()
                 << " since " << pg->info.history.same_acker_since 
@@ -2361,11 +2376,62 @@ void OSD::handle_op(MOSDOp *op)
     }
     
     // missing object?
-    if (op->get_op() != OSD_OP_PUSH &&
-        waitfor_missing_object(op, pg)) return;
+    if (read && op->get_oid().rev > 0) {
+      // versioned read.  hrm.
+      // are we missing a revision that we might need?
+      object_t moid = op->get_oid();
+      if (pick_missing_object_rev(moid, pg)) {
+       // is there a local revision we might use instead?
+       object_t loid = op->get_oid();
+       if (store->pick_object_revision_lt(loid) &&
+           moid <= loid) {
+         // we need moid.  pull it.
+         dout(10) << "handle_op read on " << op->get_oid()
+                  << ", have " << loid
+                  << ", but need missing " << moid
+                  << ", pulling" << endl;
+         pull(pg, moid);
+         pg->waiting_for_missing_object[moid].push_back(op);
+         return;
+       } 
+         
+       dout(10) << "handle_op read on " << op->get_oid()
+                << ", have " << loid
+                << ", don't need missing " << moid 
+                << endl;
+      }
+    } else {
+      // live revision.  easy.
+      if (op->get_op() != OSD_OP_PUSH &&
+         waitfor_missing_object(op, pg)) return;
+    }
 
     dout(7) << "handle_op " << *op << " in " << *pg << endl;
     
+    
+    // balance reads?
+    if (read &&
+       g_conf.osd_balance_reads) {
+      // am i above my average?
+      float my_avg = hb_stat_qlen / hb_stat_ops;
+      if (pending_ops > my_avg) {
+       // is there a peer who is below my average?
+       for (unsigned i=1; i<pg->acting.size(); ++i) {
+         int peer = pg->acting[i];
+         if (peer_qlen.count(peer) &&
+             peer_qlen[peer] < my_avg) {
+           // take the first one
+           dout(-10) << "my qlen " << pending_ops << " > my_avg " << my_avg
+                     << ", fwd to peer w/ qlen " << peer_qlen[peer]
+                     << " osd" << peer
+                     << endl;
+           messenger->send_message(op, MSG_ADDR_OSD(peer));
+           return;
+         }
+       }
+      }
+    }
+
   } else {
     // REPLICATION OP (it's from another OSD)
 
@@ -2649,6 +2715,56 @@ bool OSD::block_if_wrlocked(MOSDOp* op)
 // ===============================
 // OPS
 
+/*
+int OSD::list_missing_revs(object_t oid, set<object_t>& revs, PG *pg)
+{
+  int c = 0;
+  oid.rev = 0;
+  
+  map<object_t,eversion_t>::iterator p = pg->missing.missing.lower_bound(oid);
+  if (p == pg->missing.missing.end()) 
+    return 0;  // clearly not
+
+  while (p->first.ino == oid.ino &&
+        p->first.bno == oid.bno) {
+    revs.insert(p->first);
+    c++;
+  }
+  return c;
+}*/
+
+bool OSD::pick_missing_object_rev(object_t& oid, PG *pg)
+{
+  map<object_t,eversion_t>::iterator p = pg->missing.missing.upper_bound(oid);
+  if (p == pg->missing.missing.end()) 
+    return false;  // clearly no candidate
+
+  if (p->first.ino == oid.ino && p->first.bno == oid.bno) {
+    oid = p->first;  // yes!  it's an upper bound revision for me.
+    return true;
+  }
+  return false;
+}
+
+bool OSD::pick_object_rev(object_t& oid)
+{
+  object_t t = oid;
+
+  if (!store->pick_object_revision_lt(t))
+    return false; // we have no revisions of this object!
+  
+  objectrev_t crev;
+  int r = store->getattr(t, "crev", &crev, sizeof(crev));
+  assert(r >= 0);
+  if (crev <= oid.rev) {
+    dout(10) << "pick_object_rev choosing " << t << " crev " << crev << " for " << oid << endl;
+    oid = t;
+    return true;
+  }
+
+  return false;  
+}
+
 bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg)
 {
   const object_t oid = op->get_oid();
@@ -2681,6 +2797,8 @@ bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg)
 }
 
 
+
+
 // READ OPS
 
 /** op_read
@@ -2689,7 +2807,7 @@ bool OSD::waitfor_missing_object(MOSDOp *op, PG *pg)
  */
 void OSD::op_read(MOSDOp *op)//, PG *pg)
 {
-  const object_t oid = op->get_oid();
+  object_t oid = op->get_oid();
   
   // if the target object is locked for writing by another client, put 'op' to the waiting queue
   // for _any_ op type -- eg only the locker can unlock!
@@ -2700,30 +2818,38 @@ void OSD::op_read(MOSDOp *op)//, PG *pg)
     //<< " in " << *pg 
            << endl;
 
-  // read into a buffer
+  long r = 0;
   bufferlist bl;
-  long got = store->read(oid, 
-                         op->get_offset(), op->get_length(),
-                         bl);
+  
+  if (oid.rev && !pick_object_rev(oid)) {
+    // we have no revision for this request.
+    r = -EEXIST;
+  } else {
+    // read into a buffer
+    r = store->read(oid, 
+                   op->get_offset(), op->get_length(),
+                   bl);
+  }
+  
   // set up reply
   MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap->get_epoch(), true); 
-  if (got >= 0) {
+  if (r >= 0) {
     reply->set_result(0);
     reply->set_data(bl);
-    reply->set_length(got);
+    reply->set_length(r);
       
     logger->inc("c_rd");
-    logger->inc("c_rdb", got);
+    logger->inc("c_rdb", r);
     
   } else {
-    reply->set_result(got);   // error
+    reply->set_result(r);   // error
     reply->set_length(0);
   }
   
-  dout(10) << " read got " << got << " / " << op->get_length() << " bytes from obj " << oid << endl;
+  dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << endl;
   
   logger->inc("rd");
-  if (got >= 0) logger->inc("rdb", got);
+  if (r >= 0) logger->inc("rdb", r);
   
   // send it
   messenger->send_message(reply, op->get_client(), op->get_client_inst());
@@ -2745,7 +2871,14 @@ void OSD::op_stat(MOSDOp *op)//, PG *pg)
 
   struct stat st;
   memset(&st, sizeof(st), 0);
-  int r = store->stat(oid, &st);
+  int r = 0;
+
+  if (oid.rev && !pick_object_rev(oid)) {
+    // we have no revision for this request.
+    r = -EEXIST;
+  } else {
+    r = store->stat(oid, &st);
+  }
   
   dout(3) << "op_stat on " << oid 
           << " r = " << r
@@ -3011,7 +3144,8 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
     int peer = pg->acting[i];
     if (pg->peer_missing.count(peer) &&
         pg->peer_missing[peer].is_missing(oid)) {
-      // push it before this update.  FIXME, this is probably extra much work (eg if we're about to overwrite)
+      // push it before this update. 
+      // FIXME, this is probably extra much work (eg if we're about to overwrite)
       pg->peer_missing[peer].got(oid);
       push(pg, oid, peer);
     }
@@ -3033,7 +3167,12 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
     return; // op will be handled later, after the object unlocks
 
 
+  // check crev
+  objectrev_t crev = 0;
+  store->getattr(oid, "crev", (char*)&crev, sizeof(crev));
+
   // assign version
+  eversion_t clone_version;
   eversion_t nv = pg->log.top;
   if (op->get_op() != OSD_OP_WRNOOP) {
     nv.epoch = osdmap->get_epoch();
@@ -3041,11 +3180,25 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
     assert(nv > pg->info.last_update);
     assert(nv > pg->log.top);
 
+    // will clone?
+    if (crev && op->get_rev() && op->get_rev() > crev) {
+      clone_version = nv;
+      nv.version++;
+    }
+
     if (op->get_version().version) {
-      // replay
-      if (nv.version < op->get_version().version)
+      // replay!
+      if (nv.version < op->get_version().version) {
         nv.version = op->get_version().version; 
-    } 
+
+       // clone?
+       if (crev && op->get_rev() && op->get_rev() > crev) {
+         // backstep clone
+         clone_version = nv;
+         clone_version.version--;
+       }
+      }
+    }
   }
 
   // set version in op, for benefit of client and our eventual reply
@@ -3054,6 +3207,8 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
   dout(10) << "op_modify " << opname 
            << " " << oid 
            << " v " << nv 
+          << " crev " << crev
+          << " rev " << op->get_rev()
            << " " << op->get_offset() << "~" << op->get_length()
            << endl;  
 
@@ -3080,7 +3235,10 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
 
   if (g_conf.osd_rep == OSD_REP_CHAIN && !alone) {
     // chain rep.  send to #2 only.
-    issue_repop(pg, op, pg->acting[1]);
+    int next = pg->acting[1];
+    if (pg->acting.size() > 2)
+      next = pg->acting[2];
+    issue_repop(pg, op, next);
   } 
   else if (g_conf.osd_rep == OSD_REP_SPLAY && !alone) {
     // splay rep.  send to rest.
@@ -3101,8 +3259,8 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
     // we are acker.
     if (op->get_op() != OSD_OP_WRNOOP) {
       // log and update later.
-      prepare_log_transaction(repop->t, op, nv, pg, pg->peers_complete_thru);
-      prepare_op_transaction(repop->t, op, nv, pg);
+      prepare_log_transaction(repop->t, op, nv, crev, op->get_rev(), pg, pg->peers_complete_thru);
+      prepare_op_transaction(repop->t, op, nv, crev, op->get_rev(), pg);
     }
 
     // (logical) local ack.
@@ -3117,10 +3275,10 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
   } else {
     // chain or splay.  apply.
     ObjectStore::Transaction t;
-    prepare_log_transaction(t, op, nv, pg, pg->peers_complete_thru);
-    prepare_op_transaction(t, op, nv, pg);
+    prepare_log_transaction(t, op, nv, crev, op->get_rev(), pg, pg->peers_complete_thru);
+    prepare_op_transaction(t, op, nv, crev, op->get_rev(), pg);
 
-    C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, pg->get_tail(), 
+    C_OSD_RepModifyCommit *oncommit = new C_OSD_RepModifyCommit(this, op, pg->get_acker(), 
                                                                 pg->info.last_complete);
     unsigned r = store->apply_transaction(t, oncommit);
     if (r != 0 &&   // no errors
@@ -3136,21 +3294,34 @@ void OSD::op_modify(MOSDOp *op, PG *pg)
 
 
 void OSD::prepare_log_transaction(ObjectStore::Transaction& t, 
-                                  MOSDOp *op, eversion_t& version, PG *pg,
+                                  MOSDOp *op, eversion_t& version, 
+                                 objectrev_t crev, objectrev_t rev,
+                                 PG *pg,
                                   eversion_t trim_to)
 {
   const object_t oid = op->get_oid();
-  
-  int opcode = PG::Log::Entry::UPDATE;
+
+  // clone entry?
+  if (crev && rev && rev > crev) {
+    eversion_t cv = version;
+    cv.version--;
+    PG::Log::Entry cloneentry(PG::Log::Entry::CLONE, oid, cv,
+                           op->get_client(), op->get_tid());
+    pg->log.add(cloneentry);
+
+    dout(10) << "prepare_log_transaction " << op->get_op()
+            << " " << cloneentry
+            << " in " << *pg << endl;
+  }
+
+  // actual op
+  int opcode = PG::Log::Entry::MODIFY;
   if (op->get_op() == OSD_OP_DELETE) opcode = PG::Log::Entry::DELETE;
   PG::Log::Entry logentry(opcode, oid, version,
                           op->get_client(), op->get_tid());
 
   dout(10) << "prepare_log_transaction " << op->get_op()
            << " " << logentry
-    //           << (logentry.is_delete() ? " - ":" + ")
-    //<< oid 
-           << " v " << version
            << " in " << *pg << endl;
 
   // append to log
@@ -3168,14 +3339,20 @@ void OSD::prepare_log_transaction(ObjectStore::Transaction& t,
  * apply an op to the store wrapped in a transaction.
  */
 void OSD::prepare_op_transaction(ObjectStore::Transaction& t, 
-                                 MOSDOp *op, eversion_t& version, PG *pg)
+                                 MOSDOp *op, eversion_t& version, 
+                                objectrev_t crev, objectrev_t rev,
+                                PG *pg)
 {
   const object_t oid = op->get_oid();
   const pg_t pgid = op->get_pg();
 
+  bool did_clone = false;
+
   dout(10) << "prepare_op_transaction " << MOSDOp::get_opname( op->get_op() )
            << " " << oid 
            << " v " << version
+          << " crev " << crev
+          << " rev " << rev
            << " in " << *pg << endl;
   
   // WRNOOP does nothing.
@@ -3193,6 +3370,15 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
   // write pg info
   t.collection_setattr(pgid, "info", &pg->info, sizeof(pg->info));
 
+  // clone?
+  if (crev && rev && rev > crev) {
+    object_t noid = oid;
+    noid.rev = rev;
+    dout(10) << "prepare_op_transaction cloning " << oid << " crev " << crev << " to " << noid << endl;
+    t.clone(oid, noid);
+    did_clone = true;
+  }  
+
   // apply the op
   switch (op->get_op()) {
   case OSD_OP_WRLOCK:
@@ -3281,5 +3467,10 @@ void OSD::prepare_op_transaction(ObjectStore::Transaction& t,
     
     // object version
     t.setattr(oid, "version", &version, sizeof(version));
+
+    // set object crev
+    if (crev == 0 ||   // new object
+       did_clone)     // we cloned
+      t.setattr(oid, "crev", &rev, sizeof(rev));
   }
 }
index bd121bbf2deba42397bb34effe0c3cd10d7254b5..ae5dba7a8e01a3c0868a1e6866d1928b9726fd7a 100644 (file)
@@ -91,8 +91,14 @@ public:
   } *next_heartbeat;
 
   // global lock
-  Mutex osd_lock;                          
+  Mutex osd_lock;
 
+  // -- stats --
+  int hb_stat_ops;  // ops since last heartbeat
+  int hb_stat_qlen; // cumulative queue length since last hb
+
+  hash_map<int, float> peer_qlen;
+  
   // per-pg locking (serializing)
   hash_set<pg_t>               pg_lock;
   hash_map<pg_t, list<Cond*> > pg_lock_waiters;  
@@ -130,10 +136,15 @@ public:
 
   void do_op(Message *m, PG *pg);  // actually do it
 
-  void prepare_log_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, PG *pg, eversion_t trim_to);
-  void prepare_op_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, PG *pg);
+  void prepare_log_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, 
+                              objectrev_t crev, objectrev_t rev, PG *pg, eversion_t trim_to);
+  void prepare_op_transaction(ObjectStore::Transaction& t, MOSDOp* op, eversion_t& version, 
+                             objectrev_t crev, objectrev_t rev, PG *pg);
   
   bool waitfor_missing_object(MOSDOp *op, PG *pg);
+  bool pick_missing_object_rev(object_t& oid, PG *pg);
+  bool pick_object_rev(object_t& oid);
+
 
   
  friend class PG;
index 7407b5c420311881666f69272bcd6e64544ad60a..21fbd867974ede768dab054cefc824d54e846468 100644 (file)
@@ -77,6 +77,7 @@ public:
     static const int OP_SETATTR =      14;  // oid, attrname, attrval
     static const int OP_SETATTRS =     15;  // oid, attrset
     static const int OP_RMATTR =       16;  // oid, attrname
+    static const int OP_CLONE =        17;  // oid, newoid
 
     static const int OP_MKCOLL =       20;  // cid
     static const int OP_RMCOLL =       21;  // cid
@@ -169,6 +170,12 @@ public:
       oids.push_back(oid);
       attrnames.push_back(name);
     }
+    void clone(object_t oid, object_t noid) {
+      int op = OP_CLONE;
+      ops.push_back(op);
+      oids.push_back(oid);
+      oids.push_back(noid);
+    }
     void create_collection(coll_t cid) {
       int op = OP_MKCOLL;
       ops.push_back(op);
@@ -306,6 +313,14 @@ public:
         }
         break;
 
+      case Transaction::OP_CLONE:
+       {
+          object_t oid = t.oids.front(); t.oids.pop_front();
+          object_t noid = t.oids.front(); t.oids.pop_front();
+         clone(oid, noid);
+       }
+       break;
+
       case Transaction::OP_MKCOLL:
         {
           coll_t cid = t.cids.front(); t.cids.pop_front();
@@ -384,7 +399,7 @@ public:
   virtual int statfs(struct statfs *buf) = 0;
 
   // objects
-  virtual int pick_object_revision(object_t& oid) = 0;
+  virtual int pick_object_revision_lt(object_t& oid) = 0;
 
   virtual bool exists(object_t oid) = 0;                   // useful?
   virtual int stat(object_t oid, struct stat *st) = 0;     // struct stat?
@@ -421,6 +436,10 @@ public:
   virtual int rmattr(object_t oid, const char *name,
                      Context *onsafe=0) {return 0;}
 
+  virtual int clone(object_t oid, object_t noid) {
+    return -1; 
+  }
+
   virtual int listattr(object_t oid, char *attrs, size_t size) {return 0;} //= 0;
   
   // collections
index 7a0e9ffd2691f4e736f660d8a90b8de7632fbeb7..4dee6f03bd166ee43f2b46ddc543c3a0521452c0 100644 (file)
@@ -442,7 +442,7 @@ void PG::generate_backlog()
     
     // add entry
     Log::Entry e;
-    e.op = Log::Entry::UPDATE;           // FIXME when we do smarter op codes!
+    e.op = Log::Entry::MODIFY;           // FIXME when we do smarter op codes!
     e.oid = *it;
     osd->store->getattr(*it, 
                         "version",
index 027acdf73c58a3be7c65c1fb07d8ba571621718a..f8a040346e88e401c31155ec6fdf02fb865706be 100644 (file)
@@ -232,23 +232,26 @@ public:
     class Entry {
     public:
       const static int LOST = 0;
-      const static int UPDATE = 1;
-      const static int DELETE = 2;
+      const static int MODIFY = 1;
+      const static int CLONE = 2;  
+      const static int DELETE = 3;
 
       int        op;   // write, zero, trunc, remove
       object_t   oid;
       eversion_t version;
+      objectrev_t rev;
+      
       reqid_t    reqid;  // caller+tid to uniquely identify request
-      //msg_addr_t who;  // who did this op,
-      //tid_t      tid;  // and their tid.
-
+      
       Entry() : op(0) {}
       Entry(int _op, object_t _oid, const eversion_t& v, 
             const msg_addr_t& a, tid_t t) : 
         op(_op), oid(_oid), version(v), reqid(a,t) {}
       
       bool is_delete() const { return op == DELETE; }
-      bool is_update() const { return !is_delete(); }
+      bool is_clone() const { return op == CLONE; }
+      bool is_modify() const { return op == MODIFY; }
+      bool is_update() const { return is_clone() || is_modify(); }
     };
 
     list<Entry> log;  // the actual log.
@@ -362,7 +365,7 @@ public:
       // add to log
       log.push_back(e);
       assert(e.version > top);
-      assert(top.version == 0 || e.version.version == top.version + 1);
+      assert(top.version == 0 || e.version.version > top.version);
       top = e.version;
 
       // to our index
@@ -575,15 +578,21 @@ public:
   int        get_nrep() const { return acting.size(); }
 
   int        get_primary() { return acting.empty() ? -1:acting[0]; }
-  int        get_tail() { return acting.empty() ? -1:acting[ acting.size()-1 ]; }
-  int        get_acker() { return g_conf.osd_rep == OSD_REP_PRIMARY ? get_primary():get_tail(); }
+  //int        get_tail() { return acting.empty() ? -1:acting[ acting.size()-1 ]; }
+  //int        get_acker() { return g_conf.osd_rep == OSD_REP_PRIMARY ? get_primary():get_tail(); }
+  int        get_acker() { 
+    if (g_conf.osd_rep == OSD_REP_PRIMARY ||
+       acting.size() <= 1) 
+      return get_primary();
+    return acting[1];
+  }
   
   int        get_role() const { return role; }
   void       set_role(int r) { role = r; }
 
   bool       is_primary() const { return role == PG_ROLE_HEAD; }
+  bool       is_acker() const { return role == PG_ROLE_ACKER; }
   bool       is_head() const { return role == PG_ROLE_HEAD; }
-  bool       is_tail() const { return role == PG_ROLE_TAIL; }
   bool       is_middle() const { return role == PG_ROLE_MIDDLE; }
   bool       is_residual() const { return role == PG_ROLE_STRAY; }
   
@@ -648,9 +657,11 @@ inline ostream& operator<<(ostream& out, const PG::Info& pgi)
 inline ostream& operator<<(ostream& out, const PG::Log::Entry& e)
 {
   return out << " " << e.version 
-             << (e.is_update() ? "   ":" - ")
-             << hex << e.oid << dec 
-             << " by " << e.reqid;
+             << (e.is_delete() ? " - ":
+                (e.is_clone() ? " c ":
+                 (e.is_modify() ? " m ":
+                  " ? ")))
+             << e.oid << " by " << e.reqid;
 }
 
 inline ostream& operator<<(ostream& out, const PG::Log& log) 
index a89ee9fe24127dbe1d30336e24e649b00fba1231..47094a30568361a63d90f3779946a3b3d947d8d8 100644 (file)
@@ -156,7 +156,8 @@ void Filer::_probed(Probe *probe, object_t oid, off_t size)
 
 void Filer::file_to_extents(inode_t inode,
                             off_t offset, size_t len,
-                            list<ObjectExtent>& extents) 
+                            list<ObjectExtent>& extents,
+                           objectrev_t rev) 
 {
   dout(10) << "file_to_extents " << offset << "~" << len 
            << " on " << hex << inode.ino << dec
@@ -190,6 +191,7 @@ void Filer::file_to_extents(inode_t inode,
     else {
       ex = &object_extents[oid];
       ex->oid = oid;
+      ex->rev = rev;
       ex->pgid = objecter->osdmap->object_to_pg( oid, inode.layout );
     }
     
index e0504c2975961c45507997d2dac0646788e6ffcb..161bfec30453120d86d04b9a382cee8f52b9e37c 100644 (file)
@@ -96,9 +96,10 @@ class Filer {
             bufferlist& bl,
             int flags, 
             Context *onack,
-            Context *oncommit) {
+            Context *oncommit,
+           objectrev_t rev=0) {
     Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
-    file_to_extents(inode, offset, len, wr->extents);
+    file_to_extents(inode, offset, len, wr->extents, rev);
     return objecter->modifyx(wr, onack, oncommit) > 0 ? 0:-1;
   }
 
@@ -147,7 +148,8 @@ class Filer {
   void file_to_extents(inode_t inode,
                        off_t offset,
                        size_t len,
-                       list<ObjectExtent>& extents);
+                       list<ObjectExtent>& extents,
+                      objectrev_t rev=0);
   
 };
 
index b722062f2c457d2be93eb7214984377dc807acc3..27b154023209d62cd6d14cb23fd1e298a81d6de8 100644 (file)
@@ -96,8 +96,9 @@ class ObjectCacher {
   private:
     // ObjectCacher::Object fields
     ObjectCacher *oc;
-    object_t  oid;
+    object_t  oid;   // this _always_ is oid.rev=0
     inodeno_t ino;
+       objectrev_t rev; // last rev we're written
     
   public:
     map<off_t, BufferHead*>     data;
@@ -476,7 +477,8 @@ class ObjectCacher {
 
   int file_write(inode_t& inode,
                  off_t offset, size_t len, 
-                 bufferlist& bl) {
+                 bufferlist& bl,
+                                objectrev_t rev=0) {
     Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
     filer.file_to_extents(inode, offset, len, wr->extents);
     return writex(wr, inode.ino);
@@ -498,7 +500,8 @@ class ObjectCacher {
   int file_atomic_sync_write(inode_t& inode,
                              off_t offset, size_t len, 
                              bufferlist& bl,
-                             Mutex &lock) {
+                             Mutex &lock,
+                                                        objectrev_t rev=0) {
     Objecter::OSDWrite *wr = new Objecter::OSDWrite(bl);
     filer.file_to_extents(inode, offset, len, wr->extents);
     return atomic_sync_writex(wr, inode.ino, lock);
index c97e8fbeade29ea49faf6d14f7fd75795f632f45..5e56781a2056982a5cb3ebe08c5ccf2527eb90c0 100644 (file)
@@ -135,7 +135,7 @@ void Objecter::scan_pgs(set<pg_t>& changed_pgs)
       if (!other.empty() &&
           !pg.acting.empty() &&
           other[0] == pg.acting[0] &&
-          other[other.size()-1] == pg.acting[pg.acting.size()-1]) 
+          other[other.size() > 1 ? 1:0] == pg.acting[pg.acting.size() > 1 ? 1:0]) 
         continue;
     }
     else if (g_conf.osd_rep == OSD_REP_CHAIN) {
@@ -255,11 +255,13 @@ void Objecter::handle_osd_op_reply(MOSDOpReply *m)
 
 // stat -----------------------------------
 
-tid_t Objecter::stat(object_t oid, off_t *size, Context *onfinish)
+tid_t Objecter::stat(object_t oid, off_t *size, Context *onfinish,
+                                        objectrev_t rev)
 {
   OSDStat *st = new OSDStat(size);
   st->extents.push_back(ObjectExtent(oid, 0, 0));
   st->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+  st->extents.front().rev = rev;
   st->onfinish = onfinish;
 
   return stat_submit(st);
@@ -352,11 +354,13 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
 
 
 tid_t Objecter::read(object_t oid, off_t off, size_t len, bufferlist *bl, 
-                     Context *onfinish)
+                     Context *onfinish, 
+                                        objectrev_t rev)
 {
   OSDRead *rd = new OSDRead(bl);
   rd->extents.push_back(ObjectExtent(oid, off, len));
   rd->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+  rd->extents.front().rev = rev;
   readx(rd, onfinish);
   return last_tid;
 }
@@ -575,12 +579,14 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
 // write ------------------------------------
 
 tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl, 
-                      Context *onack, Context *oncommit)
+                      Context *onack, Context *oncommit,
+                                         objectrev_t rev)
 {
   OSDWrite *wr = new OSDWrite(bl);
   wr->extents.push_back(ObjectExtent(oid, off, len));
   wr->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
   wr->extents.front().buffer_extents[0] = len;
+  wr->extents.front().rev = rev;
   modifyx(wr, onack, oncommit);
   return last_tid;
 }
@@ -589,11 +595,13 @@ tid_t Objecter::write(object_t oid, off_t off, size_t len, bufferlist &bl,
 // zero
 
 tid_t Objecter::zero(object_t oid, off_t off, size_t len,  
-                     Context *onack, Context *oncommit)
+                     Context *onack, Context *oncommit,
+                                        objectrev_t rev)
 {
   OSDModify *z = new OSDModify(OSD_OP_ZERO);
   z->extents.push_back(ObjectExtent(oid, off, len));
   z->extents.front().pgid = osdmap->object_to_pg( oid, g_OSD_FileLayout );
+  z->extents.front().rev = rev;
   modifyx(z, onack, oncommit);
   return last_tid;
 }
@@ -647,6 +655,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
                          wr->op);
   m->set_length(ex.length);
   m->set_offset(ex.start);
+  m->set_rev(ex.rev);
 
   if (wr->tid_version.count(tid)) 
     m->set_version(wr->tid_version[tid]);  // we're replaying this op!
index b8c187f424aee260576ae73e1f863046a9136608..72e637789f988f20b990991312f30c691549750d 100644 (file)
@@ -110,7 +110,7 @@ class Objecter {
       if (g_conf.osd_rep == OSD_REP_PRIMARY)
         return acting[0];
       else
-        return acting[acting.size()-1];
+        return acting[acting.size() > 1 ? 1:0];
     }
   };
 
@@ -170,12 +170,16 @@ class Objecter {
 
   // even lazier
   tid_t read(object_t oid, off_t off, size_t len, bufferlist *bl, 
-             Context *onfinish);
+             Context *onfinish, 
+                        objectrev_t rev=0);
   tid_t write(object_t oid, off_t off, size_t len, bufferlist &bl, 
-              Context *onack, Context *oncommit);
+              Context *onack, Context *oncommit, 
+                         objectrev_t rev=0);
   tid_t zero(object_t oid, off_t off, size_t len,  
-             Context *onack, Context *oncommit);
-  tid_t stat(object_t oid, off_t *size, Context *onfinish);  
+             Context *onack, Context *oncommit, 
+                        objectrev_t rev=0);
+  tid_t stat(object_t oid, off_t *size, Context *onfinish, 
+                        objectrev_t rev=0);  
 
   tid_t lock(int op, object_t oid, Context *onack, Context *oncommit);