]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
*** empty log message ***
authorsage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 14 Feb 2006 17:29:17 +0000 (17:29 +0000)
committersage <sage@29311d96-e01e-0410-9327-a35deaab8ce9>
Tue, 14 Feb 2006 17:29:17 +0000 (17:29 +0000)
git-svn-id: https://ceph.svn.sf.net/svnroot/ceph@627 29311d96-e01e-0410-9327-a35deaab8ce9

ceph/osd/OSD.cc
ceph/osd/OSD.h

index 113885c2c8d7b6c6db095277f7b8308842893087..1f81dfc5bf5e533c291386721f99e3be14a147f5 100644 (file)
@@ -461,6 +461,8 @@ void OSD::ack_replica_op(__uint64_t tid, int result, bool safe, int fromosd)
        put_repop(repop);
 
   } else {
+       assert(0);  // for now
+
        // failure
        get_repop(repop);
 
@@ -1834,19 +1836,16 @@ public:
 
 void OSD::op_rep_modify_safe(MOSDOp *op)
 {
-  //object_t oid = op->get_oid();
+  // hack: hack_blah is true until 'ack' has been sent.
   if (op->hack_blah) {
        dout(0) << "got rep_modify_safe before rep_modify applied, waiting" << endl;
        g_timer.add_event_after(1, new C_OSD_RepModifySafe(this, op));
-  } else 
-       //lock_object(oid);      // ... just to make sure the original write is finished with *op
-  {
+  } else {
        dout(10) << "rep_modify_safe on op " << *op << endl;
-       MOSDOpReply *ack2 = new MOSDOpReply(op, 0, osdmap, true);
-       messenger->send_message(ack2, op->get_asker());
+       MOSDOpReply *safe = new MOSDOpReply(op, 0, osdmap, true);
+       messenger->send_message(safe, op->get_asker());
        delete op;
   }
-  //unlock_object(oid);
 }
 
 void OSD::op_rep_modify(MOSDOp *op)
@@ -1854,55 +1853,54 @@ void OSD::op_rep_modify(MOSDOp *op)
   // when we introduce unordered messaging.. FIXME
   object_t oid = op->get_oid();
 
-  //lock_object(oid);
-  {
-       version_t ov = 0;
-       if (store->exists(oid)) 
-         store->getattr(oid, "version", &ov, sizeof(ov));
-       if (op->get_old_version() != ov) 
-         dout(0) << "rep_modify old version is " << ov << "  msg sez " << op->get_old_version() << endl;
-       assert(op->get_old_version() == ov);
-       
-       // PG
-       PG *pg = get_pg(op->get_pg());
-       assert(pg);
-       
-       dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
-       
-       int r = 0;
-       Context *onsafe = 0;
-       if (op->get_op() == OSD_OP_REP_WRITE) {
-         // write
-         assert(op->get_data().length() == op->get_length());
-         onsafe = new C_OSD_RepModifySafe(this, op);
-         op->hack_blah = true;
-         r = apply_write(op, op->get_version(), onsafe);
-         if (ov == 0) pg->add_object(store, oid);
-         
-         logger->inc("r_wr");
-         logger->inc("r_wrb", op->get_length());
-         op->hack_blah = false;
-       } else if (op->get_op() == OSD_OP_REP_DELETE) {
-         // delete
-         store->collection_remove(pg->get_pgid(), op->get_oid());
-         r = store->remove(oid);
-       } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
-         // truncate
-         r = store->truncate(oid, op->get_offset());
-       } else assert(0);
+  version_t ov = 0;
+  if (store->exists(oid)) 
+       store->getattr(oid, "version", &ov, sizeof(ov));
+  if (op->get_old_version() != ov) 
+       dout(0) << "rep_modify old version is " << ov << "  msg sez " << op->get_old_version() << endl;
+  assert(op->get_old_version() == ov);
+  
+  // PG
+  PG *pg = get_pg(op->get_pg());
+  assert(pg);
+  
+  dout(12) << "rep_modify in " << *pg << " o " << hex << oid << dec << " v " << op->get_version() << " (i have " << ov << ")" << endl;
+  
+  int r = 0;
+  Context *onsafe = 0;
+  
+  op->hack_blah = true;  // hack: make sure any 'safe' goes out _after_ our ack
+  
+  if (op->get_op() == OSD_OP_REP_WRITE) {
+       // write
+       assert(op->get_data().length() == op->get_length());
+       onsafe = new C_OSD_RepModifySafe(this, op);
+       r = apply_write(op, op->get_version(), onsafe);
+       if (ov == 0) pg->add_object(store, oid);
        
-       if (onsafe) {
-         // ack
-         MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
-         messenger->send_message(ack, op->get_asker());
-       } else {
-         // safe, safe
-         MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
-         messenger->send_message(ack, op->get_asker());
-         delete op;
-       }
+       logger->inc("r_wr");
+       logger->inc("r_wrb", op->get_length());
+  } else if (op->get_op() == OSD_OP_REP_DELETE) {
+       // delete
+       store->collection_remove(pg->get_pgid(), op->get_oid());
+       r = store->remove(oid);
+  } else if (op->get_op() == OSD_OP_REP_TRUNCATE) {
+       // truncate
+       r = store->truncate(oid, op->get_offset());
+  } else assert(0);
+  
+  if (onsafe) {
+       // ack
+       MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, false);
+       messenger->send_message(ack, op->get_asker());
+  } else {
+       // safe, safe
+       MOSDOpReply *ack = new MOSDOpReply(op, 0, osdmap, true);
+       messenger->send_message(ack, op->get_asker());
+       delete op;
   }
-  //unlock_object(oid);
+  
+  op->hack_blah = false;  // hack: make sure any 'safe' goes out _after_ our ack
 }
 
 
@@ -2078,9 +2076,9 @@ void OSD::dequeue_op(object_t oid)
   unlock_object(oid);
   
   // finish
-  dout(10) << "finish op " << op << endl;
   osd_lock.Lock();
   {
+       dout(10) << "dequeue_op finish op " << op << endl;
        assert(pending_ops > 0);
        pending_ops--;
        if (pending_ops == 0 && waiting_for_no_ops)
@@ -2175,58 +2173,53 @@ void OSD::wait_for_no_ops()
 void OSD::op_read(MOSDOp *op)
 {
   object_t oid = op->get_oid();
-  //lock_object(oid);
-  {
-       // read into a buffer
-       bufferlist bl;
-       long got = store->read(oid, 
-                                                  op->get_length(), op->get_offset(),
-                                                  bl);
-       // set up reply
-       MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); 
-       if (got >= 0) {
-         reply->set_result(0);
-         reply->set_data(bl);
-         reply->set_length(got);
-         
-         logger->inc("c_rd");
-         logger->inc("c_rdb", got);
+
+  // read into a buffer
+  bufferlist bl;
+  long got = store->read(oid, 
+                                                op->get_length(), op->get_offset(),
+                                                bl);
+  // set up reply
+  MOSDOpReply *reply = new MOSDOpReply(op, 0, osdmap, true); 
+  if (got >= 0) {
+       reply->set_result(0);
+       reply->set_data(bl);
+       reply->set_length(got);
          
-       } else {
-         reply->set_result(got);   // error
-         reply->set_length(0);
-       }
-       
-       dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+       logger->inc("c_rd");
+       logger->inc("c_rdb", got);
        
-       logger->inc("rd");
-       if (got >= 0) logger->inc("rdb", got);
-       
-       // send it
-       messenger->send_message(reply, op->get_asker());
+  } else {
+       reply->set_result(got);   // error
+       reply->set_length(0);
   }
-  //unlock_object(oid);
+  
+  dout(12) << "read got " << got << " / " << op->get_length() << " bytes from obj " << hex << oid << dec << endl;
+  
+  logger->inc("rd");
+  if (got >= 0) logger->inc("rdb", got);
+  
+  // send it
+  messenger->send_message(reply, op->get_asker());
+  
   delete op;
 }
 
 void OSD::op_stat(MOSDOp *op)
 {
   object_t oid = op->get_oid();
-  //lock_object(oid);
-  {
-       struct stat st;
-       memset(&st, sizeof(st), 0);
-       int r = store->stat(oid, &st);
-       
-       dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
-       
-       MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap, true);
-       reply->set_object_size(st.st_size);
-       messenger->send_message(reply, op->get_asker());
-       
-       logger->inc("stat");
-  }
-  //unlock_object(oid);
+
+  struct stat st;
+  memset(&st, sizeof(st), 0);
+  int r = store->stat(oid, &st);
+  
+  dout(3) << "stat on " << hex << oid << dec << " r = " << r << " size = " << st.st_size << endl;
+  
+  MOSDOpReply *reply = new MOSDOpReply(op, r, osdmap, true);
+  reply->set_object_size(st.st_size);
+  messenger->send_message(reply, op->get_asker());
+  
+  logger->inc("stat");
 
   delete op;
 }
@@ -2371,6 +2364,7 @@ void OSD::op_modify_safe(OSDReplicaOp *repop)
 {
   dout(10) << "op_modify_safe on op " << *repop->op << endl;
   get_repop(repop);
+  assert(!repop->local_safe);
   repop->local_safe = true;
   put_repop(repop);
 }
@@ -2424,6 +2418,7 @@ void OSD::op_modify(MOSDOp *op)
                pg->add_object(store, oid);          // FIXME : be careful w/ locking
          
          get_repop(repop);
+         assert(!repop->local_ack);
          repop->local_ack = true;
          put_repop(repop);
 
@@ -2434,6 +2429,8 @@ void OSD::op_modify(MOSDOp *op)
          // truncate
          r = store->truncate(oid, op->get_offset());
          get_repop(repop);
+         assert(!repop->local_ack);
+         assert(!repop->local_safe);
          repop->local_ack = true;
          repop->local_safe = true;
          put_repop(repop);
@@ -2443,6 +2440,8 @@ void OSD::op_modify(MOSDOp *op)
          pg->remove_object(store, op->get_oid());  // be careful with locking
          r = store->remove(oid);
          get_repop(repop);
+         assert(!repop->local_ack);
+         assert(!repop->local_safe);
          repop->local_ack = true;
          repop->local_safe = true;
          put_repop(repop);
index 816bdda47fdded91a82dbd9f9241e575a28b36e0..954797c321e47470cec2711ea51d4a09def38a96 100644 (file)
@@ -63,9 +63,15 @@ class OSDReplicaOp {
        sent_ack(false), sent_safe(false),
        new_version(nv), old_version(ov)
        { }
-  bool can_send_ack() { return !sent_ack && !cancel && local_ack && waitfor_ack.empty(); }
-  bool can_send_safe() { return !sent_safe && !cancel && local_ack && local_safe && waitfor_safe.empty(); }
-  bool can_delete() { return local_safe && (cancel || waitfor_safe.empty()); }
+  bool can_send_ack() { return !sent_ack && !cancel && 
+                                                 local_ack && 
+                                                 waitfor_ack.empty(); }
+  bool can_send_safe() { return !sent_safe && !cancel && 
+                                                  local_ack && local_safe && 
+                                                  waitfor_ack.empty() && waitfor_safe.empty(); }
+  bool can_delete() { return local_safe && 
+                                               (cancel || 
+                                                (waitfor_ack.empty() && waitfor_safe.empty())); }
 };
 
 inline ostream& operator<<(ostream& out, OSDReplicaOp& repop)
@@ -75,6 +81,7 @@ inline ostream& operator<<(ostream& out, OSDReplicaOp& repop)
   if (repop.local_safe) out << " local_safe";
   if (repop.cancel) out << " cancel";
   out << " op=" << repop.op;
+  out << " repop=" << &repop;
   out << ")";
   return out;
 }