]> git.apps.os.sepia.ceph.com Git - ceph.git/commitdiff
osd: MODIFY is a flag; fix up op_read
authorSage Weil <sage@newdream.net>
Mon, 10 Nov 2008 18:36:24 +0000 (10:36 -0800)
committerSage Weil <sage@newdream.net>
Mon, 10 Nov 2008 18:36:24 +0000 (10:36 -0800)
src/include/ceph_fs.h
src/messages/MOSDOp.h
src/messages/MOSDOpReply.h
src/osd/ReplicatedPG.cc
src/osdc/Objecter.cc

index 5c8ea57ea289e675c2be4cbaf27f37b7864d1ea4..9954419d86931c2d67d42ab438538aaaf3dc2656 100644 (file)
@@ -1082,10 +1082,10 @@ enum {
        CEPH_OSD_OP_DNLOCK     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_LOCK | 6,
 
        /* fancy read */
-       CEPH_OSD_OP_GREP       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 1,
+       CEPH_OSD_OP_GREP       = CEPH_OSD_OP_MODE_RD | CEPH_OSD_OP_TYPE_DATA | 3,
 
        /* fancy write */
-       CEPH_OSD_OP_APPEND     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 1,
+       CEPH_OSD_OP_APPEND     = CEPH_OSD_OP_MODE_WR | CEPH_OSD_OP_TYPE_DATA | 6,
 };
 
 static inline int ceph_osd_op_type_lock(int op)
@@ -1156,10 +1156,11 @@ enum {
        CEPH_OSD_OP_SAFE = 2,         /* want (or is) "safe" ack */
        CEPH_OSD_OP_RETRY = 4,        /* resend attempt */
        CEPH_OSD_OP_INCLOCK_FAIL = 8, /* fail on inclock collision */
-       CEPH_OSD_OP_BALANCE_READS = 16,
+       CEPH_OSD_OP_MODIFY = 16,      /* op is/was a mutation */
        CEPH_OSD_OP_ACKNVRAM = 32,    /* ACK when stable in NVRAM, not RAM */
        CEPH_OSD_OP_ORDERSNAP = 64,   /* EOLDSNAP if snapc is out of order */
        CEPH_OSD_OP_PEERSTAT = 128,   /* msg includes osd_peer_stat */
+       CEPH_OSD_OP_BALANCE_READS = 256,
 };
 
 #define EOLDSNAPC 44 /* ORDERSNAP flag set and writer has old snap context*/
@@ -1195,8 +1196,7 @@ struct ceph_osd_request_head {
 
        /* read or mutation */
        __le16 num_ops;
-       __u8 is_modify;
-       __u8 object_type;
+       __u16 object_type;
        struct ceph_osd_op ops[];  /* followed by snaps */
 } __attribute__ ((packed));
 
@@ -1210,8 +1210,7 @@ struct ceph_osd_reply_head {
 
        __le32 result;
 
-       __le16 num_ops;
-       __le16 is_modify;
+       __le32 num_ops;
        struct ceph_osd_op ops[0];
 } __attribute__ ((packed));
 
index 06e965fec43edceb442cecb3a65e6f698deba436..5fe4291f26072bbcb45ddd3c9d3ed4d6cc845dca 100644 (file)
@@ -56,7 +56,7 @@ public:
 
   eversion_t get_version() { return head.reassert_version; }
   
-  bool is_modify() { return head.is_modify; }
+  bool is_modify() { return head.flags & CEPH_OSD_OP_MODIFY; }
 
   unsigned get_inc_lock() const { return head.inc_lock; }
 
@@ -74,14 +74,13 @@ public:
  
 
 
-  MOSDOp(int inc, long tid, bool modify,
+  MOSDOp(int inc, long tid,
          object_t oid, ceph_object_layout ol, epoch_t mapepoch,
         int flags) :
     Message(CEPH_MSG_OSD_OP) {
     memset(&head, 0, sizeof(head));
     head.tid = tid;
     head.client_inc = inc;
-    head.is_modify = modify;
     head.oid = oid;
     head.layout = ol;
     head.osdmap_epoch = mapepoch;
index 3526b23f34910bdbaba1eff213d1fa4795866451..246a71b904efb63a09c2ad517b67dd6c0041e7fd 100644 (file)
@@ -43,7 +43,7 @@ class MOSDOpReply : public Message {
   __s32 get_result() { return head.result; }
   eversion_t get_version() { return head.reassert_version; }
 
-  bool is_modify() { return head.is_modify; }
+  bool is_modify() { return head.flags & CEPH_OSD_OP_MODIFY; }
 
   void set_result(int r) { head.result = r; }
   void set_version(eversion_t v) { head.reassert_version = v; }
@@ -57,10 +57,11 @@ public:
     Message(CEPH_MSG_OSD_OPREPLY) {
     memset(&head, 0, sizeof(head));
     head.tid = req->head.tid;
-    head.is_modify = req->is_modify();
     ops = req->ops;
     head.result = result;
-    head.flags = commit ? CEPH_OSD_OP_SAFE:0;
+    head.flags =
+      (req->head.flags & ~(CEPH_OSD_OP_SAFE|CEPH_OSD_OP_ACK)) |
+      (commit ? CEPH_OSD_OP_SAFE:CEPH_OSD_OP_ACK);
     head.oid = req->head.oid;
     head.layout = req->head.layout;
     head.osdmap_epoch = e;
index cec74ec80aea6aceb755860c2f577af5cf87bc29..743fae95964aeab5316e97347a0e75c21b8cd967 100644 (file)
@@ -171,11 +171,11 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
        ceph_object_layout layout;
        layout.ol_pgid = info.pgid.u.pg64;
        layout.ol_stripe_unit = 0;
-       MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true,
+       MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
                                 oid,
                                 layout,
                                 osd->osdmap->get_epoch(),
-                                0);
+                                CEPH_OSD_OP_MODIFY);
        pop->add_simple_op(CEPH_OSD_OP_BALANCEREADS, 0, 0);
        do_op(pop);
       }
@@ -186,11 +186,11 @@ bool ReplicatedPG::preprocess_op(MOSDOp *op, utime_t now)
        ceph_object_layout layout;
        layout.ol_pgid = info.pgid.u.pg64;
        layout.ol_stripe_unit = 0;
-       MOSDOp *pop = new MOSDOp(0, osd->get_tid(), true,
+       MOSDOp *pop = new MOSDOp(0, osd->get_tid(),
                                 oid,
                                 layout,
                                 osd->osdmap->get_epoch(),
-                                0);
+                                CEPH_OSD_OP_MODIFY);
        pop->add_simple_op(CEPH_OSD_OP_UNBALANCEREADS, 0, 0);
        do_op(pop);
       }
@@ -626,19 +626,16 @@ void ReplicatedPG::op_read(MOSDOp *op)
   object_t oid = op->get_oid();
   pobject_t poid(info.pgid.pool(), 0, oid);
 
-  ceph_osd_op& readop = op->ops[0];
-
-  dout(10) << "op_read " << ceph_osd_op_name(readop.op)
-          << " " << oid 
-           << " " << readop.offset << "~" << readop.length
-           << dendl;
+  dout(10) << "op_read " << oid << " " << op->ops << dendl;
   
   // wrlocked?
   if (block_if_wrlocked(op)) 
     return;
 
-  // taking read shedding out for now, because i don't want to embed
-  // the osd_peer_stat in MOSDOp
+
+  bufferlist data;
+  int data_off = 0;
+  int result = 0;
 
   // !primary and unbalanced?
   //  (ignore ops forwarded from the primary)
@@ -683,14 +680,10 @@ void ReplicatedPG::op_read(MOSDOp *op)
     }
   }
 
-  // set up reply
-  MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); 
-  long r = 0;
-
   // do it.
   if (poid.oid.snap && !pick_read_snap(poid)) {
     // we have no revision for this request.
-    r = -ENOENT;
+    result = -ENOENT;
     goto done;
   } 
   
@@ -701,50 +694,68 @@ void ReplicatedPG::op_read(MOSDOp *op)
     if (cur > op->get_inc_lock()) {
       dout(10) << " inc_lock " << cur << " > " << op->get_inc_lock()
               << " on " << poid << dendl;
-      r = -EINCLOCKED;
+      result = -EINCLOCKED;
       goto done;
     }
   }
-  
-  switch (readop.op) {
-  case CEPH_OSD_OP_READ:
-    {
-      // read into a buffer
-      bufferlist bl;
-      r = osd->store->read(info.pgid.to_coll(), poid, 
-                          readop.offset, readop.length,
-                          bl);
-      reply->set_data(bl);
-      reply->get_header().data_off = readop.offset;
-      if (r >= 0) 
-       reply->ops[0].length = r;
-      else
-       reply->ops[0].length = 0;
-      dout(10) << " read got " << r << " / " << readop.length << " bytes from obj " << oid << dendl;
-    }
-    osd->logger->inc("c_rd");
-    osd->logger->inc("c_rdb", reply->ops[0].length);
-    break;
-    
-  case CEPH_OSD_OP_STAT:
-    {
-      struct stat st;
-      memset(&st, sizeof(st), 0);
-      r = osd->store->stat(info.pgid.to_coll(), poid, &st);
-      if (r >= 0)
-       reply->ops[0].length = st.st_size;
+
+  for (vector<ceph_osd_op>::iterator p = op->ops.begin(); p != op->ops.end(); p++) {
+    switch (p->op) {
+    case CEPH_OSD_OP_READ:
+      {
+       // read into a buffer
+       bufferlist bl;
+       int r = osd->store->read(info.pgid.to_coll(), poid, p->offset, p->length, bl);
+       if (data.length() == 0)
+         data_off = p->offset;
+       data.claim(bl);
+       if (r >= 0) 
+         p->length = r;
+       else {
+         result = r;
+         p->length = 0;
+       }
+       dout(10) << " read got " << r << " / " << p->length << " bytes from obj " << oid << dendl;
+      }
+      osd->logger->inc("c_rd");
+      osd->logger->inc("c_rdb", p->length);
+      break;
+      
+    case CEPH_OSD_OP_STAT:
+      {
+       struct stat st;
+       memset(&st, sizeof(st), 0);
+       int r = osd->store->stat(info.pgid.to_coll(), poid, &st);
+       if (r >= 0)
+         p->length = st.st_size;
+       else
+         result = r;
+      }
+      break;
+      
+    case CEPH_OSD_OP_GREP:
+      {
+       
+      }
+      break;
+      
+    default:
+      dout(1) << "unrecognized osd op " << p->op
+             << " " << ceph_osd_op_name(p->op)
+             << dendl;
+      result = -EOPNOTSUPP;
+      assert(0);  // for now
     }
-    break;
-    
-  default:
-      assert(0);
   }
   
-  
  done:
-  if (r >= 0) {
-    reply->set_result(0);
+  // reply
+  MOSDOpReply *reply = new MOSDOpReply(op, 0, osd->osdmap->get_epoch(), true); 
+  reply->set_data(data);
+  reply->get_header().data_off = data_off;
+  reply->set_result(result);
 
+  if (result >= 0) {
     utime_t now = g_clock.now();
     utime_t diff = now;
     diff -= op->get_recv_stamp();
@@ -756,12 +767,8 @@ void ReplicatedPG::op_read(MOSDOp *op)
     if (is_primary() &&
        g_conf.osd_balance_reads)
       stat_object_temp_rd[oid].hit(now);  // hit temp.
-
-  } else {
-    reply->set_result(r);   // error
   }
-  
-  // send it
+
   osd->messenger->send_message(reply, op->get_orig_source_inst());
   
   delete op;
index 8dbbd98be68823ad014b809312304f772fb48708..71a8a597e4f11940323f3660d39802f98061fb1f 100644 (file)
@@ -350,7 +350,7 @@ tid_t Objecter::read_submit(ReadOp *rd)
     int flags = rd->flags;
     if (rd->onfinish)
       flags |= CEPH_OSD_OP_ACK;
-    MOSDOp *m = new MOSDOp(client_inc, last_tid, false,
+    MOSDOp *m = new MOSDOp(client_inc, last_tid,
                           rd->oid, rd->layout, osdmap->get_epoch(), 
                           flags);
     m->ops = rd->ops;
@@ -425,11 +425,11 @@ void Objecter::handle_osd_read_reply(MOSDOpReply *m)
 
   if (rd->pbl)
     rd->pbl->claim(m->get_data());
-  if (rd->psize) {
-    ceph_osd_op& op = m->ops[0];
-    *(rd->psize) = op.length;
-  }
-
+  if (rd->psize)
+    for (vector<ceph_osd_op>::iterator p = m->ops.begin(); p != m->ops.end(); p++)
+      if (p->op == CEPH_OSD_OP_STAT)
+       *(rd->psize) = p->length;
+  
   // finish, clean up
   Context *onfinish = rd->onfinish;
   dout(7) << " " << bytes_read << " bytes " << dendl;
@@ -482,9 +482,9 @@ tid_t Objecter::modify_submit(ModifyOp *wr)
            << " osd" << pg.primary()
            << dendl;
   if (pg.primary() >= 0) {
-    MOSDOp *m = new MOSDOp(client_inc, wr->tid, true,
+    MOSDOp *m = new MOSDOp(client_inc, wr->tid,
                           wr->oid, wr->layout, osdmap->get_epoch(),
-                          flags);
+                          flags | CEPH_OSD_OP_MODIFY);
     m->ops = wr->ops;
     m->set_snap_seq(wr->snapc.seq);
     m->get_snaps() = wr->snapc.snaps;