]> git-server-git.apps.pok.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: verify inc_lock
authorSage Weil <sage@newdream.net>
Mon, 24 Mar 2008 18:27:24 +0000 (11:27 -0700)
committerSage Weil <sage@newdream.net>
Mon, 24 Mar 2008 18:27:24 +0000 (11:27 -0700)
src/messages/MOSDOp.h
src/osd/ReplicatedPG.cc
src/osdc/Objecter.cc
src/osdc/Objecter.h

index 79a92d0013803bcededcefd0358186fe4ef96373..6eb97fce841bd7a388074841a539605f7ac470b6 100644 (file)
@@ -27,6 +27,8 @@
  *
  */
 
+#define EINCLOCKED 100
+
 class MOSDOp : public Message {
 public:
   static const char* get_opname(int op) {
@@ -91,6 +93,8 @@ public:
   off_t get_length() const { return le64_to_cpu(head.length); }
   off_t get_offset() const { return le64_to_cpu(head.offset); }
 
+  unsigned get_inc_lock() const { return le32_to_cpu(head.inc_lock); }
+
   void set_peer_stat(const osd_peer_stat_t& stat) { head.peer_stat = stat; }
   const ceph_osd_peer_stat& get_peer_stat() { return head.peer_stat; }
 
index 5603e8054fc98b5ef6962dcb8aff4a1534590047..2d18171d952e9241ae122b5560d450ce98b7ac9f 100644 (file)
@@ -512,41 +512,56 @@ void ReplicatedPG::op_read(MOSDOp *op)
   if (oid.rev && !pick_object_rev(oid)) {
     // we have no revision for this request.
     r = -EEXIST;
-  } else {
-    switch (op->get_op()) {
-    case CEPH_OSD_OP_READ:
-      {
-       // read into a buffer
-       bufferlist bl;
-       r = osd->store->read(poid, 
-                            op->get_offset(), op->get_length(),
-                            bl);
-       reply->set_data(bl);
-       if (r >= 0) 
-         reply->set_length(r);
-       else
-         reply->set_length(0);
-       dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
-      }
-      osd->logger->inc("c_rd");
-      osd->logger->inc("c_rdb", op->get_length());
-      break;
-
-    case CEPH_OSD_OP_STAT:
-      {
-       struct stat st;
-       memset(&st, sizeof(st), 0);
-       r = osd->store->stat(poid, &st);
-       if (r >= 0)
-         reply->set_length(st.st_size);
-      }
-      break;
-
-    default:
-      assert(0);
+    goto done;
+  } 
+  
+  // check inc_lock?
+  if (op->get_inc_lock() > 0) {
+    __u32 cur = 0;
+    osd->store->getattr(poid, "inc_lock", &cur, sizeof(cur));
+    if (cur > op->get_inc_lock()) {
+      dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock()
+              << " on " << poid << dendl;
+      r = -EINCLOCKED;
+      goto done;
     }
   }
   
+  switch (op->get_op()) {
+  case CEPH_OSD_OP_READ:
+    {
+      // read into a buffer
+      bufferlist bl;
+      r = osd->store->read(poid, 
+                          op->get_offset(), op->get_length(),
+                          bl);
+      reply->set_data(bl);
+      if (r >= 0) 
+       reply->set_length(r);
+      else
+       reply->set_length(0);
+      dout(10) << " read got " << r << " / " << op->get_length() << " bytes from obj " << oid << dendl;
+    }
+    osd->logger->inc("c_rd");
+    osd->logger->inc("c_rdb", op->get_length());
+    break;
+    
+  case CEPH_OSD_OP_STAT:
+    {
+      struct stat st;
+      memset(&st, sizeof(st), 0);
+      r = osd->store->stat(poid, &st);
+      if (r >= 0)
+       reply->set_length(st.st_size);
+    }
+    break;
+    
+  default:
+      assert(0);
+  }
+  
+  
+ done:
   if (r >= 0) {
     reply->set_result(0);
 
@@ -1141,6 +1156,20 @@ void ReplicatedPG::op_modify(MOSDOp *op)
       block_if_wrlocked(op)) 
     return; // op will be handled later, after the object unlocks
   
+  // check inc_lock?
+  if (op->get_inc_lock() > 0) {
+    __u32 cur = 0;
+    osd->store->getattr(poid, "inc_lock", &cur, sizeof(cur));
+    if (cur > op->get_inc_lock()) {
+      dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock()
+              << " on " << poid << dendl;
+      MOSDOpReply *reply = new MOSDOpReply(op, -EINCLOCKED, osd->osdmap->get_epoch(), true);
+      osd->messenger->send_message(reply, op->get_client_inst());
+      delete op;
+      return;
+    }
+  }
+
   // balance-reads set?
   char v;
   if ((op->get_op() != CEPH_OSD_OP_BALANCEREADS && op->get_op() != CEPH_OSD_OP_UNBALANCEREADS) &&
index 1acf5eb41b9d51a4e0a93201c621355c15adf378..c72377df3655f89a371bd413a9790dde4270c456 100644 (file)
@@ -373,7 +373,7 @@ tid_t Objecter::stat_submit(OSDStat *st)
     MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
                           ex.oid, ex.layout, osdmap->get_epoch(), 
                           CEPH_OSD_OP_STAT, flags);
-    if (inc_lock >= 0) {
+    if (inc_lock > 0) {
       st->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
     }
@@ -409,6 +409,12 @@ void Objecter::handle_osd_stat_reply(MOSDOpReply *m)
   if (pg.active_tids.empty()) close_pg( m->get_pg() );
   
   // success?
+  if (m->get_result() == -EINCLOCKED) {
+    dout(7) << " got -EINCLOCKED, resubmitting" << dendl;
+    stat_submit(st);
+    delete m;
+    return;
+  }
   if (m->get_result() == -EAGAIN) {
     dout(7) << " got -EAGAIN, resubmitting" << dendl;
     stat_submit(st);
@@ -495,7 +501,7 @@ tid_t Objecter::readx_submit(OSDRead *rd, ObjectExtent &ex, bool retry)
     MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, last_tid,
                           ex.oid, ex.layout, osdmap->get_epoch(), 
                           CEPH_OSD_OP_READ, flags);
-    if (inc_lock >= 0) {
+    if (inc_lock > 0) {
       rd->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
     }
@@ -543,8 +549,9 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
   rd->ops.erase(tid);
 
   // success?
-  if (m->get_result() == -EAGAIN) {
-    dout(7) << " got -EAGAIN, resubmitting" << dendl;
+  if (m->get_result() == -EAGAIN ||
+      m->get_result() == -EINCLOCKED) {
+    dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
     readx_submit(rd, rd->ops[tid], true);
     delete m;
     return;
@@ -785,7 +792,7 @@ tid_t Objecter::modifyx_submit(OSDModify *wr, ObjectExtent &ex, tid_t usetid)
     MOSDOp *m = new MOSDOp(messenger->get_myinst(), client_inc, tid,
                           ex.oid, ex.layout, osdmap->get_epoch(),
                           wr->op, flags);
-    if (inc_lock >= 0) {
+    if (inc_lock > 0) {
       wr->inc_lock = inc_lock;
       m->set_inc_lock(inc_lock);
     }
@@ -855,8 +862,9 @@ void Objecter::handle_osd_modify_reply(MOSDOpReply *m)
     delete m;
     return;
   }
-  if (m->get_result() == -EAGAIN) {
-    dout(7) << " got -EAGAIN, resubmitting" << dendl;
+  if (m->get_result() == -EAGAIN ||
+      m->get_result() == -EINCLOCKED) {
+    dout(7) << " got -EAGAIN or -EINCLOCKED, resubmitting" << dendl;
     if (wr->onack) num_unacked--;
     if (wr->oncommit) num_uncommitted--;
     if (wr->waitfor_ack.count(tid)) 
index b1cb68f91400dceac87ee7003b428dea8a43ecd6..de50740a34927e1045ad95096a1412d3ab0ced6a 100644 (file)
@@ -72,7 +72,7 @@ class Objecter {
   public:
     list<ObjectExtent> extents;
     int inc_lock;
-    OSDOp() : inc_lock(-1) {}
+    OSDOp() : inc_lock(0) {}
     virtual ~OSDOp() {}
   };
 
@@ -191,7 +191,7 @@ class Objecter {
  public:
   Objecter(Messenger *m, MonMap *mm, OSDMap *om, Mutex& l) : 
     messenger(m), monmap(mm), osdmap(om), 
-    last_tid(0), client_inc(-1), inc_lock(-1),
+    last_tid(0), client_inc(-1), inc_lock(0),
     num_unacked(0), num_uncommitted(0),
     last_epoch_requested(0),
     client_lock(l), timer(l)